summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-22 16:42:49 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-22 16:42:49 +0000
commit5853e213c08caa57ecd1e54b43a0b60926fe87e6 (patch)
tree25f0fb033b162f49795744daf40a66c3f915c1fc
parent95f466aa10e9ae29bf0d8b6b3f81846bb55bfbd0 (diff)
parent4e6dfcb1256cdab73233df88dba1aad543e71095 (diff)
downloadrabbitmq-server-git-5853e213c08caa57ecd1e54b43a0b60926fe87e6.tar.gz
merging in (and adjustments for startup) from default
-rw-r--r--src/rabbit.erl303
-rw-r--r--src/rabbit_alarm.erl10
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_networking.erl23
-rw-r--r--src/rabbit_plugin_activator.erl25
-rw-r--r--src/rabbit_queue_index.erl13
-rw-r--r--src/rabbit_sup.erl12
-rw-r--r--src/rabbit_tests.erl4
9 files changed, 273 insertions, 127 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fe1be7c292..ac7ad04660 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -37,7 +37,100 @@
-export([start/2, stop/1]).
--export([log_location/1, start_child/2]).
+-export([log_location/1]).
+
+%%---------------------------------------------------------------------------
+%% Boot steps.
+-export([maybe_insert_default_data/0]).
+
+-rabbit_boot_step({codec_correctness_check,
+ [{description, "codec correctness check"},
+ {mfa, {rabbit_binary_generator,
+ check_empty_content_body_frame_size,
+ []}}]}).
+
+-rabbit_boot_step({database,
+ [{mfa, {rabbit_mnesia, init, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_log,
+ [{description, "logging server"},
+ {mfa, {rabbit_sup, start_child, [rabbit_log]}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_hooks,
+ [{description, "internal event notification system"},
+ {mfa, {rabbit_hooks, start, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({kernel_ready,
+ [{description, "kernel ready"}]}).
+
+-rabbit_boot_step({rabbit_alarm,
+ [{description, "alarm handler"},
+ {mfa, {rabbit_alarm, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_memory_monitor,
+ [{description, "memory moniter"},
+ {mfa, {rabbit_sup, start_child, [rabbit_memory_monitor]}},
+ {post, rabbit_alarm},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_child, [rabbit_guid]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_router,
+ [{description, "cluster router"},
+ {mfa, {rabbit_sup, start_child, [rabbit_router]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_node_monitor,
+ [{description, "node monitor"},
+ {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({core_initialized,
+ [{description, "core initialized"}]}).
+
+-rabbit_boot_step({empty_db_check,
+ [{description, "empty DB check"},
+ {mfa, {?MODULE, maybe_insert_default_data, []}},
+ {post, core_initialized}]}).
+
+-rabbit_boot_step({exchange_recovery,
+ [{description, "exchange recovery"},
+ {mfa, {rabbit_exchange, recover, []}},
+ {post, empty_db_check}]}).
+
+-rabbit_boot_step({message_store_queue_sup_queue_recovery,
+ [{description, "message store, queue supervisor and queue recovery"},
+ {mfa, {rabbit_queue_index, start_msg_store, []}},
+ {post, exchange_recovery}]}).
+
+-rabbit_boot_step({routing_ready,
+ [{description, "message delivery logic ready"}]}).
+
+-rabbit_boot_step({log_relay,
+ [{description, "error log relay"},
+ {mfa, {rabbit_error_logger, boot, []}},
+ {post, routing_ready}]}).
+
+-rabbit_boot_step({networking,
+ [{mfa, {rabbit_networking, boot, []}},
+ {post, log_relay},
+ {pre, networking_listening}]}).
+
+-rabbit_boot_step({networking_listening,
+ [{description, "network listeners available"}]}).
+
+%%---------------------------------------------------------------------------
-import(application).
-import(mnesia).
@@ -67,7 +160,6 @@
{nodes, [erlang_node()]} |
{running_nodes, [erlang_node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
--spec(start_child/2 :: (atom(), [any()]) -> 'ok').
-endif.
@@ -113,102 +205,15 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
-
{ok, SupPid} = rabbit_sup:start_link(),
print_banner(),
-
- lists:foreach(
- fun ({Msg, Thunk}) ->
- io:format("starting ~-20s ...", [Msg]),
- Thunk(),
- io:format("done~n");
- ({Msg, M, F, A}) ->
- io:format("starting ~-20s ...", [Msg]),
- apply(M, F, A),
- io:format("done~n")
- end,
- [{"database",
- fun () -> ok = rabbit_mnesia:init() end},
- {"core processes",
- fun () ->
- ok = start_child(rabbit_log),
- ok = rabbit_hooks:start(),
-
- ok = rabbit_binary_generator:
- check_empty_content_body_frame_size(),
-
- ok = rabbit_alarm:start(),
- ok = start_child(file_handle_cache),
-
- {ok, MemoryWatermark} =
- application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true ->
- ok;
- false ->
- start_child(vm_memory_monitor, [MemoryWatermark])
- end,
-
- ok = start_child(rabbit_memory_monitor),
- ok = start_child(rabbit_guid),
-
- ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor)
- end},
- {"recovery",
- fun () ->
- ok = maybe_insert_default_data(),
- ok = rabbit_exchange:recover(),
- DurableQueues = rabbit_amqqueue:find_durable_queues(),
- ok = rabbit_queue_index:start_msg_store(DurableQueues),
-
- ok = rabbit_amqqueue:start(),
-
- {ok, _RealDurableQueues} = rabbit_amqqueue:recover(DurableQueues)
- %% TODO - RealDurableQueues is a subset of
- %% DurableQueues. It may have queues removed which
- %% have since been recreated on another node in our
- %% cluster. We need to remove DurableQueues --
- %% RealDurableQueues somehow. See also bug 20916
- end},
- {"builtin applications",
- fun () ->
- {ok, DefaultVHost} = application:get_env(default_vhost),
- ok = error_logger:add_report_handler(
- rabbit_error_logger, [DefaultVHost]),
- ok = start_builtin_amq_applications()
- end},
- {"TCP listeners",
- fun () ->
- ok = rabbit_networking:start(),
- {ok, TcpListeners} = application:get_env(tcp_listeners),
- lists:foreach(
- fun ({Host, Port}) ->
- ok = rabbit_networking:start_tcp_listener(Host, Port)
- end,
- TcpListeners)
- end},
- {"SSL listeners",
- fun () ->
- case application:get_env(ssl_listeners) of
- {ok, []} ->
- ok;
- {ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
-
- {ok, SslOpts} = application:get_env(ssl_options),
-
- [rabbit_networking:start_ssl_listener
- (Host, Port, SslOpts) || {Host, Port} <- SslListeners],
- ok
- end
- end}]),
-
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
{ok, SupPid}.
+
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
@@ -218,7 +223,105 @@ stop(_State) ->
end,
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
+
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
+
+run_boot_step({StepName, Attributes}) ->
+ Description = case lists:keysearch(description, 1, Attributes) of
+ {value, {_, D}} -> D;
+ false -> StepName
+ end,
+ case [MFA || {mfa, MFA} <- Attributes] of
+ [] ->
+ io:format("progress -- ~s~n", [Description]);
+ MFAs ->
+ io:format("starting ~-60s ...", [Description]),
+ [case catch apply(M,F,A) of
+ {'EXIT', Reason} ->
+ boot_error("FAILED~nReason: ~p~n", [Reason]);
+ ok ->
+ ok
+ end || {M,F,A} <- MFAs],
+ io:format("done~n"),
+ ok
+ end.
+
+boot_steps() ->
+ AllApps = [App || {App, _, _} <- application:loaded_applications()],
+ Modules = lists:usort(
+ lists:append([Modules
+ || {ok, Modules} <-
+ [application:get_key(App, modules)
+ || App <- AllApps]])),
+ UnsortedSteps =
+ lists:flatmap(fun (Module) ->
+ [{StepName, Attributes}
+ || {rabbit_boot_step, [{StepName, Attributes}]}
+ <- Module:module_info(attributes)]
+ end, Modules),
+ sort_boot_steps(UnsortedSteps).
+
+sort_boot_steps(UnsortedSteps) ->
+ G = digraph:new([acyclic]),
+
+ %% Add vertices, with duplicate checking.
+ [case digraph:vertex(G, StepName) of
+ false -> digraph:add_vertex(G, StepName, Step);
+ _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
+ end || Step = {StepName, _Attrs} <- UnsortedSteps],
+
+ %% Add edges, detecting cycles and missing vertices.
+ lists:foreach(fun ({StepName, Attributes}) ->
+ [add_boot_step_dep(G, StepName, PrecedingStepName)
+ || {post, PrecedingStepName} <- Attributes],
+ [add_boot_step_dep(G, SucceedingStepName, StepName)
+ || {pre, SucceedingStepName} <- Attributes]
+ end, UnsortedSteps),
+
+ %% Use topological sort to find a consistent ordering (if there is
+ %% one, otherwise fail).
+ SortedStepsRev = [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)],
+ SortedSteps = lists:reverse(SortedStepsRev),
+
+ digraph:delete(G),
+
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}}
+ || {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end.
+
+add_boot_step_dep(G, RunsSecond, RunsFirst) ->
+ case digraph:add_edge(G, RunsSecond, RunsFirst) of
+ {error, Reason} ->
+ boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
+ [RunsSecond, RunsFirst,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next])
+ || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]);
+ _ ->
+ ok
+ end.
+
+%%---------------------------------------------------------------------------
log_location(Type) ->
case application:get_env(Type, case Type of
@@ -277,16 +380,6 @@ print_banner() ->
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
-start_child(Mod) ->
- start_child(Mod, []).
-
-start_child(Mod, Args) ->
- {ok,_} = supervisor:start_child(rabbit_sup,
- {Mod, {Mod, start_link, Args},
- %% 4294967295 is 2^32 - 1, which is the highest value allowed
- transient, 4294967295, worker, [Mod]}),
- ok.
-
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
ok = ensure_working_log_handler(error_logger_file_h,
@@ -344,12 +437,6 @@ insert_default_data() ->
DefaultReadPerm),
ok.
-start_builtin_amq_applications() ->
- %%TODO: we may want to create a separate supervisor for these so
- %%they don't bring down the entire app when they die and fail to
- %%restart
- ok.
-
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 9a639ed40f..534409aaea 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -54,7 +54,15 @@
%%----------------------------------------------------------------------------
start() ->
- ok = alarm_handler:add_alarm_handler(?MODULE, []).
+ ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+ ok = case MemoryWatermark == 0 of
+ true ->
+ ok;
+ false ->
+ rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark])
+ end,
+ ok.
stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 77dbf03d87..36123fbd19 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -221,8 +221,8 @@ start_queue_process(Q = #amqqueue{name = QueueName}) ->
case supervisor:start_child(
rabbit_amqqueue_sup,
{QueueName, {rabbit_amqqueue_process, start_link, [Q]},
- %% 4294967295 is 2^32 - 1, which is the highest value allowed
- temporary, 4294967295, worker, [rabbit_amqqueue_process]}) of
+ %% 16#ffffffff is the biggest value allowed
+ temporary, 16#ffffffff, worker, [rabbit_amqqueue_process]}) of
{ok, Pid} ->
Q#amqqueue{pid = Pid};
{error, already_present} ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index b28574b707..b9bd71b78d 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -37,8 +37,14 @@
-behaviour(gen_event).
+-export([boot/0]).
+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+boot() ->
+ {ok, DefaultVHost} = application:get_env(default_vhost),
+ ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
+
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 3a0f9240dd..84658a85c6 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -31,7 +31,7 @@
-module(rabbit_networking).
--export([start/0, start_tcp_listener/2, start_ssl_listener/3,
+-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
stop_tcp_listener/2, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info/1,
connection_info/2, connection_info_all/0,
@@ -82,6 +82,27 @@
%%----------------------------------------------------------------------------
+boot() ->
+ ok = start(),
+ ok = boot_tcp(),
+ ok = boot_ssl().
+
+boot_tcp() ->
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
+ [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners],
+ ok.
+
+boot_ssl() ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ ok = rabbit_misc:start_applications([crypto, ssl]),
+ {ok, SslOpts} = application:get_env(ssl_options),
+ [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ ok
+ end.
+
start() ->
{ok,_} = supervisor:start_child(
rabbit_sup,
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 9f7879209b..4fcfab7895 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -96,12 +96,20 @@ start() ->
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
- %% hiding real issues.
+ %% hiding real issues. On Ubuntu, we also get warnings
+ %% about kernel/stdlib sources being out of date, which we
+ %% also ignore for the same reason.
WarningStr = Module:format_warning(
[W || W <- Warnings,
case W of
{warning, {source_not_found, _}} -> false;
- _ -> true
+ {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
+ when WApp == mnesia;
+ WApp == stdlib;
+ WApp == kernel;
+ WApp == sasl;
+ WApp == os_mon -> false;
+ _ -> true
end]),
case length(WarningStr) of
0 -> ok;
@@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) ->
post_process_script(ScriptFile) ->
case file:consult(ScriptFile) of
{ok, [{script, Name, Entries}]} ->
- NewEntries = process_entries(Entries),
+ NewEntries = lists:flatmap(fun process_entry/1, Entries),
case file:open(ScriptFile, [write]) of
{ok, Fd} ->
io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
@@ -236,13 +244,10 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entries([]) ->
- [];
-process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
- Rest]) ->
- [Entry, {apply,{rabbit,prepare,[]}} | Rest];
-process_entries([Entry|Rest]) ->
- [Entry | process_entries(Rest)].
+process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
+ [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry) ->
+ [Entry].
error(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 2007b00eab..46a6e008ec 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -34,7 +34,8 @@
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
- find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
+ find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/0,
+ start_msg_store/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -137,6 +138,7 @@
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
+-spec(start_msg_store/0 :: () -> 'ok').
-spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok').
-endif.
@@ -309,6 +311,13 @@ find_lowest_seq_id_seg_and_next_seq_id(State) ->
end,
{LowSeqIdSeg, NextSeqId, State}.
+start_msg_store() ->
+ DurableQueues = rabbit_amqqueue:find_durable_queues(),
+ ok = start_msg_store(DurableQueues),
+ ok = rabbit_amqqueue:start(),
+ {ok, _RealDurableQueues} = rabbit_amqqueue:recover(DurableQueues),
+ ok.
+
start_msg_store(DurableQueues) ->
DurableDict =
dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name),
@@ -335,7 +344,7 @@ start_msg_store(DurableQueues) ->
end
end, {[], []}, Directories),
MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"),
- ok = rabbit:start_child(rabbit_msg_store, [MsgStoreDir,
+ ok = rabbit_sup:start_child(rabbit_msg_store, [MsgStoreDir,
fun queue_index_walker/1,
DurableQueueNames]),
lists:foreach(fun (DirName) ->
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 730d7909bc..2132e74303 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_child/1, start_child/2]).
-export([init/1]).
@@ -42,5 +42,15 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_child(Mod) ->
+ start_child(Mod, []).
+
+start_child(Mod, Args) ->
+ {ok, _} = supervisor:start_child(
+ ?SERVER, {Mod, {Mod, start_link, Args},
+ %% 16#ffffffff is the highest value allowed
+ transient, 16#ffffffff, worker, [Mod]}),
+ ok.
+
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f5d7978cae..62a4792cd4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -881,8 +881,8 @@ start_msg_store_empty() ->
start_msg_store(fun (ok) -> finished end, ok).
start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) ->
- rabbit:start_child(rabbit_msg_store, [msg_store_dir(), MsgRefDeltaGen,
- MsgRefDeltaGenInit]).
+ rabbit_sup:start_child(rabbit_msg_store, [msg_store_dir(), MsgRefDeltaGen,
+ MsgRefDeltaGenInit]).
stop_msg_store() ->
case supervisor:terminate_child(rabbit_sup, rabbit_msg_store) of