diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-08-10 15:03:16 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-08-10 15:03:16 +0100 |
| commit | 282c6f53d129fc7b0e1589e84fb41f575df60428 (patch) | |
| tree | 5eec788d8fe5af47b7029a9c16ad414172d5f725 | |
| parent | b22535586fb5a141046b732cf02a933142f848bc (diff) | |
| download | rabbitmq-server-git-282c6f53d129fc7b0e1589e84fb41f575df60428.tar.gz | |
Identify the (possibly former if it changed) master pid in the queue_mirror_deaths event.
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 5 |
3 files changed, 15 insertions, 11 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index bbc75c3213..00ef72ac74 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -341,8 +341,9 @@ handle_call(get_gm, _From, State = #state { gm = GM }) -> handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName } }) -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid, DeadPids} when node(Pid) =:= node() -> - rabbit_mirror_queue_misc:report_deaths(true, QueueName, DeadPids), + {ok, Pid, OldPid, DeadPids} when node(Pid) =:= node() -> + rabbit_mirror_queue_misc:report_deaths(true, QueueName, OldPid, + DeadPids), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index a9d0fe793f..cafe484ff7 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,7 @@ -export([remove_from_queue/2, on_node_up/0, drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/3]). + report_deaths/4]). -include("rabbit.hrl"). @@ -29,6 +29,7 @@ %% become the new master, which is bad because it could then mean the %% slave (now master) receives messages it's not ready for (for %% example, new consumers). +%% Returns {ok, NewMPid, OldMPid, DeadPids} remove_from_queue(QueueName, DeadPids) -> DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], rabbit_misc:execute_mnesia_transaction( @@ -44,7 +45,7 @@ remove_from_queue(QueueName, DeadPids) -> not lists:member(node(Pid), DeadNodes)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, []}; + {ok, QPid1, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have @@ -52,13 +53,13 @@ remove_from_queue(QueueName, DeadPids) -> Q1 = Q #amqqueue { pid = QPid1, slave_pids = SPids1 }, ok = rabbit_amqqueue:store_queue(Q1), - {ok, QPid1, [QPid | SPids] -- Alive}; + {ok, QPid1, QPid, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, []} + {ok, QPid1, QPid, []} end end end). @@ -135,10 +136,11 @@ if_mirrored_queue(Queue, Fun) -> end end). -report_deaths(_IsMaster, _QueueName, []) -> +report_deaths(_IsMaster, _QueueName, _OldMPid, []) -> ok; -report_deaths(IsMaster, QueueName, DeadPids) -> - rabbit_event:notify(queue_mirror_deaths, [{pids, DeadPids}]), +report_deaths(IsMaster, QueueName, OldMPid, DeadPids) -> + rabbit_event:notify(queue_mirror_deaths, [{master_pid, OldMPid}, + {pids, DeadPids}]), rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", [rabbit_misc:rs(QueueName), case IsMaster of diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 2f71dea4e1..c12ea0cca3 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -170,8 +170,9 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids} -> - rabbit_mirror_queue_misc:report_deaths(false, QueueName, DeadPids), + {ok, Pid, OldPid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(false, QueueName, OldPid, + DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed reply(ok, State); |
