diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-14 18:07:01 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-14 18:07:01 +0100 |
| commit | e6f8c8cadabb9ee7487d73974c33c38c4060449a (patch) | |
| tree | 3a4b51c2396f767e50901b37846b28d1f9c85156 /src | |
| parent | f524ba790c5056c0386de0237cc0c8f83814db01 (diff) | |
| download | rabbitmq-server-git-e6f8c8cadabb9ee7487d73974c33c38c4060449a.tar.gz | |
add a "leave cluster" hook in `rabbit_node_monitor`, and handle cluster node joins/departures better
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mnesia.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 47 |
2 files changed, 66 insertions, 27 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 7aac7f3f63..679f1c8481 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -42,8 +42,9 @@ copy_db/1, wait_for_tables/0, - on_node_up/1, - on_node_down/1 + on_node_up/2, + on_node_down/1, + on_node_leave/1 ]). %% Used internally in rpc calls @@ -65,7 +66,8 @@ -export_type([node_type/0]). -type(node_type() :: disc | ram). --type(node_status() :: {[node()], [node()], [node()]}). +-type(node_status() :: {ordsets:ordset(node()), ordsets:ordset(node()), + ordsets:ordset(node())}). %% Main interface -spec(prepare/0 :: () -> 'ok'). @@ -97,8 +99,9 @@ -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). %% Hooks used in `rabbit_node_monitor' --spec(on_node_up/1 :: (node()) -> 'ok'). +-spec(on_node_up/2 :: (node(), boolean()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(on_node_leave/1 :: (node()) -> 'ok'). %% Functions used in internal rpc calls -spec(cluster_status_if_running/0 :: () -> {'ok', node_status()} | 'error'). @@ -409,9 +412,9 @@ running_clustered_disc_nodes() -> cluster_status_if_running() -> case mnesia:system_info(is_running) of no -> error; - yes -> {ok, {mnesia:system_info(db_nodes), - mnesia:table_info(schema, disc_copies), - mnesia:system_info(running_db_nodes)}} + yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)), + ordsets:from_list(mnesia:table_info(schema, disc_copies)), + ordsets:from_list(mnesia:system_info(running_db_nodes))}} end. 
node_info() -> @@ -703,8 +706,15 @@ update_cluster_nodes_status() -> %% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- -on_node_up(Node) -> - update_cluster_nodes_status(), +on_node_up(Node, IsDiscNode) -> + {AllNodes, DiscNodes, RunningNodes} = read_cluster_nodes_status(), + write_cluster_nodes_status({ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, + DiscNodes); + false -> DiscNodes + end, + ordsets:add_element(Node, RunningNodes)}), case is_only_disc_node(Node) of true -> rabbit_log:info("cluster contains disc nodes again~n"); false -> ok @@ -715,7 +725,15 @@ on_node_down(Node) -> true -> rabbit_log:info("only running disc node went down~n"); false -> ok end, - update_cluster_nodes_status(). + {AllNodes, DiscNodes, RunningNodes} = read_cluster_nodes_status(), + write_cluster_nodes_status({AllNodes, DiscNodes, + ordsets:del_element(Node, RunningNodes)}). + +on_node_leave(Node) -> + {AllNodes, DiscNodes, RunningNodes} = read_cluster_nodes_status(), + write_cluster_nodes_status({ordsets:del_element(Node, AllNodes), + ordsets:del_element(Node, DiscNodes), + ordsets:del_element(Node, RunningNodes)}). 
%%-------------------------------------------------------------------- %% Internal helpers @@ -1009,11 +1027,7 @@ remove_node_if_mnesia_running(Node) -> %% propagated to all nodes case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> - update_cluster_nodes_status(), - io:format("nodes: ~p~n", [running_clustered_disc_nodes()]), - {_, []} = rpc:multicall(running_clustered_nodes(), - rabbit_mnesia, - update_cluster_nodes_status, []), + rabbit_node_monitor:notify_leave_cluster(Node), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} @@ -1027,7 +1041,7 @@ leave_cluster() -> remove_node_remotely(Removee) -> case running_clustered_nodes() -- [Removee] of [] -> - ok; + {error, no_running_cluster_nodes}; RunningNodes -> case lists:any( fun (Node) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 323cf0ce9e..a25d0fe04d 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,7 +22,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([notify_cluster/0, rabbit_running_on/1]). +-export([rabbit_running_on/2, rabbit_left_cluster/1, notify_cluster/0, + notify_leave_cluster/1]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -32,8 +33,10 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(rabbit_running_on/1 :: (node()) -> 'ok'). +-spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok'). +-spec(rabbit_left_cluster/1 :: (node()) -> 'ok'). -spec(notify_cluster/0 :: () -> 'ok'). +-spec(notify_leave_cluster/1 :: (node()) -> 'ok'). -endif. @@ -42,15 +45,23 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +rabbit_running_on(Node, IsDiscNode) -> + gen_server:cast(rabbit_node_monitor, + {rabbit_running_on, Node, IsDiscNode}). + rabbit_running_on(Node) -> - gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). 
+ rabbit_running_on(Node, rabbit_mnesia:is_disc_node()). + +rabbit_left_cluster(Node) -> + gen_server:cast(rabbit_node_monitor, {rabbit_left_cluster, Node}). notify_cluster() -> Node = node(), Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], %% notify other rabbits of this rabbit case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, - [Node], ?RABBIT_UP_RPC_TIMEOUT) of + [Node, rabbit_mnesia:is_disc_node()], + ?RABBIT_UP_RPC_TIMEOUT) of {_, [] } -> ok; {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) end, @@ -58,6 +69,16 @@ notify_cluster() -> [ rabbit_running_on(N) || N <- Nodes ], ok. +notify_leave_cluster(Node) -> + Nodes = rabbit_mnesia:running_clustered_nodes() -- [node()], + rabbit_left_cluster(Node), + case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_left_cluster, [Node], + ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + ok. + %%-------------------------------------------------------------------- init([]) -> @@ -66,14 +87,17 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node}, Nodes) -> +handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) -> case ordsets:is_element(Node, Nodes) of true -> {noreply, Nodes}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node), + ok = handle_live_rabbit(Node, IsDiscNode), {noreply, ordsets:add_element(Node, Nodes)} end; +handle_cast({rabbit_left_cluster, Node}, Nodes) -> + ok = rabbit_mnesia:on_node_leave(Node), + {noreply, Nodes}; handle_cast(_Msg, State) -> {noreply, State}. @@ -96,11 +120,12 @@ code_change(_OldVsn, State, _Extra) -> %% of nodes. We really only need to execute some of these statements %% on *one* node, rather than all of them. 
handle_dead_rabbit(Node) -> + ok = rabbit_mnesia:on_node_down(Node), ok = rabbit_networking:on_node_down(Node), ok = rabbit_amqqueue:on_node_down(Node), - ok = rabbit_alarm:on_node_down(Node), - ok = rabbit_mnesia:on_node_down(Node). + ok = rabbit_alarm:on_node_down(Node). + +handle_live_rabbit(Node, IsDiscNode) -> + ok = rabbit_mnesia:on_node_up(Node, IsDiscNode), + ok = rabbit_alarm:on_node_up(Node). -handle_live_rabbit(Node) -> - ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node). |
