diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-27 17:07:21 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-27 17:07:21 +0100 |
| commit | 6cacd4b0696e5eb6341babcefad6c31800d3e396 (patch) | |
| tree | eada785bb313b29937e59d95a7487c6f6a2b753f /src | |
| parent | 22d0abdbcf0d5e5813d68f0094bef47d00413349 (diff) | |
| download | rabbitmq-server-git-6cacd4b0696e5eb6341babcefad6c31800d3e396.tar.gz | |
moved the status file handling to `rabbit_node_monitor', status from mnesia when online
I'm much happier with this configuration, with the exception of one
problem: `rabbit_mnesia:cluster_status' will happily return the data from
mnesia whenever mnesia is online, but that gives wrong results when the
node is a RAM node and it just got online. I have some ideas to improve this.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 281 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 309 |
3 files changed, 320 insertions, 275 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 0d2e27b931..7bdde56954 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -176,7 +176,7 @@ -rabbit_boot_step({notify_cluster, [{description, "notify cluster nodes"}, - {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {mfa, {rabbit_node_monitor, notify_node_up, []}}, {requires, networking}]}). %%--------------------------------------------------------------------------- @@ -332,7 +332,8 @@ start_it(StartFun) -> stop() -> rabbit_log:info("Stopping Rabbit~n"), - ok = app_utils:stop_applications(app_shutdown_order()). + ok = app_utils:stop_applications(app_shutdown_order()), + rabbit_node_monitor:this_node_down(). stop_and_halt() -> try diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 5c315a07c6..727054ad16 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -30,27 +30,25 @@ is_db_empty/0, is_clustered/0, all_clustered_nodes/0, - all_clustered_disc_nodes/0, + clustered_disc_nodes/0, running_clustered_nodes/0, is_disc_node/0, dir/0, table_names/0, wait_for_tables/1, + cluster_status_from_mnesia/0, init_db/3, empty_ram_only_tables/0, copy_db/1, wait_for_tables/0, - on_node_up/2, - on_node_down/1, - on_node_join/2, - on_node_leave/1 + on_node_up/1, + on_node_down/1 ]). %% Used internally in rpc calls --export([cluster_status_if_running/0, - node_info/0, +-export([node_info/0, remove_node_if_mnesia_running/1 ]). @@ -64,11 +62,11 @@ -ifdef(use_specs). --export_type([node_type/0]). +-export_type([node_type/0, cluster_status/0]). -type(node_type() :: disc | ram). --type(node_status() :: {ordsets:ordset(node()), ordsets:ordset(node()), - ordsets:ordset(node())}). +-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()), + ordsets:ordset(node())}). %% Main interface -spec(prepare/0 :: () -> 'ok'). @@ -86,11 +84,13 @@ -spec(is_db_empty/0 :: () -> boolean()). -spec(is_clustered/0 :: () -> boolean()). -spec(all_clustered_nodes/0 :: () -> [node()]). --spec(all_clustered_disc_nodes/0 :: () -> [node()]). +-spec(clustered_disc_nodes/0 :: () -> [node()]). -spec(running_clustered_nodes/0 :: () -> [node()]). -spec(is_disc_node/0 :: () -> boolean()). -spec(dir/0 :: () -> file:filename()). -spec(table_names/0 :: () -> [atom()]). +-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} | + {'error', 'mnesia_not_running'}). %% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' -spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). @@ -98,17 +98,15 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). +-spec(check_cluster_consistency/0 :: () -> 'ok'). %% Hooks used in `rabbit_node_monitor' --spec(on_node_up/2 :: (node(), boolean()) -> 'ok'). +-spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(on_node_leave/1 :: (node()) -> 'ok'). %% Functions used in internal rpc calls --spec(cluster_status_if_running/0 :: () -> {'ok', node_status()} | - {'error', 'node_not_running'}). -spec(node_info/0 :: () -> {string(), string(), - ({'ok', node_status()} | 'error')}). + ({'ok', cluster_status()} | 'error')}). -spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' | {'error', term()}). @@ -118,49 +116,19 @@ %% Main interface %%---------------------------------------------------------------------------- -%% Sets up the cluster status file when needed, taking care of the legacy -%% files prepare() -> ensure_mnesia_dir(), - NotPresent = - fun (AllNodes0, WantDiscNode) -> - ThisNode = [node()], - - RunningNodes0 = legacy_read_previously_running_nodes(), - legacy_delete_previously_running_nodes(), - - RunningNodes = lists:usort(RunningNodes0 ++ ThisNode), - AllNodes = - lists:usort(AllNodes0 ++ RunningNodes), - DiscNodes = case WantDiscNode of - true -> ThisNode; - false -> [] - end, - - ok = write_cluster_status_file({AllNodes, DiscNodes, RunningNodes}) - end, - case try_read_cluster_status_file() of - {ok, _} -> - %% We check the consistency only when the cluster status exists, - %% since when it doesn't exist it means that we just started a fresh - %% node, and when we have a legacy node with an old - %% "cluster_nodes.config" we can't check the consistency anyway - check_cluster_consistency(), - ok; - {error, {invalid_term, _, [AllNodes]}} -> - %% Legacy file - NotPresent(AllNodes, should_be_disc_node(AllNodes)); - {error, {cannot_read_file, _, enoent}} -> - {ok, {AllNodes, WantDiscNode}} = - application:get_env(rabbit, cluster_nodes), - NotPresent(AllNodes, WantDiscNode) - end. + rabbit_node_monitor:prepare_cluster_status_file(), + check_cluster_consistency(). init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - {AllNodes, DiscNodes, _} = read_cluster_status_file(), - DiscNode = should_be_disc_node(DiscNodes), + %% Here we want the cluster status *file*, not the status from mnesia, + %% because in the case of RAM nodes the status from mnesia doesn't matter. + Status = {AllNodes, _, _} = + rabbit_node_monitor:read_cluster_status_file(), + DiscNode = is_disc_node(Status), init_db_and_upgrade(AllNodes, DiscNode, DiscNode), %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - let's @@ -218,7 +186,7 @@ join_cluster(DiscoveryNode, WantDiscNode) -> %% Join the cluster ok = init_db_with_mnesia(DiscNodes, WantDiscNode, false), - rabbit_node_monitor:notify_join_cluster(), + rabbit_node_monitor:notify_joined_cluster(), ok. @@ -246,22 +214,17 @@ reset(Force) -> Node = node(), case Force of true -> - ok; + all_clustered_nodes(); false -> + AllNodes0 = all_clustered_nodes(), %% Reconnecting so that we will get an up to date nodes - try - %% Force=true here so that reset still works when - %% clustered with a node which is down - start_mnesia(), - {AllNodes, DiscNodes, _} = read_cluster_status_file(), - init_db_and_upgrade( - AllNodes, should_be_disc_node(DiscNodes), true) - after - stop_mnesia() - end, + %% Force=true here so that reset still works when + %% clustered with a node which is down + init_db_with_mnesia(AllNodes0, is_disc_node(), true), leave_cluster(), rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), - cannot_delete_schema) + cannot_delete_schema), + all_clustered_nodes() end, %% We need to make sure that we don't end up in a distributed Erlang system %% with nodes while not being in an Mnesia cluster with them. We don't @@ -269,7 +232,7 @@ reset(Force) -> [erlang:disconnect_node(N) || N <- all_clustered_nodes()], %% remove persisted messages and any other garbage we find ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), - ok = write_cluster_status_file(initial_cluster_status()), + ok = rabbit_node_monitor:reset_cluster_status_file(), ok. change_node_type(Type) -> @@ -376,7 +339,7 @@ status() -> IfNonEmpty = fun (_, []) -> []; (Type, Nodes) -> [{Type, Nodes}] end, - [{nodes, (IfNonEmpty(disc, all_clustered_disc_nodes()) ++ + [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++ IfNonEmpty(ram, all_clustered_ram_nodes()))}, {running_nodes, running_clustered_nodes()}]. @@ -386,7 +349,7 @@ is_db_empty() -> is_clustered() -> Nodes = all_clustered_nodes(), - [node()] /= Nodes andalso [] /= Nodes. + [node()] =/= Nodes andalso [] =/= Nodes. is_disc_and_clustered() -> is_disc_node() andalso is_clustered(). @@ -398,7 +361,7 @@ all_clustered_nodes() -> {AllNodes, _, _} = cluster_status(), AllNodes. -all_clustered_disc_nodes() -> +clustered_disc_nodes() -> {_, DiscNodes, _} = cluster_status(), DiscNodes. @@ -416,25 +379,38 @@ running_clustered_disc_nodes() -> %% This function is the actual source of information, since it gets the data %% from mnesia. Obviously it'll work only when mnesia is running. -cluster_status_if_running() -> +cluster_status_from_mnesia() -> + Check = fun (Nodes) -> + case mnesia:system_info(use_dir) of + true -> ordsets:add_element(node(), Nodes); + false -> Nodes + end + end, case mnesia:system_info(is_running) of - no -> {error, node_not_running}; + no -> {error, mnesia_not_running}; yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)), - ordsets:from_list(mnesia:table_info(schema, disc_copies)), + Check(ordsets:from_list( + mnesia:table_info(schema, disc_copies))), ordsets:from_list(mnesia:system_info(running_db_nodes))}} end. cluster_status() -> - case cluster_status_if_running() of - {ok, Status} -> Status; - {error, node_not_running} -> read_cluster_status_file() + case cluster_status_from_mnesia() of + {ok, Status} -> + Status; + {error, mnesia_not_running} -> + rabbit_node_monitor:read_cluster_status_file() end. node_info() -> {erlang:system_info(otp_release), rabbit_misc:rabbit_version(), - cluster_status_if_running()}. + cluster_status_from_mnesia()}. + +is_disc_node() -> + is_disc_node(cluster_status()). -is_disc_node() -> mnesia:system_info(use_dir). +is_disc_node({_, DiscNodes, _}) -> + DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes). dir() -> mnesia:system_info(directory). @@ -453,7 +429,10 @@ init_db(ClusterNodes, WantDiscNode, Force) -> {error, Reason} -> throw({error, Reason}); {ok, Nodes} -> - WasDiscNode = is_disc_node(), + %% Note that we use `system_info' here and not the cluster status + %% since when we start rabbit for the first time the cluster status + %% will say we are a disc node but the tables won't be present yet. + WasDiscNode = mnesia:system_info(use_dir), case {Nodes, WasDiscNode, WantDiscNode} of {[], _, false} -> %% Standalone ram node, we don't want that @@ -482,7 +461,7 @@ init_db(ClusterNodes, WantDiscNode, Force) -> end end, ensure_schema_integrity(), - update_cluster_status_file(), + rabbit_node_monitor:update_cluster_status_file(), ok end. @@ -510,9 +489,7 @@ init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) -> after stop_mnesia() end, - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file( - {AllNodes, DiscNodes, ordsets:del_element(node(), RunningNodes)}). + rabbit_node_monitor:this_node_down(). ensure_mnesia_dir() -> MnesiaDir = dir() ++ "/", @@ -610,9 +587,6 @@ wait_for_tables(TableNames) -> throw({error, {failed_waiting_for_tables, Reason}}) end. -should_be_disc_node(DiscNodes) -> - DiscNodes == [] orelse lists:member(node(), DiscNodes). - %% This does not guarantee us much, but it avoids some situations that will %% definitely end up badly check_cluster_consistency() -> @@ -652,7 +626,7 @@ check_cluster_consistency() -> case rpc:call(Node, rabbit_mnesia, node_info, []) of {badrpc, _Reason} -> ok; - {OTP, Rabbit, {error, node_not_running}} -> + {OTP, Rabbit, {error, mnesia_not_running}} -> CheckOTP(OTP), CheckRabbit(Rabbit); {OTP, Rabbit, {ok, {AllNodes, _, _}}} -> @@ -662,104 +636,21 @@ check_cluster_consistency() -> end end, all_clustered_nodes()). -%%---------------------------------------------------------------------------- -%% Cluster status file functions -%%---------------------------------------------------------------------------- - -%% The cluster node status file contains all we need to know about the cluster: -%% -%% * All the clustered nodes -%% * The disc nodes -%% * The running nodes. -%% -%% If the current node is a disc node it will be included in the disc nodes -%% list. -%% -%% We strive to keep the file up to date and we rely on this assumption in -%% various situations. Obviously when mnesia is offline the information we have -%% will be outdated, but it can't be otherwise. - -cluster_status_file_filename() -> - dir() ++ "/cluster_nodes.config". - -initial_cluster_status() -> - {[node()], [node()], [node()]}. - -write_cluster_status_file(Status) -> - FileName = cluster_status_file_filename(), - case rabbit_file:write_term_file(FileName, [Status]) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_write_cluster_status_file, - FileName, Reason}}) - end. - -try_read_cluster_status_file() -> - FileName = cluster_status_file_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [{_, _, _} = Status]} -> - {ok, Status}; - {ok, Term} -> - {error, {invalid_term, FileName, Term}}; - {error, Reason} -> - {error, {cannot_read_file, FileName, Reason}} - end. - -read_cluster_status_file() -> - case try_read_cluster_status_file() of - {ok, Status} -> - Status; - {error, Reason} -> - throw({error, {cannot_read_cluster_status_file, Reason}}) - end. - -%% To update the cluster status when mnesia is running. -update_cluster_status_file() -> - {ok, Status} = cluster_status_if_running(), - write_cluster_status_file(Status). - %%-------------------------------------------------------------------- %% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- -on_node_up(Node, IsDiscNode) -> - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file({ordsets:add_element(Node, AllNodes), - case IsDiscNode of - true -> ordsets:add_element(Node, - DiscNodes); - false -> DiscNodes - end, - ordsets:add_element(Node, RunningNodes)}), - case is_only_running_disc_node(Node) of +on_node_up(Node) -> + case running_clustered_disc_nodes() =:= [Node] of true -> rabbit_log:info("cluster contains disc nodes again~n"); false -> ok end. -on_node_down(Node) -> - case is_only_running_disc_node(Node) of +on_node_down(_Node) -> + case running_clustered_disc_nodes() =:= [] of true -> rabbit_log:info("only running disc node went down~n"); false -> ok - end, - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file({AllNodes, DiscNodes, - ordsets:del_element(Node, RunningNodes)}). - -on_node_join(Node, IsDiscNode) -> - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file({ordsets:add_element(Node, AllNodes), - case IsDiscNode of - true -> ordsets:add_element(Node, - DiscNodes); - false -> DiscNodes - end, - RunningNodes}). - -on_node_leave(Node) -> - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(), - write_cluster_status_file({ordsets:del_element(Node, AllNodes), - ordsets:del_element(Node, DiscNodes), - ordsets:del_element(Node, RunningNodes)}). + end. %%-------------------------------------------------------------------- %% Internal helpers @@ -778,10 +669,10 @@ discover_cluster(Node) -> {error, {cannot_discover_cluster, "You provided the current node as node to cluster with"}}; false -> - case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of - {badrpc, _Reason} -> discover_cluster([]); - {error, node_not_running} -> discover_cluster([]); - {ok, Res} -> {ok, Res} + case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of + {badrpc, _Reason} -> discover_cluster([]); + {error, mnesia_not_running} -> discover_cluster([]); + {ok, Res} -> {ok, Res} end end. @@ -1053,7 +944,7 @@ remove_node_if_mnesia_running(Node) -> %% propagated to all nodes case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> - rabbit_node_monitor:notify_leave_cluster(Node), + rabbit_node_monitor:notify_left_cluster(Node), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} @@ -1097,10 +988,7 @@ wait_for(Condition) -> timer:sleep(1000). is_only_disc_node(Node) -> - [Node] =:= all_clustered_disc_nodes(). - -is_only_running_disc_node(Node) -> - [Node] =:= running_clustered_disc_nodes(). + [Node] =:= clustered_disc_nodes(). start_mnesia() -> check_cluster_consistency(), @@ -1120,28 +1008,3 @@ change_extra_db_nodes(ClusterNodes0, Force) -> {ok, Nodes} -> {ok, Nodes} end. - -%%-------------------------------------------------------------------- -%% Legacy functions related to the "running nodes" file -%%-------------------------------------------------------------------- - -legacy_running_nodes_filename() -> - filename:join(dir(), "nodes_running_at_shutdown"). - -legacy_read_previously_running_nodes() -> - FileName = legacy_running_nodes_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [Nodes]} -> Nodes; - {error, enoent} -> []; - {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, - FileName, Reason}}) - end. - -legacy_delete_previously_running_nodes() -> - FileName = legacy_running_nodes_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, - FileName, Reason}}) - end. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 342c744331..9e763da0c9 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -18,12 +18,27 @@ -behaviour(gen_server). --export([start_link/0]). +-export([prepare_cluster_status_file/0, + read_cluster_status_file/0, + update_cluster_status_file/0, + reset_cluster_status_file/0, --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). --export([rabbit_running_on/2, rabbit_left_cluster/1, rabbit_joined_cluster/2, - notify_cluster/0, notify_join_cluster/0, notify_leave_cluster/1]). + joined_cluster/2, + notify_joined_cluster/0, + left_cluster/1, + notify_left_cluster/1, + node_up/2, + notify_node_up/0, + this_node_down/0, + + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 + ]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -32,90 +47,207 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok'). --spec(rabbit_left_cluster/1 :: (node()) -> 'ok'). --spec(notify_cluster/0 :: () -> 'ok'). --spec(notify_leave_cluster/1 :: (node()) -> 'ok'). +-spec(prepare_cluster_status_file/0 :: () -> 'ok'). +-spec(read_cluster_status_file/0 :: () -> rabbit_mnesia:cluster_status()). +-spec(update_cluster_status_file/0 :: () -> 'ok'). +-spec(reset_cluster_status_file/0 :: () -> 'ok'). + +-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_joined_cluster/0 :: () -> 'ok'). +-spec(left_cluster/1 :: (node()) -> 'ok'). +-spec(notify_left_cluster/1 :: (node()) -> 'ok'). +-spec(node_up/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_node_up/0 :: () -> 'ok'). +-spec(this_node_down/0 :: () -> 'ok'). -endif. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% Cluster file operations +%%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%% The cluster node status file contains all we need to know about the cluster: +%% +%% * All the clustered nodes +%% * The disc nodes +%% * The running nodes. +%% +%% If the current node is a disc node it will be included in the disc nodes +%% list. +%% +%% We strive to keep the file up to date and we rely on this assumption in +%% various situations. Obviously when mnesia is offline the information we have +%% will be outdated, but it can't be otherwise. -rabbit_running_on(Node, IsDiscNode) -> - gen_server:cast(rabbit_node_monitor, - {rabbit_running_on, Node, IsDiscNode}). +cluster_status_file_filename() -> + rabbit_mnesia:dir() ++ "/cluster_nodes.config". -rabbit_running_on(Node) -> - rabbit_running_on(Node, rabbit_mnesia:is_disc_node()). +prepare_cluster_status_file() -> + NotPresent = + fun (AllNodes0, WantDiscNode) -> + ThisNode = [node()], -rabbit_left_cluster(Node) -> - gen_server:cast(rabbit_node_monitor, {rabbit_left_cluster, Node}). + RunningNodes0 = legacy_read_previously_running_nodes(), + legacy_delete_previously_running_nodes(), -rabbit_joined_cluster(Node, IsDiscNode) -> - gen_server:cast(rabbit_node_monitor, - {rabbit_joined_cluster, Node, IsDiscNode}). + RunningNodes = lists:usort(RunningNodes0 ++ ThisNode), + AllNodes = + lists:usort(AllNodes0 ++ RunningNodes), + DiscNodes = case WantDiscNode of + true -> ThisNode; + false -> [] + end, -cluster_multicall(Fun, Args) -> - Node = node(), - Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], - %% notify other rabbits of this rabbit - case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args, - ?RABBIT_UP_RPC_TIMEOUT) of - {_, [] } -> ok; - {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) - end, - Nodes. + ok = write_cluster_status_file({AllNodes, DiscNodes, RunningNodes}) + end, + case try_read_cluster_status_file() of + {ok, _} -> + ok; + {error, {invalid_term, _, [AllNodes]}} -> + %% Legacy file + NotPresent(AllNodes, legacy_should_be_disc_node(AllNodes)); + {error, {cannot_read_file, _, enoent}} -> + {ok, {AllNodes, WantDiscNode}} = + application:get_env(rabbit, cluster_nodes), + NotPresent(AllNodes, WantDiscNode) + end. -notify_cluster() -> - Nodes = cluster_multicall(rabbit_running_on, - [node(), rabbit_mnesia:is_disc_node()]), - %% register other active rabbits with this rabbit - [ rabbit_running_on(N) || N <- Nodes ], + +write_cluster_status_file(Status) -> + FileName = cluster_status_file_filename(), + case rabbit_file:write_term_file(FileName, [Status]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_write_cluster_status_file, + FileName, Reason}}) + end. + +try_read_cluster_status_file() -> + FileName = cluster_status_file_filename(), + case rabbit_file:read_term_file(FileName) of + {ok, [{_, _, _} = Status]} -> + {ok, Status}; + {ok, Term} -> + {error, {invalid_term, FileName, Term}}; + {error, Reason} -> + {error, {cannot_read_file, FileName, Reason}} + end. + +read_cluster_status_file() -> + case try_read_cluster_status_file() of + {ok, Status} -> + Status; + {error, Reason} -> + throw({error, {cannot_read_cluster_status_file, Reason}}) + end. + +update_cluster_status_file() -> + {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(), + write_cluster_status_file(Status). + +reset_cluster_status_file() -> + write_cluster_status_file({[node()], [node()], [node()]}). + +%%---------------------------------------------------------------------------- +%% Cluster notifications +%%---------------------------------------------------------------------------- + +joined_cluster(Node, IsDiscNode) -> + gen_server:cast(rabbit_node_monitor, {rabbit_join, Node, IsDiscNode}). + +notify_joined_cluster() -> + cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]), ok. -notify_join_cluster() -> - cluster_multicall(rabbit_joined_cluster, - [node(), rabbit_mnesia:is_disc_node()]), +left_cluster(Node) -> + gen_server:cast(rabbit_node_monitor, {left_cluster, Node}). + +notify_left_cluster(Node) -> + left_cluster(Node), + cluster_multicall(left_cluster, [Node]), ok. -notify_leave_cluster(Node) -> - rabbit_left_cluster(Node), - cluster_multicall(rabbit_left_cluster, [Node]), +node_up(Node, IsDiscNode) -> + gen_server:cast(rabbit_node_monitor, {node_up, Node, IsDiscNode}). + +notify_node_up() -> + Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]), + %% register other active rabbits with this rabbit + [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) || + N <- Nodes ], ok. -%%-------------------------------------------------------------------- +this_node_down() -> + case mnesia:system_info(is_running) of + yes -> throw({error, node_running}); + no -> {AllNodes, DiscNodes, RunningNodes} = + read_cluster_status_file(), + write_cluster_status_file( + {AllNodes, DiscNodes, + ordsets:del_element(node(), RunningNodes)}) + end. + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). init([]) -> - {ok, ordsets:new()}. + {ok, no_state}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) -> - case ordsets:is_element(Node, Nodes) of - true -> {noreply, Nodes}; +%% Note: when updating the status file, we can't simply write the mnesia +%% information since the message can (and will) overtake the mnesia propagation. +handle_cast({node_up, Node, IsDiscNode}, State) -> + case is_already_monitored({rabbit, Node}) of + true -> {noreply, State}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), + {ok, {AllNodes, DiscNodes, RunningNodes}} = + rabbit_mnesia:cluster_status_from_mnesia(), + write_cluster_status_file( + {ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, DiscNodes); + false -> DiscNodes + end, + ordsets:add_element(Node, RunningNodes)}), erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node, IsDiscNode), - {noreply, ordsets:add_element(Node, Nodes)} + ok = handle_live_rabbit(Node), + {noreply, State} end; -handle_cast({rabbit_joined_cluster, Node, IsDiscNode}, Nodes) -> - ok = rabbit_mnesia:on_node_join(Node, IsDiscNode), - {noreply, Nodes}; -handle_cast({rabbit_left_cluster, Node}, Nodes) -> - ok = rabbit_mnesia:on_node_leave(Node), - {noreply, Nodes}; +handle_cast({joined_cluster, Node, IsDiscNode}, State) -> + {ok, {AllNodes, DiscNodes, RunningNodes}} = + rabbit_mnesia:cluster_status_from_mnesia(), + write_cluster_status_file({ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, + DiscNodes); + false -> DiscNodes + end, + RunningNodes}), + {noreply, State}; +handle_cast({left_cluster, Node}, State) -> + {ok, {AllNodes, DiscNodes, RunningNodes}} = + rabbit_mnesia:cluster_status_from_mnesia(), + write_cluster_status_file({ordsets:del_element(Node, AllNodes), + ordsets:del_element(Node, DiscNodes), + ordsets:del_element(Node, RunningNodes)}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) -> +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), + {ok, {AllNodes, DiscNodes, RunningNodes}} = + rabbit_mnesia:cluster_status_from_mnesia(), + write_cluster_status_file({AllNodes, DiscNodes, + ordsets:del_element(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, ordsets:del_element(Node, Nodes)}; + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -125,18 +257,67 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% Functions that call the module specific hooks when nodes go up/down +%%---------------------------------------------------------------------------- %% TODO: This may turn out to be a performance hog when there are lots %% of nodes. We really only need to execute some of these statements %% on *one* node, rather than all of them. handle_dead_rabbit(Node) -> - ok = rabbit_mnesia:on_node_down(Node), ok = rabbit_networking:on_node_down(Node), ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node). -handle_live_rabbit(Node, IsDiscNode) -> - ok = rabbit_mnesia:on_node_up(Node, IsDiscNode), - ok = rabbit_alarm:on_node_up(Node). +handle_live_rabbit(Node) -> + ok = rabbit_alarm:on_node_up(Node), + ok = rabbit_mnesia:on_node_down(Node). + +%%-------------------------------------------------------------------- +%% Internal utils +%%-------------------------------------------------------------------- + +cluster_multicall(Fun, Args) -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this cluster + case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args, + ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + Nodes. + +is_already_monitored(Item) -> + {monitors, Monitors} = process_info(self(), monitors), + lists:any(fun ({_, Item1}) when Item =:= Item1 -> true; + (_) -> false + end, Monitors). + +legacy_should_be_disc_node(DiscNodes) -> + DiscNodes == [] orelse lists:member(node(), DiscNodes). + +%%-------------------------------------------------------------------- +%% Legacy functions related to the "running nodes" file +%%-------------------------------------------------------------------- + +legacy_running_nodes_filename() -> + filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). + +legacy_read_previously_running_nodes() -> + FileName = legacy_running_nodes_filename(), + case rabbit_file:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. +legacy_delete_previously_running_nodes() -> + FileName = legacy_running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. |
