summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mnesia.erl46
-rw-r--r--src/rabbit_node_monitor.erl47
2 files changed, 66 insertions, 27 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 7aac7f3f63..679f1c8481 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -42,8 +42,9 @@
copy_db/1,
wait_for_tables/0,
- on_node_up/1,
- on_node_down/1
+ on_node_up/2,
+ on_node_down/1,
+ on_node_leave/1
]).
%% Used internally in rpc calls
@@ -65,7 +66,8 @@
-export_type([node_type/0]).
-type(node_type() :: disc | ram).
--type(node_status() :: {[node()], [node()], [node()]}).
+-type(node_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
+ ordsets:ordset(node())}).
%% Main interface
-spec(prepare/0 :: () -> 'ok').
@@ -97,8 +99,9 @@
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
%% Hooks used in `rabbit_node_monitor'
--spec(on_node_up/1 :: (node()) -> 'ok').
+-spec(on_node_up/2 :: (node(), boolean()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(on_node_leave/1 :: (node()) -> 'ok').
%% Functions used in internal rpc calls
-spec(cluster_status_if_running/0 :: () -> {'ok', node_status()} | 'error').
@@ -409,9 +412,9 @@ running_clustered_disc_nodes() ->
cluster_status_if_running() ->
case mnesia:system_info(is_running) of
no -> error;
- yes -> {ok, {mnesia:system_info(db_nodes),
- mnesia:table_info(schema, disc_copies),
- mnesia:system_info(running_db_nodes)}}
+ yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)),
+ ordsets:from_list(mnesia:table_info(schema, disc_copies)),
+ ordsets:from_list(mnesia:system_info(running_db_nodes))}}
end.
node_info() ->
@@ -703,8 +706,15 @@ update_cluster_nodes_status() ->
%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
-on_node_up(Node) ->
- update_cluster_nodes_status(),
+on_node_up(Node, IsDiscNode) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_nodes_status(),
+ write_cluster_nodes_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node,
+ DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}),
case is_only_disc_node(Node) of
true -> rabbit_log:info("cluster contains disc nodes again~n");
false -> ok
@@ -715,7 +725,15 @@ on_node_down(Node) ->
true -> rabbit_log:info("only running disc node went down~n");
false -> ok
end,
- update_cluster_nodes_status().
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_nodes_status(),
+ write_cluster_nodes_status({AllNodes, DiscNodes,
+ ordsets:del_element(Node, RunningNodes)}).
+
+on_node_leave(Node) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_nodes_status(),
+ write_cluster_nodes_status({ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}).
%%--------------------------------------------------------------------
%% Internal helpers
@@ -1009,11 +1027,7 @@ remove_node_if_mnesia_running(Node) ->
%% propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
- update_cluster_nodes_status(),
- io:format("nodes: ~p~n", [running_clustered_disc_nodes()]),
- {_, []} = rpc:multicall(running_clustered_nodes(),
- rabbit_mnesia,
- update_cluster_nodes_status, []),
+ rabbit_node_monitor:notify_leave_cluster(Node),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
@@ -1027,7 +1041,7 @@ leave_cluster() ->
remove_node_remotely(Removee) ->
case running_clustered_nodes() -- [Removee] of
[] ->
- ok;
+ {error, no_running_cluster_nodes};
RunningNodes ->
case lists:any(
fun (Node) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 323cf0ce9e..a25d0fe04d 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -22,7 +22,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([notify_cluster/0, rabbit_running_on/1]).
+-export([rabbit_running_on/2, rabbit_left_cluster/1, notify_cluster/0,
+ notify_leave_cluster/1]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -32,8 +33,10 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/1 :: (node()) -> 'ok').
+-spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok').
+-spec(rabbit_left_cluster/1 :: (node()) -> 'ok').
-spec(notify_cluster/0 :: () -> 'ok').
+-spec(notify_leave_cluster/1 :: (node()) -> 'ok').
-endif.
@@ -42,15 +45,23 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+rabbit_running_on(Node, IsDiscNode) ->
+ gen_server:cast(rabbit_node_monitor,
+ {rabbit_running_on, Node, IsDiscNode}).
+
rabbit_running_on(Node) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+ rabbit_running_on(Node, rabbit_mnesia:is_disc_node()).
+
+rabbit_left_cluster(Node) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_left_cluster, Node}).
notify_cluster() ->
Node = node(),
Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
%% notify other rabbits of this rabbit
case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
- [Node], ?RABBIT_UP_RPC_TIMEOUT) of
+ [Node, rabbit_mnesia:is_disc_node()],
+ ?RABBIT_UP_RPC_TIMEOUT) of
{_, [] } -> ok;
{_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
end,
@@ -58,6 +69,16 @@ notify_cluster() ->
[ rabbit_running_on(N) || N <- Nodes ],
ok.
+notify_leave_cluster(Node) ->
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [node()],
+ rabbit_left_cluster(Node),
+ case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_left_cluster, [Node],
+ ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ ok.
+
%%--------------------------------------------------------------------
init([]) ->
@@ -66,14 +87,17 @@ init([]) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, Nodes) ->
+handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) ->
case ordsets:is_element(Node, Nodes) of
true -> {noreply, Nodes};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node),
+ ok = handle_live_rabbit(Node, IsDiscNode),
{noreply, ordsets:add_element(Node, Nodes)}
end;
+handle_cast({rabbit_left_cluster, Node}, Nodes) ->
+ ok = rabbit_mnesia:on_node_leave(Node),
+ {noreply, Nodes};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -96,11 +120,12 @@ code_change(_OldVsn, State, _Extra) ->
%% of nodes. We really only need to execute some of these statements
%% on *one* node, rather than all of them.
handle_dead_rabbit(Node) ->
+ ok = rabbit_mnesia:on_node_down(Node),
ok = rabbit_networking:on_node_down(Node),
ok = rabbit_amqqueue:on_node_down(Node),
- ok = rabbit_alarm:on_node_down(Node),
- ok = rabbit_mnesia:on_node_down(Node).
+ ok = rabbit_alarm:on_node_down(Node).
+
+handle_live_rabbit(Node, IsDiscNode) ->
+ ok = rabbit_mnesia:on_node_up(Node, IsDiscNode),
+ ok = rabbit_alarm:on_node_up(Node).
-handle_live_rabbit(Node) ->
- ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node).