summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-08-10 13:39:09 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-08-10 13:39:09 +0100
commitb22535586fb5a141046b732cf02a933142f848bc (patch)
tree3a9019a8c89acf30e34b1cee7e3c0a32b1954a3b /src
parent4837a81deacf1ce23358384ebbd900e7d6b6ed46 (diff)
downloadrabbitmq-server-git-b22535586fb5a141046b732cf02a933142f848bc.tar.gz
Various improvements mainly to detection of and reporting of death
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl9
-rw-r--r--src/rabbit_mirror_queue_misc.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl34
3 files changed, 41 insertions, 30 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 0b9f053f29..bbc75c3213 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -55,7 +55,7 @@
%%
%% A queue with mirrors consists of the following:
%%
-%% #amqqueue{ pid, mirror_pids }
+%% #amqqueue{ pid, slave_pids }
%% | |
%% +----------+ +-------+--------------+-----------...etc...
%% | | |
@@ -340,12 +340,9 @@ handle_call(get_gm, _From, State = #state { gm = GM }) ->
handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName } }) ->
- rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n",
- [rabbit_misc:rs(QueueName),
- rabbit_misc:pid_to_string(self()),
- [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, Pid} when node(Pid) =:= node() ->
+ {ok, Pid, DeadPids} when node(Pid) =:= node() ->
+ rabbit_mirror_queue_misc:report_deaths(true, QueueName, DeadPids),
noreply(State);
{error, not_found} ->
{stop, normal, State}
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 6a9f733e41..a9d0fe793f 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,8 @@
-module(rabbit_mirror_queue_misc).
-export([remove_from_queue/2, on_node_up/0,
- drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]).
+ drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
+ report_deaths/3]).
-include("rabbit.hrl").
@@ -38,27 +39,27 @@ remove_from_queue(QueueName, DeadPids) ->
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids }] ->
- [QPid1 | SPids1] =
+ [QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid), DeadNodes)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- ok;
+ {ok, QPid1, []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
Q1 = Q #amqqueue { pid = QPid1,
slave_pids = SPids1 },
- ok = rabbit_amqqueue:store_queue(Q1);
+ ok = rabbit_amqqueue:store_queue(Q1),
+ {ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- ok
- end,
- {ok, QPid1}
+ {ok, QPid1, []}
+ end
end
end).
@@ -133,3 +134,16 @@ if_mirrored_queue(Queue, Fun) ->
_ -> Fun(Q)
end
end).
+
+report_deaths(_IsMaster, _QueueName, []) ->
+ ok;
+report_deaths(IsMaster, QueueName, DeadPids) ->
+ rabbit_event:notify(queue_mirror_deaths, [{pids, DeadPids}]),
+ rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n",
+ [rabbit_misc:rs(QueueName),
+ case IsMaster of
+ true -> "Master";
+ false -> "Slave"
+ end,
+ rabbit_misc:pid_to_string(self()),
+ [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index fa46ad1157..2f71dea4e1 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -164,28 +164,27 @@ handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
gm = GM,
master_pid = MPid }) ->
- rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n",
- [rabbit_misc:rs(QueueName),
- rabbit_misc:pid_to_string(self()),
- [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
%% The GM has told us about deaths, which means we're not going to
%% receive any more messages from GM
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, Pid} when node(Pid) =:= node(MPid) ->
- %% master hasn't changed
- reply(ok, State);
- {ok, Pid} when node(Pid) =:= node() ->
- %% we've become master
- promote_me(From, State);
- {ok, Pid} ->
- %% master has changed to not us.
- gen_server2:reply(From, ok),
- erlang:monitor(process, Pid),
- ok = gm:broadcast(GM, heartbeat),
- noreply(State #state { master_pid = Pid });
{error, not_found} ->
gen_server2:reply(From, ok),
- {stop, normal, State}
+ {stop, normal, State};
+ {ok, Pid, DeadPids} ->
+ rabbit_mirror_queue_misc:report_deaths(false, QueueName, DeadPids),
+ if node(Pid) =:= node(MPid) ->
+ %% master hasn't changed
+ reply(ok, State);
+ node(Pid) =:= node() ->
+ %% we've become master
+ promote_me(From, State);
+ true ->
+ %% master has changed to not us.
+ gen_server2:reply(From, ok),
+ erlang:monitor(process, Pid),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_pid = Pid })
+ end
end;
handle_call(info, _From, State) ->
@@ -282,6 +281,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _State) ->
case Msg of
+ info -> 9;
{gm_deaths, _Deaths} -> 5;
_ -> 0
end.