diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-10 16:20:18 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-10 16:20:18 +0100 |
| commit | 14fea80fbd9be28925a602ca9454fc55f63f3b6b (patch) | |
| tree | 8e2f2602705b2066cad7f8c74f5322a4ccc13956 /src | |
| parent | ab82e9f59f7e376f8978864840249051b4bffdab (diff) | |
| download | rabbitmq-server-git-14fea80fbd9be28925a602ca9454fc55f63f3b6b.tar.gz | |
Reworkings
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 6 |
3 files changed, 15 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 072de5b2f2..bae4df47ae 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -339,10 +339,11 @@ handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). handle_cast({gm_deaths, Deaths}, - State = #state { q = #amqqueue { name = QueueName } }) -> + State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) + when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid, Pid, DeadPids} when node(Pid) =:= node() -> - rabbit_mirror_queue_misc:report_deaths(true, QueueName, Pid, + {ok, MPid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, MPid, DeadPids), noreply(State); {error, not_found} -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index cafe484ff7..079a9f1a97 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,7 @@ -export([remove_from_queue/2, on_node_up/0, drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4]). + report_deaths/5]). -include("rabbit.hrl"). @@ -29,7 +29,7 @@ %% become the new master, which is bad because it could then mean the %% slave (now master) receives messages it's not ready for (for %% example, new consumers). -%% Returns {ok, NewMPid, OldMPid, DeadPids} +%% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadPids) -> DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], rabbit_misc:execute_mnesia_transaction( @@ -45,7 +45,7 @@ remove_from_queue(QueueName, DeadPids) -> not lists:member(node(Pid), DeadNodes)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, QPid1, []}; + {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have @@ -53,13 +53,13 @@ remove_from_queue(QueueName, DeadPids) -> Q1 = Q #amqqueue { pid = QPid1, slave_pids = SPids1 }, ok = rabbit_amqqueue:store_queue(Q1), - {ok, QPid1, QPid, [QPid | SPids] -- Alive}; + {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, QPid, []} + {ok, QPid1, []} end end end). @@ -136,9 +136,9 @@ if_mirrored_queue(Queue, Fun) -> end end). -report_deaths(_IsMaster, _QueueName, _OldMPid, []) -> +report_deaths(_MirrorPid, _IsMaster, _QueueName, _OldMPid, []) -> ok; -report_deaths(IsMaster, QueueName, OldMPid, DeadPids) -> +report_deaths(MirrorPid, IsMaster, QueueName, OldMPid, DeadPids) -> rabbit_event:notify(queue_mirror_deaths, [{master_pid, OldMPid}, {pids, DeadPids}]), rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", @@ -147,5 +147,5 @@ report_deaths(IsMaster, QueueName, OldMPid, DeadPids) -> true -> "Master"; false -> "Slave" end, - rabbit_misc:pid_to_string(self()), + rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c12ea0cca3..2ec3a97abe 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -170,9 +170,9 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, OldPid, DeadPids} -> - rabbit_mirror_queue_misc:report_deaths(false, QueueName, OldPid, - DeadPids), + {ok, Pid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + MPid, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed reply(ok, State); |
