summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Michael Klishin <michael@novemberain.com> 2015-06-29 14:26:12 +0300
committer Michael Klishin <michael@novemberain.com> 2015-06-29 14:26:12 +0300
commit e2d819164703728b9dd0bdee47d58d3589e44151 (patch)
tree a64e132c9d0654e8a5e8af1ae842a1f7149462c6 /src
parent ede194b2d5600b3214c7dfbd9310c560a1ce8add (diff)
parent 31173263e1c34356f7e85b0268a531b85247e0f1 (diff)
download rabbitmq-server-git-e2d819164703728b9dd0bdee47d58d3589e44151.tar.gz
Merge pull request #201 from rabbitmq/rabbitmq-server-200
rabbit_amqqueue:on_node_up/1: Check the queue's running slaves list
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_amqqueue.erl 38
1 files changed, 32 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9ce800023f..5bfa006e09 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -782,15 +782,41 @@ on_node_up(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_queue,
#amqqueue{_ = '_'}, write),
- [case lists:member(Node, RSs) of
- true -> RSs1 = RSs -- [Node],
- store_queue(
- Q#amqqueue{recoverable_slaves = RSs1});
- false -> ok
- end || #amqqueue{recoverable_slaves = RSs} = Q <- Qs],
+ [maybe_clear_recoverable_node(Node, Q) || Q <- Qs],
ok
end).
+%% Removes Node from the queue's recoverable_slaves list and passes the
+%% updated record to store_queue/1, unless exactly one of the queue's
+%% sync_slave_pids is located on Node and that process is still alive.
+%% Returns ok without touching the record when Node is not listed in
+%% recoverable_slaves, or when the slave process on Node is alive.
+maybe_clear_recoverable_node(Node,
+ #amqqueue{sync_slave_pids = SPids,
+ recoverable_slaves = RSs} = Q) ->
+ case lists:member(Node, RSs) of
+ true ->
+ %% There is a race between
+ %% rabbit_mirror_queue_slave:record_synchronised/1, called
+ %% by the incoming slave node, and this function, called
+ %% by the master node. If this function is executed after
+ %% record_synchronised/1, the node is erroneously removed
+ %% from the recoverable slaves list.
+ %%
+ %% We check whether the slave node's queue PID is alive. If
+ %% it is, then this function is being executed after
+ %% record_synchronised/1. In this situation, we don't touch
+ %% the queue record; it is already correct.
+ DoClearNode =
+ case [SP || SP <- SPids, node(SP) =:= Node] of
+ [SPid] -> not rabbit_misc:is_process_alive(SPid);
+ %% Zero, or more than one, synchronised slave PID on Node:
+ %% always clear Node from the list.
+ _ -> true
+ end,
+ if
+ DoClearNode -> RSs1 = RSs -- [Node],
+ store_queue(
+ Q#amqqueue{recoverable_slaves = RSs1});
+ %% The slave process on Node is alive: leave the record as is.
+ true -> ok
+ end;
+ false ->
+ ok
+ end.
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =