diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-10 13:39:09 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-10 13:39:09 +0100 |
| commit | b22535586fb5a141046b732cf02a933142f848bc (patch) | |
| tree | 3a9019a8c89acf30e34b1cee7e3c0a32b1954a3b /src | |
| parent | 4837a81deacf1ce23358384ebbd900e7d6b6ed46 (diff) | |
| download | rabbitmq-server-git-b22535586fb5a141046b732cf02a933142f848bc.tar.gz | |
Various improvements mainly to detection of and reporting of death
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 34 |
3 files changed, 41 insertions, 30 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 0b9f053f29..bbc75c3213 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -55,7 +55,7 @@ %% %% A queue with mirrors consists of the following: %% -%% #amqqueue{ pid, mirror_pids } +%% #amqqueue{ pid, slave_pids } %% | | %% +----------+ +-------+--------------+-----------...etc... %% | | | @@ -340,12 +340,9 @@ handle_call(get_gm, _From, State = #state { gm = GM }) -> handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName } }) -> - rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - rabbit_misc:pid_to_string(self()), - [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= node() -> + {ok, Pid, DeadPids} when node(Pid) =:= node() -> + rabbit_mirror_queue_misc:report_deaths(true, QueueName, DeadPids), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 6a9f733e41..a9d0fe793f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,8 @@ -module(rabbit_mirror_queue_misc). -export([remove_from_queue/2, on_node_up/0, - drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]). + drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, + report_deaths/3]). -include("rabbit.hrl"). @@ -38,27 +39,27 @@ remove_from_queue(QueueName, DeadPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids }] -> - [QPid1 | SPids1] = + [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - ok; + {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. Q1 = Q #amqqueue { pid = QPid1, slave_pids = SPids1 }, - ok = rabbit_amqqueue:store_queue(Q1); + ok = rabbit_amqqueue:store_queue(Q1), + {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - ok - end, - {ok, QPid1} + {ok, QPid1, []} + end end end). @@ -133,3 +134,16 @@ if_mirrored_queue(Queue, Fun) -> _ -> Fun(Q) end end). + +report_deaths(_IsMaster, _QueueName, []) -> + ok; +report_deaths(IsMaster, QueueName, DeadPids) -> + rabbit_event:notify(queue_mirror_deaths, [{pids, DeadPids}]), + rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + case IsMaster of + true -> "Master"; + false -> "Slave" + end, + rabbit_misc:pid_to_string(self()), + [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index fa46ad1157..2f71dea4e1 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -164,28 +164,27 @@ handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, gm = GM, master_pid = MPid }) -> - rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - rabbit_misc:pid_to_string(self()), - [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), %% The GM has told us about deaths, which means we're not going to %% receive any more messages from GM case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= node(MPid) -> - %% master hasn't changed - reply(ok, State); - {ok, Pid} when node(Pid) =:= node() -> - %% we've become master - promote_me(From, State); - {ok, Pid} -> - %% master has changed to not us. - gen_server2:reply(From, ok), - erlang:monitor(process, Pid), - ok = gm:broadcast(GM, heartbeat), - noreply(State #state { master_pid = Pid }); {error, not_found} -> gen_server2:reply(From, ok), - {stop, normal, State} + {stop, normal, State}; + {ok, Pid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(false, QueueName, DeadPids), + if node(Pid) =:= node(MPid) -> + %% master hasn't changed + reply(ok, State); + node(Pid) =:= node() -> + %% we've become master + promote_me(From, State); + true -> + %% master has changed to not us. + gen_server2:reply(From, ok), + erlang:monitor(process, Pid), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_pid = Pid }) + end end; handle_call(info, _From, State) -> @@ -282,6 +281,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of + info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. |
