summaryrefslogtreecommitdiff
path: root/src/rabbit.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-05 10:03:12 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-05 10:03:12 +0100
commitf72086292bc13dcd5cbb9aa2e5aa9eacc6602d42 (patch)
tree2e15724d608a1d9733ba8d9c3dc2fed3676944b3 /src/rabbit.erl
parent625f9b72b3f8469f5e9935f3347b13e3ae1ae419 (diff)
parent36d6750b0f984a105c448609cb29f9037193d8b0 (diff)
downloadrabbitmq-server-git-f72086292bc13dcd5cbb9aa2e5aa9eacc6602d42.tar.gz
merge default into bug23056
Diffstat (limited to 'src/rabbit.erl')
-rw-r--r--src/rabbit.erl328
1 files changed, 202 insertions, 126 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 390b40d985..b1a8dc46a8 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,37 +18,43 @@
-behaviour(application).
--export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0,
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0,
rotate_logs/1]).
-export([start/2, stop/1]).
--export([log_location/1]).
+-export([log_location/1]). %% for testing
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0, boot_delegate/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
+
+-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
{mfa, {rabbit_binary_generator,
check_empty_content_body_frame_size,
[]}},
+ {requires, pre_boot},
{enables, external_infrastructure}]}).
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
+ {requires, file_handle_cache},
{enables, external_infrastructure}]}).
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
{mfa, {rabbit_sup, start_restartable_child,
[file_handle_cache]}},
+ {requires, pre_boot},
{enables, worker_pool}]}).
-rabbit_boot_step({worker_pool,
[{description, "worker pool"},
{mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {requires, pre_boot},
{enables, external_infrastructure}]}).
-rabbit_boot_step({external_infrastructure,
@@ -122,16 +128,22 @@
{requires, core_initialized},
{enables, routing_ready}]}).
--rabbit_boot_step({exchange_recovery,
- [{description, "exchange recovery"},
- {mfa, {rabbit_exchange, recover, []}},
+-rabbit_boot_step({recovery,
+ [{description, "exchange, queue and binding recovery"},
+ {mfa, {rabbit, recover, []}},
{requires, empty_db_check},
{enables, routing_ready}]}).
--rabbit_boot_step({queue_sup_queue_recovery,
- [{description, "queue supervisor and queue recovery"},
- {mfa, {rabbit_amqqueue, start, []}},
- {requires, empty_db_check},
+-rabbit_boot_step({mirror_queue_slave_sup,
+ [{description, "mirror queue slave sup"},
+ {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {requires, recovery},
+ {enables, routing_ready}]}).
+
+-rabbit_boot_step({mirrored_queues,
+ [{description, "adding mirrors to queues"},
+ {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
+ {requires, mirror_queue_slave_sup},
{enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
@@ -178,13 +190,17 @@
-spec(stop_and_halt/0 :: () -> 'ok').
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
-spec(status/0 ::
- () -> [{running_applications, [{atom(), string(), string()}]} |
- {nodes, [{rabbit_mnesia:node_type(), [node()]}]} |
- {running_nodes, [node()]}]).
+ () -> [{pid, integer()} |
+ {running_applications, [{atom(), string(), string()}]} |
+ {os, {atom(), atom()}} |
+ {erlang_version, string()} |
+ {memory, any()}]).
+-spec(environment/0 :: () -> [{atom() | term()}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
+-spec(recover/0 :: () -> 'ok').
-spec(start/2 :: ('normal',[]) ->
{'error',
@@ -199,19 +215,20 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers().
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia().
start() ->
try
ok = prepare(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = rabbit_misc:start_applications(application_load_order())
after
%%give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
- ok = rabbit_misc:stop_applications(?APPS).
+ ok = rabbit_misc:stop_applications(application_load_order()).
stop_and_halt() ->
try
@@ -223,8 +240,15 @@ stop_and_halt() ->
status() ->
[{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications()}] ++
- rabbit_mnesia:status().
+ {running_applications, application:which_applications()},
+ {os, os:type()},
+ {erlang_version, erlang:system_info(system_version)},
+ {memory, erlang:memory()}].
+
+environment() ->
+ lists:keysort(
+ 1, [P || P = {K, _} <- application:get_all_env(rabbit),
+ K =/= default_pass]).
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
@@ -240,6 +264,7 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
+ ok = rabbit_mnesia:delete_previously_running_nodes(),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
@@ -252,6 +277,7 @@ start(normal, []) ->
end.
stop(_State) ->
+ ok = rabbit_mnesia:record_running_nodes(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
@@ -261,20 +287,51 @@ stop(_State) ->
ok.
%%---------------------------------------------------------------------------
+%% application life cycle
+
+application_load_order() ->
+ ok = load_applications(),
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ [{App, app_dependencies(App)} ||
+ {App, _Desc, _Vsn} <- application:loaded_applications()]),
+ true = digraph:del_vertices(
+ G, digraph:vertices(G) -- digraph_utils:reachable(?APPS, G)),
+ Result = digraph_utils:topsort(G),
+ true = digraph:delete(G),
+ Result.
+
+load_applications() ->
+ load_applications(queue:from_list(?APPS), sets:new()).
+
+load_applications(Worklist, Loaded) ->
+ case queue:out(Worklist) of
+ {empty, _WorkList} ->
+ ok;
+ {{value, App}, Worklist1} ->
+ case sets:is_element(App, Loaded) of
+ true -> load_applications(Worklist1, Loaded);
+ false -> case application:load(App) of
+ ok -> ok;
+ {error, {already_loaded, App}} -> ok;
+ Error -> throw(Error)
+ end,
+ load_applications(
+ queue:join(Worklist1,
+ queue:from_list(app_dependencies(App))),
+ sets:add_element(App, Loaded))
+ end
+ end.
-erts_version_check() ->
- FoundVer = erlang:system_info(version),
- case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
- true -> ok;
- false -> {error, {erlang_version_too_old,
- {found, FoundVer}, {required, ?ERTS_MINIMUM}}}
+app_dependencies(App) ->
+ case application:get_key(App, applications) of
+ undefined -> [];
+ {ok, Lst} -> Lst
end.
-boot_error(Format, Args) ->
- io:format("BOOT ERROR: " ++ Format, Args),
- error_logger:error_msg(Format, Args),
- timer:sleep(1000),
- exit({?MODULE, failure_during_boot}).
+%%---------------------------------------------------------------------------
+%% boot step logic
run_boot_step({StepName, Attributes}) ->
Description = case lists:keysearch(description, 1, Attributes) of
@@ -349,83 +406,46 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
-%%---------------------------------------------------------------------------
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
-log_location(Type) ->
- case application:get_env(Type, case Type of
- kernel -> error_logger;
- sasl -> sasl_error_logger
- end) of
- {ok, {file, File}} -> File;
- {ok, false} -> undefined;
- {ok, tty} -> tty;
- {ok, silent} -> undefined;
- {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}});
- _ -> undefined
- end.
+%%---------------------------------------------------------------------------
+%% boot step functions
-app_location() ->
- {ok, Application} = application:get_application(),
- filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+boot_delegate() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ rabbit_sup:start_child(delegate_sup, [Count]).
-home_dir() ->
- case init:get_argument(home) of
- {ok, [[Home]]} -> Home;
- Other -> Other
- end.
+recover() ->
+ rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
-config_files() ->
- case init:get_argument(config) of
- {ok, Files} -> [filename:absname(
- filename:rootname(File, ".config") ++ ".config") ||
- File <- Files];
- error -> []
+maybe_insert_default_data() ->
+ case rabbit_mnesia:is_db_empty() of
+ true -> insert_default_data();
+ false -> ok
end.
-%---------------------------------------------------------------------------
+insert_default_data() ->
+ {ok, DefaultUser} = application:get_env(default_user),
+ {ok, DefaultPass} = application:get_env(default_pass),
+ {ok, DefaultTags} = application:get_env(default_user_tags),
+ {ok, DefaultVHost} = application:get_env(default_vhost),
+ {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
+ application:get_env(default_permissions),
+ ok = rabbit_vhost:add(DefaultVHost),
+ ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
+ ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags),
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
+ DefaultConfigurePerm,
+ DefaultWritePerm,
+ DefaultReadPerm),
+ ok.
-print_banner() ->
- {ok, Product} = application:get_key(id),
- {ok, Version} = application:get_key(vsn),
- ProductLen = string:len(Product),
- io:format("~n"
- "+---+ +---+~n"
- "| | | |~n"
- "| | | |~n"
- "| | | |~n"
- "| +---+ +-------+~n"
- "| |~n"
- "| ~s +---+ |~n"
- "| | | |~n"
- "| ~s +---+ |~n"
- "| |~n"
- "+-------------------+~n"
- "~s~n~s~n~s~n~n",
- [Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION,
- ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- Settings = [{"node", node()},
- {"app descriptor", app_location()},
- {"home dir", home_dir()},
- {"config file(s)", config_files()},
- {"cookie hash", rabbit_misc:cookie_hash()},
- {"log", log_location(kernel)},
- {"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()},
- {"erlang version", erlang:system_info(version)}],
- DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
- Format = fun (K, V) ->
- io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
- [K, V])
- end,
- lists:foreach(fun ({"config file(s)" = K, []}) ->
- Format(K, "(none)");
- ({"config file(s)" = K, [V0 | Vs]}) ->
- Format(K, V0), [Format("", V) || V <- Vs];
- ({K, V}) ->
- Format(K, V)
- end, Settings),
- io:nl().
+%%---------------------------------------------------------------------------
+%% logging
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
@@ -464,35 +484,19 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end
end.
-boot_delegate() ->
- {ok, Count} = application:get_env(rabbit, delegate_count),
- rabbit_sup:start_child(delegate_sup, [Count]).
-
-maybe_insert_default_data() ->
- case rabbit_mnesia:is_db_empty() of
- true -> insert_default_data();
- false -> ok
+log_location(Type) ->
+ case application:get_env(Type, case Type of
+ kernel -> error_logger;
+ sasl -> sasl_error_logger
+ end) of
+ {ok, {file, File}} -> File;
+ {ok, false} -> undefined;
+ {ok, tty} -> tty;
+ {ok, silent} -> undefined;
+ {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}});
+ _ -> undefined
end.
-insert_default_data() ->
- {ok, DefaultUser} = application:get_env(default_user),
- {ok, DefaultPass} = application:get_env(default_pass),
- {ok, DefaultAdmin} = application:get_env(default_user_is_admin),
- {ok, DefaultVHost} = application:get_env(default_vhost),
- {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
- application:get_env(default_permissions),
- ok = rabbit_vhost:add(DefaultVHost),
- ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
- case DefaultAdmin of
- true -> rabbit_auth_backend_internal:set_admin(DefaultUser);
- _ -> ok
- end,
- ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
- DefaultConfigurePerm,
- DefaultWritePerm,
- DefaultReadPerm),
- ok.
-
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
@@ -515,3 +519,75 @@ log_rotation_result(ok, {error, SaslLogError}) ->
{error, {cannot_rotate_sasl_logs, SaslLogError}};
log_rotation_result(ok, ok) ->
ok.
+
+%%---------------------------------------------------------------------------
+%% misc
+
+erts_version_check() ->
+ FoundVer = erlang:system_info(version),
+ case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
+ true -> ok;
+ false -> {error, {erlang_version_too_old,
+ {found, FoundVer}, {required, ?ERTS_MINIMUM}}}
+ end.
+
+print_banner() ->
+ {ok, Product} = application:get_key(id),
+ {ok, Version} = application:get_key(vsn),
+ ProductLen = string:len(Product),
+ io:format("~n"
+ "+---+ +---+~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| +---+ +-------+~n"
+ "| |~n"
+ "| ~s +---+ |~n"
+ "| | | |~n"
+ "| ~s +---+ |~n"
+ "| |~n"
+ "+-------------------+~n"
+ "~s~n~s~n~s~n~n",
+ [Product, string:right([$v|Version], ProductLen),
+ ?PROTOCOL_VERSION,
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ Settings = [{"node", node()},
+ {"app descriptor", app_location()},
+ {"home dir", home_dir()},
+ {"config file(s)", config_files()},
+ {"cookie hash", rabbit_misc:cookie_hash()},
+ {"log", log_location(kernel)},
+ {"sasl log", log_location(sasl)},
+ {"database dir", rabbit_mnesia:dir()},
+ {"erlang version", erlang:system_info(version)}],
+ DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
+ Format = fun (K, V) ->
+ io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
+ [K, V])
+ end,
+ lists:foreach(fun ({"config file(s)" = K, []}) ->
+ Format(K, "(none)");
+ ({"config file(s)" = K, [V0 | Vs]}) ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ ({K, V}) ->
+ Format(K, V)
+ end, Settings),
+ io:nl().
+
+app_location() ->
+ {ok, Application} = application:get_application(),
+ filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+
+home_dir() ->
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> Home;
+ Other -> Other
+ end.
+
+config_files() ->
+ case init:get_argument(config) of
+ {ok, Files} -> [filename:absname(
+ filename:rootname(File, ".config") ++ ".config") ||
+ File <- Files];
+ error -> []
+ end.