summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_mnesia.erl281
-rw-r--r--src/rabbit_node_monitor.erl309
3 files changed, 320 insertions, 275 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0d2e27b931..7bdde56954 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -176,7 +176,7 @@
-rabbit_boot_step({notify_cluster,
[{description, "notify cluster nodes"},
- {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {mfa, {rabbit_node_monitor, notify_node_up, []}},
{requires, networking}]}).
%%---------------------------------------------------------------------------
@@ -332,7 +332,8 @@ start_it(StartFun) ->
stop() ->
rabbit_log:info("Stopping Rabbit~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ ok = app_utils:stop_applications(app_shutdown_order()),
+ rabbit_node_monitor:this_node_down().
stop_and_halt() ->
try
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 5c315a07c6..727054ad16 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -30,27 +30,25 @@
is_db_empty/0,
is_clustered/0,
all_clustered_nodes/0,
- all_clustered_disc_nodes/0,
+ clustered_disc_nodes/0,
running_clustered_nodes/0,
is_disc_node/0,
dir/0,
table_names/0,
wait_for_tables/1,
+ cluster_status_from_mnesia/0,
init_db/3,
empty_ram_only_tables/0,
copy_db/1,
wait_for_tables/0,
- on_node_up/2,
- on_node_down/1,
- on_node_join/2,
- on_node_leave/1
+ on_node_up/1,
+ on_node_down/1
]).
%% Used internally in rpc calls
--export([cluster_status_if_running/0,
- node_info/0,
+-export([node_info/0,
remove_node_if_mnesia_running/1
]).
@@ -64,11 +62,11 @@
-ifdef(use_specs).
--export_type([node_type/0]).
+-export_type([node_type/0, cluster_status/0]).
-type(node_type() :: disc | ram).
--type(node_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
- ordsets:ordset(node())}).
+-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
+ ordsets:ordset(node())}).
%% Main interface
-spec(prepare/0 :: () -> 'ok').
@@ -86,11 +84,13 @@
-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
-spec(all_clustered_nodes/0 :: () -> [node()]).
--spec(all_clustered_disc_nodes/0 :: () -> [node()]).
+-spec(clustered_disc_nodes/0 :: () -> [node()]).
-spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(is_disc_node/0 :: () -> boolean()).
-spec(dir/0 :: () -> file:filename()).
-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} |
+ {'error', 'mnesia_not_running'}).
%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok').
@@ -98,17 +98,15 @@
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
+-spec(check_cluster_consistency/0 :: () -> 'ok').
%% Hooks used in `rabbit_node_monitor'
--spec(on_node_up/2 :: (node(), boolean()) -> 'ok').
+-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(on_node_leave/1 :: (node()) -> 'ok').
%% Functions used in internal rpc calls
--spec(cluster_status_if_running/0 :: () -> {'ok', node_status()} |
- {'error', 'node_not_running'}).
-spec(node_info/0 :: () -> {string(), string(),
- ({'ok', node_status()} | 'error')}).
+ ({'ok', cluster_status()} | 'error')}).
-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' |
{'error', term()}).
@@ -118,49 +116,19 @@
%% Main interface
%%----------------------------------------------------------------------------
-%% Sets up the cluster status file when needed, taking care of the legacy
-%% files
prepare() ->
ensure_mnesia_dir(),
- NotPresent =
- fun (AllNodes0, WantDiscNode) ->
- ThisNode = [node()],
-
- RunningNodes0 = legacy_read_previously_running_nodes(),
- legacy_delete_previously_running_nodes(),
-
- RunningNodes = lists:usort(RunningNodes0 ++ ThisNode),
- AllNodes =
- lists:usort(AllNodes0 ++ RunningNodes),
- DiscNodes = case WantDiscNode of
- true -> ThisNode;
- false -> []
- end,
-
- ok = write_cluster_status_file({AllNodes, DiscNodes, RunningNodes})
- end,
- case try_read_cluster_status_file() of
- {ok, _} ->
- %% We check the consistency only when the cluster status exists,
- %% since when it doesn't exist it means that we just started a fresh
- %% node, and when we have a legacy node with an old
- %% "cluster_nodes.config" we can't check the consistency anyway
- check_cluster_consistency(),
- ok;
- {error, {invalid_term, _, [AllNodes]}} ->
- %% Legacy file
- NotPresent(AllNodes, should_be_disc_node(AllNodes));
- {error, {cannot_read_file, _, enoent}} ->
- {ok, {AllNodes, WantDiscNode}} =
- application:get_env(rabbit, cluster_nodes),
- NotPresent(AllNodes, WantDiscNode)
- end.
+ rabbit_node_monitor:prepare_cluster_status_file(),
+ check_cluster_consistency().
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- {AllNodes, DiscNodes, _} = read_cluster_status_file(),
- DiscNode = should_be_disc_node(DiscNodes),
+ %% Here we want the cluster status *file*, not the status from mnesia,
+ %% because in the case of RAM nodes the status from mnesia doesn't matter.
+ Status = {AllNodes, _, _} =
+ rabbit_node_monitor:read_cluster_status_file(),
+ DiscNode = is_disc_node(Status),
init_db_and_upgrade(AllNodes, DiscNode, DiscNode),
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case - let's
@@ -218,7 +186,7 @@ join_cluster(DiscoveryNode, WantDiscNode) ->
%% Join the cluster
ok = init_db_with_mnesia(DiscNodes, WantDiscNode, false),
- rabbit_node_monitor:notify_join_cluster(),
+ rabbit_node_monitor:notify_joined_cluster(),
ok.
@@ -246,22 +214,17 @@ reset(Force) ->
Node = node(),
case Force of
true ->
- ok;
+ all_clustered_nodes();
false ->
+ AllNodes0 = all_clustered_nodes(),
%% Reconnecting so that we will get an up to date nodes
- try
- %% Force=true here so that reset still works when
- %% clustered with a node which is down
- start_mnesia(),
- {AllNodes, DiscNodes, _} = read_cluster_status_file(),
- init_db_and_upgrade(
- AllNodes, should_be_disc_node(DiscNodes), true)
- after
- stop_mnesia()
- end,
+ %% Force=true here so that reset still works when
+ %% clustered with a node which is down
+ init_db_with_mnesia(AllNodes0, is_disc_node(), true),
leave_cluster(),
rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema)
+ cannot_delete_schema),
+ all_clustered_nodes()
end,
%% We need to make sure that we don't end up in a distributed Erlang system
%% with nodes while not being in an Mnesia cluster with them. We don't
@@ -269,7 +232,7 @@ reset(Force) ->
[erlang:disconnect_node(N) || N <- all_clustered_nodes()],
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok = write_cluster_status_file(initial_cluster_status()),
+ ok = rabbit_node_monitor:reset_cluster_status_file(),
ok.
change_node_type(Type) ->
@@ -376,7 +339,7 @@ status() ->
IfNonEmpty = fun (_, []) -> [];
(Type, Nodes) -> [{Type, Nodes}]
end,
- [{nodes, (IfNonEmpty(disc, all_clustered_disc_nodes()) ++
+ [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++
IfNonEmpty(ram, all_clustered_ram_nodes()))},
{running_nodes, running_clustered_nodes()}].
@@ -386,7 +349,7 @@ is_db_empty() ->
is_clustered() ->
Nodes = all_clustered_nodes(),
- [node()] /= Nodes andalso [] /= Nodes.
+ [node()] =/= Nodes andalso [] =/= Nodes.
is_disc_and_clustered() ->
is_disc_node() andalso is_clustered().
@@ -398,7 +361,7 @@ all_clustered_nodes() ->
{AllNodes, _, _} = cluster_status(),
AllNodes.
-all_clustered_disc_nodes() ->
+clustered_disc_nodes() ->
{_, DiscNodes, _} = cluster_status(),
DiscNodes.
@@ -416,25 +379,38 @@ running_clustered_disc_nodes() ->
%% This function is the actual source of information, since it gets the data
%% from mnesia. Obviously it'll work only when mnesia is running.
-cluster_status_if_running() ->
+cluster_status_from_mnesia() ->
+ Check = fun (Nodes) ->
+ case mnesia:system_info(use_dir) of
+ true -> ordsets:add_element(node(), Nodes);
+ false -> Nodes
+ end
+ end,
case mnesia:system_info(is_running) of
- no -> {error, node_not_running};
+ no -> {error, mnesia_not_running};
yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)),
- ordsets:from_list(mnesia:table_info(schema, disc_copies)),
+ Check(ordsets:from_list(
+ mnesia:table_info(schema, disc_copies))),
ordsets:from_list(mnesia:system_info(running_db_nodes))}}
end.
cluster_status() ->
- case cluster_status_if_running() of
- {ok, Status} -> Status;
- {error, node_not_running} -> read_cluster_status_file()
+ case cluster_status_from_mnesia() of
+ {ok, Status} ->
+ Status;
+ {error, mnesia_not_running} ->
+ rabbit_node_monitor:read_cluster_status_file()
end.
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:rabbit_version(),
- cluster_status_if_running()}.
+ cluster_status_from_mnesia()}.
+
+is_disc_node() ->
+ is_disc_node(cluster_status()).
-is_disc_node() -> mnesia:system_info(use_dir).
+is_disc_node({_, DiscNodes, _}) ->
+ DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes).
dir() -> mnesia:system_info(directory).
@@ -453,7 +429,10 @@ init_db(ClusterNodes, WantDiscNode, Force) ->
{error, Reason} ->
throw({error, Reason});
{ok, Nodes} ->
- WasDiscNode = is_disc_node(),
+ %% Note that we use `system_info' here and not the cluster status
+ %% since when we start rabbit for the first time the cluster status
+ %% will say we are a disc node but the tables won't be present yet.
+ WasDiscNode = mnesia:system_info(use_dir),
case {Nodes, WasDiscNode, WantDiscNode} of
{[], _, false} ->
%% Standalone ram node, we don't want that
@@ -482,7 +461,7 @@ init_db(ClusterNodes, WantDiscNode, Force) ->
end
end,
ensure_schema_integrity(),
- update_cluster_status_file(),
+ rabbit_node_monitor:update_cluster_status_file(),
ok
end.
@@ -510,9 +489,7 @@ init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) ->
after
stop_mnesia()
end,
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file(
- {AllNodes, DiscNodes, ordsets:del_element(node(), RunningNodes)}).
+ rabbit_node_monitor:this_node_down().
ensure_mnesia_dir() ->
MnesiaDir = dir() ++ "/",
@@ -610,9 +587,6 @@ wait_for_tables(TableNames) ->
throw({error, {failed_waiting_for_tables, Reason}})
end.
-should_be_disc_node(DiscNodes) ->
- DiscNodes == [] orelse lists:member(node(), DiscNodes).
-
%% This does not guarantee us much, but it avoids some situations that will
%% definitely end up badly
check_cluster_consistency() ->
@@ -652,7 +626,7 @@ check_cluster_consistency() ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
{badrpc, _Reason} ->
ok;
- {OTP, Rabbit, {error, node_not_running}} ->
+ {OTP, Rabbit, {error, mnesia_not_running}} ->
CheckOTP(OTP),
CheckRabbit(Rabbit);
{OTP, Rabbit, {ok, {AllNodes, _, _}}} ->
@@ -662,104 +636,21 @@ check_cluster_consistency() ->
end
end, all_clustered_nodes()).
-%%----------------------------------------------------------------------------
-%% Cluster status file functions
-%%----------------------------------------------------------------------------
-
-%% The cluster node status file contains all we need to know about the cluster:
-%%
-%% * All the clustered nodes
-%% * The disc nodes
-%% * The running nodes.
-%%
-%% If the current node is a disc node it will be included in the disc nodes
-%% list.
-%%
-%% We strive to keep the file up to date and we rely on this assumption in
-%% various situations. Obviously when mnesia is offline the information we have
-%% will be outdated, but it can't be otherwise.
-
-cluster_status_file_filename() ->
- dir() ++ "/cluster_nodes.config".
-
-initial_cluster_status() ->
- {[node()], [node()], [node()]}.
-
-write_cluster_status_file(Status) ->
- FileName = cluster_status_file_filename(),
- case rabbit_file:write_term_file(FileName, [Status]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_write_cluster_status_file,
- FileName, Reason}})
- end.
-
-try_read_cluster_status_file() ->
- FileName = cluster_status_file_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [{_, _, _} = Status]} ->
- {ok, Status};
- {ok, Term} ->
- {error, {invalid_term, FileName, Term}};
- {error, Reason} ->
- {error, {cannot_read_file, FileName, Reason}}
- end.
-
-read_cluster_status_file() ->
- case try_read_cluster_status_file() of
- {ok, Status} ->
- Status;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_status_file, Reason}})
- end.
-
-%% To update the cluster status when mnesia is running.
-update_cluster_status_file() ->
- {ok, Status} = cluster_status_if_running(),
- write_cluster_status_file(Status).
-
%%--------------------------------------------------------------------
%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
-on_node_up(Node, IsDiscNode) ->
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file({ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(Node,
- DiscNodes);
- false -> DiscNodes
- end,
- ordsets:add_element(Node, RunningNodes)}),
- case is_only_running_disc_node(Node) of
+on_node_up(Node) ->
+ case running_clustered_disc_nodes() =:= [Node] of
true -> rabbit_log:info("cluster contains disc nodes again~n");
false -> ok
end.
-on_node_down(Node) ->
- case is_only_running_disc_node(Node) of
+on_node_down(_Node) ->
+ case running_clustered_disc_nodes() =:= [] of
true -> rabbit_log:info("only running disc node went down~n");
false -> ok
- end,
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file({AllNodes, DiscNodes,
- ordsets:del_element(Node, RunningNodes)}).
-
-on_node_join(Node, IsDiscNode) ->
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file({ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(Node,
- DiscNodes);
- false -> DiscNodes
- end,
- RunningNodes}).
-
-on_node_leave(Node) ->
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status_file(),
- write_cluster_status_file({ordsets:del_element(Node, AllNodes),
- ordsets:del_element(Node, DiscNodes),
- ordsets:del_element(Node, RunningNodes)}).
+ end.
%%--------------------------------------------------------------------
%% Internal helpers
@@ -778,10 +669,10 @@ discover_cluster(Node) ->
{error, {cannot_discover_cluster,
"You provided the current node as node to cluster with"}};
false ->
- case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of
- {badrpc, _Reason} -> discover_cluster([]);
- {error, node_not_running} -> discover_cluster([]);
- {ok, Res} -> {ok, Res}
+ case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> discover_cluster([]);
+ {error, mnesia_not_running} -> discover_cluster([]);
+ {ok, Res} -> {ok, Res}
end
end.
@@ -1053,7 +944,7 @@ remove_node_if_mnesia_running(Node) ->
%% propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
- rabbit_node_monitor:notify_leave_cluster(Node),
+ rabbit_node_monitor:notify_left_cluster(Node),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
@@ -1097,10 +988,7 @@ wait_for(Condition) ->
timer:sleep(1000).
is_only_disc_node(Node) ->
- [Node] =:= all_clustered_disc_nodes().
-
-is_only_running_disc_node(Node) ->
- [Node] =:= running_clustered_disc_nodes().
+ [Node] =:= clustered_disc_nodes().
start_mnesia() ->
check_cluster_consistency(),
@@ -1120,28 +1008,3 @@ change_extra_db_nodes(ClusterNodes0, Force) ->
{ok, Nodes} ->
{ok, Nodes}
end.
-
-%%--------------------------------------------------------------------
-%% Legacy functions related to the "running nodes" file
-%%--------------------------------------------------------------------
-
-legacy_running_nodes_filename() ->
- filename:join(dir(), "nodes_running_at_shutdown").
-
-legacy_read_previously_running_nodes() ->
- FileName = legacy_running_nodes_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [Nodes]} -> Nodes;
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
- FileName, Reason}})
- end.
-
-legacy_delete_previously_running_nodes() ->
- FileName = legacy_running_nodes_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
- FileName, Reason}})
- end.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 342c744331..9e763da0c9 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,12 +18,27 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([prepare_cluster_status_file/0,
+ read_cluster_status_file/0,
+ update_cluster_status_file/0,
+ reset_cluster_status_file/0,
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
--export([rabbit_running_on/2, rabbit_left_cluster/1, rabbit_joined_cluster/2,
- notify_cluster/0, notify_join_cluster/0, notify_leave_cluster/1]).
+ joined_cluster/2,
+ notify_joined_cluster/0,
+ left_cluster/1,
+ notify_left_cluster/1,
+ node_up/2,
+ notify_node_up/0,
+ this_node_down/0,
+
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+ ]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -32,90 +47,207 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok').
--spec(rabbit_left_cluster/1 :: (node()) -> 'ok').
--spec(notify_cluster/0 :: () -> 'ok').
--spec(notify_leave_cluster/1 :: (node()) -> 'ok').
+-spec(prepare_cluster_status_file/0 :: () -> 'ok').
+-spec(read_cluster_status_file/0 :: () -> rabbit_mnesia:cluster_status()).
+-spec(update_cluster_status_file/0 :: () -> 'ok').
+-spec(reset_cluster_status_file/0 :: () -> 'ok').
+
+-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_joined_cluster/0 :: () -> 'ok').
+-spec(left_cluster/1 :: (node()) -> 'ok').
+-spec(notify_left_cluster/1 :: (node()) -> 'ok').
+-spec(node_up/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
+-spec(this_node_down/0 :: () -> 'ok').
-endif.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Cluster file operations
+%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+%% The cluster node status file contains all we need to know about the cluster:
+%%
+%% * All the clustered nodes
+%% * The disc nodes
+%% * The running nodes.
+%%
+%% If the current node is a disc node it will be included in the disc nodes
+%% list.
+%%
+%% We strive to keep the file up to date and we rely on this assumption in
+%% various situations. Obviously when mnesia is offline the information we have
+%% will be outdated, but it can't be otherwise.
-rabbit_running_on(Node, IsDiscNode) ->
- gen_server:cast(rabbit_node_monitor,
- {rabbit_running_on, Node, IsDiscNode}).
+cluster_status_file_filename() ->
+ rabbit_mnesia:dir() ++ "/cluster_nodes.config".
-rabbit_running_on(Node) ->
- rabbit_running_on(Node, rabbit_mnesia:is_disc_node()).
+prepare_cluster_status_file() ->
+ NotPresent =
+ fun (AllNodes0, WantDiscNode) ->
+ ThisNode = [node()],
-rabbit_left_cluster(Node) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_left_cluster, Node}).
+ RunningNodes0 = legacy_read_previously_running_nodes(),
+ legacy_delete_previously_running_nodes(),
-rabbit_joined_cluster(Node, IsDiscNode) ->
- gen_server:cast(rabbit_node_monitor,
- {rabbit_joined_cluster, Node, IsDiscNode}).
+ RunningNodes = lists:usort(RunningNodes0 ++ ThisNode),
+ AllNodes =
+ lists:usort(AllNodes0 ++ RunningNodes),
+ DiscNodes = case WantDiscNode of
+ true -> ThisNode;
+ false -> []
+ end,
-cluster_multicall(Fun, Args) ->
- Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- %% notify other rabbits of this rabbit
- case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
- ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
- Nodes.
+ ok = write_cluster_status_file({AllNodes, DiscNodes, RunningNodes})
+ end,
+ case try_read_cluster_status_file() of
+ {ok, _} ->
+ ok;
+ {error, {invalid_term, _, [AllNodes]}} ->
+ %% Legacy file
+ NotPresent(AllNodes, legacy_should_be_disc_node(AllNodes));
+ {error, {cannot_read_file, _, enoent}} ->
+ {ok, {AllNodes, WantDiscNode}} =
+ application:get_env(rabbit, cluster_nodes),
+ NotPresent(AllNodes, WantDiscNode)
+ end.
-notify_cluster() ->
- Nodes = cluster_multicall(rabbit_running_on,
- [node(), rabbit_mnesia:is_disc_node()]),
- %% register other active rabbits with this rabbit
- [ rabbit_running_on(N) || N <- Nodes ],
+
+write_cluster_status_file(Status) ->
+ FileName = cluster_status_file_filename(),
+ case rabbit_file:write_term_file(FileName, [Status]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_write_cluster_status_file,
+ FileName, Reason}})
+ end.
+
+try_read_cluster_status_file() ->
+ FileName = cluster_status_file_filename(),
+ case rabbit_file:read_term_file(FileName) of
+ {ok, [{_, _, _} = Status]} ->
+ {ok, Status};
+ {ok, Term} ->
+ {error, {invalid_term, FileName, Term}};
+ {error, Reason} ->
+ {error, {cannot_read_file, FileName, Reason}}
+ end.
+
+read_cluster_status_file() ->
+ case try_read_cluster_status_file() of
+ {ok, Status} ->
+ Status;
+ {error, Reason} ->
+ throw({error, {cannot_read_cluster_status_file, Reason}})
+ end.
+
+update_cluster_status_file() ->
+ {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status_file(Status).
+
+reset_cluster_status_file() ->
+ write_cluster_status_file({[node()], [node()], [node()]}).
+
+%%----------------------------------------------------------------------------
+%% Cluster notifications
+%%----------------------------------------------------------------------------
+
+joined_cluster(Node, IsDiscNode) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_join, Node, IsDiscNode}).
+
+notify_joined_cluster() ->
+ cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]),
ok.
-notify_join_cluster() ->
- cluster_multicall(rabbit_joined_cluster,
- [node(), rabbit_mnesia:is_disc_node()]),
+left_cluster(Node) ->
+ gen_server:cast(rabbit_node_monitor, {left_cluster, Node}).
+
+notify_left_cluster(Node) ->
+ left_cluster(Node),
+ cluster_multicall(left_cluster, [Node]),
ok.
-notify_leave_cluster(Node) ->
- rabbit_left_cluster(Node),
- cluster_multicall(rabbit_left_cluster, [Node]),
+node_up(Node, IsDiscNode) ->
+ gen_server:cast(rabbit_node_monitor, {node_up, Node, IsDiscNode}).
+
+notify_node_up() ->
+ Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]),
+ %% register other active rabbits with this rabbit
+ [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) ||
+ N <- Nodes ],
ok.
-%%--------------------------------------------------------------------
+this_node_down() ->
+ case mnesia:system_info(is_running) of
+ yes -> throw({error, node_running});
+ no -> {AllNodes, DiscNodes, RunningNodes} =
+ read_cluster_status_file(),
+ write_cluster_status_file(
+ {AllNodes, DiscNodes,
+ ordsets:del_element(node(), RunningNodes)})
+ end.
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
- {ok, ordsets:new()}.
+ {ok, no_state}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) ->
- case ordsets:is_element(Node, Nodes) of
- true -> {noreply, Nodes};
+%% Note: when updating the status file, we can't simply write the mnesia
+%% information since the message can (and will) overtake the mnesia propagation.
+handle_cast({node_up, Node, IsDiscNode}, State) ->
+ case is_already_monitored({rabbit, Node}) of
+ true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ {ok, {AllNodes, DiscNodes, RunningNodes}} =
+ rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status_file(
+ {ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node, DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}),
erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node, IsDiscNode),
- {noreply, ordsets:add_element(Node, Nodes)}
+ ok = handle_live_rabbit(Node),
+ {noreply, State}
end;
-handle_cast({rabbit_joined_cluster, Node, IsDiscNode}, Nodes) ->
- ok = rabbit_mnesia:on_node_join(Node, IsDiscNode),
- {noreply, Nodes};
-handle_cast({rabbit_left_cluster, Node}, Nodes) ->
- ok = rabbit_mnesia:on_node_leave(Node),
- {noreply, Nodes};
+handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
+ {ok, {AllNodes, DiscNodes, RunningNodes}} =
+ rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status_file({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node,
+ DiscNodes);
+ false -> DiscNodes
+ end,
+ RunningNodes}),
+ {noreply, State};
+handle_cast({left_cluster, Node}, State) ->
+ {ok, {AllNodes, DiscNodes, RunningNodes}} =
+ rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status_file({ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
+ {ok, {AllNodes, DiscNodes, RunningNodes}} =
+ rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status_file({AllNodes, DiscNodes,
+ ordsets:del_element(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, ordsets:del_element(Node, Nodes)};
+ {noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -125,18 +257,67 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Functions that call the module specific hooks when nodes go up/down
+%%----------------------------------------------------------------------------
%% TODO: This may turn out to be a performance hog when there are lots
%% of nodes. We really only need to execute some of these statements
%% on *one* node, rather than all of them.
handle_dead_rabbit(Node) ->
- ok = rabbit_mnesia:on_node_down(Node),
ok = rabbit_networking:on_node_down(Node),
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node).
-handle_live_rabbit(Node, IsDiscNode) ->
- ok = rabbit_mnesia:on_node_up(Node, IsDiscNode),
- ok = rabbit_alarm:on_node_up(Node).
+handle_live_rabbit(Node) ->
+ ok = rabbit_alarm:on_node_up(Node),
+ ok = rabbit_mnesia:on_node_down(Node).
+
+%%--------------------------------------------------------------------
+%% Internal utils
+%%--------------------------------------------------------------------
+
+cluster_multicall(Fun, Args) ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ %% notify other rabbits of this cluster
+ case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
+ ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ Nodes.
+
+is_already_monitored(Item) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
+ (_) -> false
+ end, Monitors).
+
+legacy_should_be_disc_node(DiscNodes) ->
+ DiscNodes == [] orelse lists:member(node(), DiscNodes).
+
+%%--------------------------------------------------------------------
+%% Legacy functions related to the "running nodes" file
+%%--------------------------------------------------------------------
+
+legacy_running_nodes_filename() ->
+ filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
+
+legacy_read_previously_running_nodes() ->
+ FileName = legacy_running_nodes_filename(),
+ case rabbit_file:read_term_file(FileName) of
+ {ok, [Nodes]} -> Nodes;
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
+ FileName, Reason}})
+ end.
+legacy_delete_previously_running_nodes() ->
+ FileName = legacy_running_nodes_filename(),
+ case file:delete(FileName) of
+ ok -> ok;
+ {error, enoent} -> ok;
+ {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
+ FileName, Reason}})
+ end.