summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-12-17 18:45:36 +0000
committerTony Garnock-Jones <tonyg@lshift.net>2009-12-17 18:45:36 +0000
commit21430df7fc48e1328f787bd26d62b095c6480fa1 (patch)
treef84e3f9d7013dad9bddbff192711a850be5ec4b5
parentbb173a12eb269eeca191369b8ac71dffce8292a1 (diff)
parent6e96d66ff58d2363a37368003f64a4c4082b91a1 (diff)
downloadrabbitmq-server-21430df7fc48e1328f787bd26d62b095c6480fa1.tar.gz
Merge default into bug22039
-rw-r--r--src/rabbit.erl313
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_plugin_activator.erl25
-rw-r--r--src/rabbit_sup.erl11
4 files changed, 240 insertions, 115 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3293927a..a8b65f46 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -39,6 +39,64 @@
-export([log_location/1]).
+%%---------------------------------------------------------------------------
+%% Boot steps.
+-export([boot_core_processes/0,
+ boot_recovery/0,
+ boot_tcp_listeners/0,
+ boot_ssl_listeners/0]).
+
+-rabbit_boot_step({database,
+ [{mfa, {rabbit_mnesia, init, []}}]}).
+
+-rabbit_boot_step({core_processes,
+ [{description, "core processes"},
+ {mfa, {?MODULE, boot_core_processes, []}},
+ {post, database},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({core_initialized,
+ [{description, "core initialized"}]}).
+
+-rabbit_boot_step({recovery,
+ [{mfa, {?MODULE, boot_recovery, []}},
+ {post, core_initialized}]}).
+
+-rabbit_boot_step({persister,
+ [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
+ {post, recovery}]}).
+
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_child, [rabbit_guid]}},
+ {post, persister},
+ {pre, routing_ready}]}).
+
+-rabbit_boot_step({routing_ready,
+ [{description, "message delivery logic ready"}]}).
+
+-rabbit_boot_step({log_relay,
+ [{description, "error log relay"},
+ {mfa, {rabbit_error_logger, boot, []}},
+ {post, routing_ready}]}).
+
+-rabbit_boot_step({tcp_listeners,
+ [{description, "TCP listeners"},
+ {mfa, {?MODULE, boot_tcp_listeners, []}},
+ {post, log_relay},
+ {pre, networking_listening}]}).
+
+-rabbit_boot_step({ssl_listeners,
+ [{description, "SSL listeners"},
+ {mfa, {?MODULE, boot_ssl_listeners, []}},
+ {post, tcp_listeners},
+ {pre, networking_listening}]}).
+
+-rabbit_boot_step({networking_listening,
+ [{description, "network listeners available"}]}).
+
+%%---------------------------------------------------------------------------
+
-import(application).
-import(mnesia).
-import(lists).
@@ -79,7 +137,7 @@ prepare() ->
start() ->
try
ok = prepare(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
@@ -115,99 +173,15 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
-
{ok, SupPid} = rabbit_sup:start_link(),
print_banner(),
-
- lists:foreach(
- fun ({Msg, Thunk}) ->
- io:format("starting ~-20s ...", [Msg]),
- Thunk(),
- io:format("done~n");
- ({Msg, M, F, A}) ->
- io:format("starting ~-20s ...", [Msg]),
- apply(M, F, A),
- io:format("done~n")
- end,
- [{"database",
- fun () -> ok = rabbit_mnesia:init() end},
- {"core processes",
- fun () ->
- ok = start_child(rabbit_exchange_type),
- ok = start_child(rabbit_log),
- ok = rabbit_hooks:start(),
-
- ok = rabbit_binary_generator:
- check_empty_content_body_frame_size(),
-
- ok = rabbit_alarm:start(),
-
- {ok, MemoryWatermark} =
- application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true ->
- ok;
- false ->
- start_child(vm_memory_monitor, [MemoryWatermark])
- end,
-
- ok = rabbit_amqqueue:start(),
-
- ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor)
- end},
- {"recovery",
- fun () ->
- ok = maybe_insert_default_data(),
- ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
- {"persister",
- fun () ->
- ok = start_child(rabbit_persister)
- end},
- {"guid generator",
- fun () ->
- ok = start_child(rabbit_guid)
- end},
- {"builtin applications",
- fun () ->
- {ok, DefaultVHost} = application:get_env(default_vhost),
- ok = error_logger:add_report_handler(
- rabbit_error_logger, [DefaultVHost]),
- ok = start_builtin_amq_applications()
- end},
- {"TCP listeners",
- fun () ->
- ok = rabbit_networking:start(),
- {ok, TcpListeners} = application:get_env(tcp_listeners),
- lists:foreach(
- fun ({Host, Port}) ->
- ok = rabbit_networking:start_tcp_listener(Host, Port)
- end,
- TcpListeners)
- end},
- {"SSL listeners",
- fun () ->
- case application:get_env(ssl_listeners) of
- {ok, []} ->
- ok;
- {ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
-
- {ok, SslOpts} = application:get_env(ssl_options),
-
- [rabbit_networking:start_ssl_listener
- (Host, Port, SslOpts) || {Host, Port} <- SslListeners],
- ok
- end
- end}]),
-
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
{ok, SupPid}.
+
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
@@ -217,10 +191,156 @@ stop(_State) ->
end,
ok.
+%%---------------------------------------------------------------------------
+
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
+
+run_boot_step({StepName, Attributes}) ->
+ Description = case lists:keysearch(description, 1, Attributes) of
+ {value, {_, D}} -> D;
+ false -> StepName
+ end,
+ case [MFA || {mfa, MFA} <- Attributes] of
+ [] ->
+ io:format("progress -- ~s~n", [Description]);
+ MFAs ->
+ io:format("starting ~-20s ...", [Description]),
+ [case catch apply(M,F,A) of
+ {'EXIT', Reason} ->
+ boot_error("FAILED~nReason: ~p~n", [Reason]);
+ ok ->
+ ok
+ end || {M,F,A} <- MFAs],
+ io:format("done~n"),
+ ok
+ end.
+
+boot_steps() ->
+ AllApps = [App || {App, _, _} <- application:loaded_applications()],
+ Modules = lists:usort(
+ lists:append([Modules
+ || {ok, Modules} <-
+ [application:get_key(App, modules)
+ || App <- AllApps]])),
+ UnsortedSteps =
+ lists:flatmap(fun (Module) ->
+ [{StepName, Attributes}
+ || {rabbit_boot_step, [{StepName, Attributes}]}
+ <- Module:module_info(attributes)]
+ end, Modules),
+ sort_boot_steps(UnsortedSteps).
+
+sort_boot_steps(UnsortedSteps) ->
+ G = digraph:new([acyclic]),
+
+ %% Add vertices, with duplicate checking.
+ [case digraph:vertex(G, StepName) of
+ false -> digraph:add_vertex(G, StepName, Step);
+ _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
+ end || Step = {StepName, _Attrs} <- UnsortedSteps],
+
+ %% Add edges, detecting cycles and missing vertices.
+ lists:foreach(fun ({StepName, Attributes}) ->
+ [add_boot_step_dep(G, StepName, PrecedingStepName)
+ || {post, PrecedingStepName} <- Attributes],
+ [add_boot_step_dep(G, SucceedingStepName, StepName)
+ || {pre, SucceedingStepName} <- Attributes]
+ end, UnsortedSteps),
+
+ %% Use topological sort to find a consistent ordering (if there is
+ %% one, otherwise fail).
+ SortedStepsRev = [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)],
+ SortedSteps = lists:reverse(SortedStepsRev),
+
+ digraph:delete(G),
+
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}}
+ || {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end.
+
+add_boot_step_dep(G, RunsSecond, RunsFirst) ->
+ case digraph:add_edge(G, RunsSecond, RunsFirst) of
+ {error, Reason} ->
+ boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
+ [RunsSecond, RunsFirst,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next])
+ || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]);
+ _ ->
+ ok
+ end.
+
+%%---------------------------------------------------------------------------
+
+boot_core_processes() ->
+ ok = rabbit_sup:start_child(rabbit_exchange_type),
+ ok = rabbit_sup:start_child(rabbit_log),
+ ok = rabbit_hooks:start(),
+
+ ok = rabbit_binary_generator:check_empty_content_body_frame_size(),
+
+ ok = rabbit_alarm:start(),
+
+ {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+ ok = case MemoryWatermark == 0 of
+ true ->
+ ok;
+ false ->
+ rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark])
+ end,
+
+ ok = rabbit_amqqueue:start(),
+
+ ok = rabbit_sup:start_child(rabbit_router),
+ ok = rabbit_sup:start_child(rabbit_node_monitor).
+
+boot_recovery() ->
+ ok = maybe_insert_default_data(),
+ ok = rabbit_exchange:recover(),
+ ok = rabbit_amqqueue:recover().
+
+boot_tcp_listeners() ->
+ ok = rabbit_networking:start(),
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
+ [ok = rabbit_networking:start_tcp_listener(Host, Port)
+ || {Host, Port} <- TcpListeners],
+ ok.
+
+boot_ssl_listeners() ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ ok = rabbit_misc:start_applications([crypto, ssl]),
+ {ok, SslOpts} = application:get_env(ssl_options),
+ [rabbit_networking:start_ssl_listener(Host, Port, SslOpts)
+ || {Host, Port} <- SslListeners],
+ ok
+ end.
+
%---------------------------------------------------------------------------
log_location(Type) ->
- case application:get_env(Type, case Type of
+ case application:get_env(Type, case Type of
kernel -> error_logger;
sasl -> sasl_error_logger
end) of
@@ -276,15 +396,6 @@ print_banner() ->
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
-start_child(Mod) ->
- start_child(Mod, []).
-
-start_child(Mod, Args) ->
- {ok,_} = supervisor:start_child(rabbit_sup,
- {Mod, {Mod, start_link, Args},
- transient, 100, worker, [Mod]}),
- ok.
-
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
ok = ensure_working_log_handler(error_logger_file_h,
@@ -310,7 +421,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
throw({error, {cannot_log_to_tty,
TTYHandler, not_installed}})
end;
- _ -> case lists:member(NewFHandler, Handlers) of
+ _ -> case lists:member(NewFHandler, Handlers) of
true -> ok;
false -> case rotate_logs(LogLocation, "",
OldFHandler, NewFHandler) of
@@ -342,12 +453,6 @@ insert_default_data() ->
DefaultReadPerm),
ok.
-start_builtin_amq_applications() ->
- %%TODO: we may want to create a separate supervisor for these so
- %%they don't bring down the entire app when they die and fail to
- %%restart
- ok.
-
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 297ed5aa..9651ae12 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -37,8 +37,14 @@
-behaviour(gen_event).
+-export([boot/0]).
+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+boot() ->
+ {ok, DefaultVHost} = application:get_env(default_vhost),
+ ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
+
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 9f787920..4fcfab78 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -96,12 +96,20 @@ start() ->
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
- %% hiding real issues.
+ %% hiding real issues. On Ubuntu, we also get warnings
+ %% about kernel/stdlib sources being out of date, which we
+ %% also ignore for the same reason.
WarningStr = Module:format_warning(
[W || W <- Warnings,
case W of
{warning, {source_not_found, _}} -> false;
- _ -> true
+ {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
+ when WApp == mnesia;
+ WApp == stdlib;
+ WApp == kernel;
+ WApp == sasl;
+ WApp == os_mon -> false;
+ _ -> true
end]),
case length(WarningStr) of
0 -> ok;
@@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) ->
post_process_script(ScriptFile) ->
case file:consult(ScriptFile) of
{ok, [{script, Name, Entries}]} ->
- NewEntries = process_entries(Entries),
+ NewEntries = lists:flatmap(fun process_entry/1, Entries),
case file:open(ScriptFile, [write]) of
{ok, Fd} ->
io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
@@ -236,13 +244,10 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entries([]) ->
- [];
-process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
- Rest]) ->
- [Entry, {apply,{rabbit,prepare,[]}} | Rest];
-process_entries([Entry|Rest]) ->
- [Entry | process_entries(Rest)].
+process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
+ [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry) ->
+ [Entry].
error(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 730d7909..ef32544c 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_child/1, start_child/2]).
-export([init/1]).
@@ -42,5 +42,14 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_child(Mod) ->
+ start_child(Mod, []).
+
+start_child(Mod, Args) ->
+ {ok, _} = supervisor:start_child(?SERVER,
+ {Mod, {Mod, start_link, Args},
+ transient, 100, worker, [Mod]}),
+ ok.
+
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.