summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-08-10 16:20:18 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-08-10 16:20:18 +0100
commit14fea80fbd9be28925a602ca9454fc55f63f3b6b (patch)
tree8e2f2602705b2066cad7f8c74f5322a4ccc13956 /src
parentab82e9f59f7e376f8978864840249051b4bffdab (diff)
downloadrabbitmq-server-git-14fea80fbd9be28925a602ca9454fc55f63f3b6b.tar.gz
Reworkings
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl7
-rw-r--r--src/rabbit_mirror_queue_misc.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl6
3 files changed, 15 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 072de5b2f2..bae4df47ae 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -339,10 +339,11 @@ handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
handle_cast({gm_deaths, Deaths},
- State = #state { q = #amqqueue { name = QueueName } }) ->
+ State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
+ when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, Pid, Pid, DeadPids} when node(Pid) =:= node() ->
- rabbit_mirror_queue_misc:report_deaths(true, QueueName, Pid,
+ {ok, MPid, DeadPids} ->
+ rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, MPid,
DeadPids),
noreply(State);
{error, not_found} ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index cafe484ff7..079a9f1a97 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -18,7 +18,7 @@
-export([remove_from_queue/2, on_node_up/0,
drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
- report_deaths/4]).
+ report_deaths/5]).
-include("rabbit.hrl").
@@ -29,7 +29,7 @@
%% become the new master, which is bad because it could then mean the
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
-%% Returns {ok, NewMPid, OldMPid, DeadPids}
+%% Returns {ok, NewMPid, DeadPids}
remove_from_queue(QueueName, DeadPids) ->
DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
rabbit_misc:execute_mnesia_transaction(
@@ -45,7 +45,7 @@ remove_from_queue(QueueName, DeadPids) ->
not lists:member(node(Pid), DeadNodes)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- {ok, QPid1, QPid1, []};
+ {ok, QPid1, []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
@@ -53,13 +53,13 @@ remove_from_queue(QueueName, DeadPids) ->
Q1 = Q #amqqueue { pid = QPid1,
slave_pids = SPids1 },
ok = rabbit_amqqueue:store_queue(Q1),
- {ok, QPid1, QPid, [QPid | SPids] -- Alive};
+ {ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- {ok, QPid1, QPid, []}
+ {ok, QPid1, []}
end
end
end).
@@ -136,9 +136,9 @@ if_mirrored_queue(Queue, Fun) ->
end
end).
-report_deaths(_IsMaster, _QueueName, _OldMPid, []) ->
+report_deaths(_MirrorPid, _IsMaster, _QueueName, _OldMPid, []) ->
ok;
-report_deaths(IsMaster, QueueName, OldMPid, DeadPids) ->
+report_deaths(MirrorPid, IsMaster, QueueName, OldMPid, DeadPids) ->
rabbit_event:notify(queue_mirror_deaths, [{master_pid, OldMPid},
{pids, DeadPids}]),
rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n",
@@ -147,5 +147,5 @@ report_deaths(IsMaster, QueueName, OldMPid, DeadPids) ->
true -> "Master";
false -> "Slave"
end,
- rabbit_misc:pid_to_string(self()),
+ rabbit_misc:pid_to_string(MirrorPid),
[[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c12ea0cca3..2ec3a97abe 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -170,9 +170,9 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, OldPid, DeadPids} ->
- rabbit_mirror_queue_misc:report_deaths(false, QueueName, OldPid,
- DeadPids),
+ {ok, Pid, DeadPids} ->
+ rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ MPid, DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
reply(ok, State);