summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-07 12:15:21 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-07 12:15:21 +0000
commit7dbbd35b10310218ce8acf7c5e5af76086f8207a (patch)
tree3be8ad91887807e3e7225344b6334bd27c1c0bfc
parent5d3d29173995d6181764246685d5f30c469de5b8 (diff)
parenta30b391098fe92ae34b80440a842ce1eb26ca608 (diff)
downloadrabbitmq-server-git-7dbbd35b10310218ce8acf7c5e5af76086f8207a.tar.gz
Merge bug26002
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_mirror_queue_misc.erl79
2 files changed, 47 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 346d57a8e6..2b86435d28 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -354,14 +354,14 @@ with(Name, F, E) ->
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
- %% with the QPid.
+ %% with the QPid. F() should be written s.t. that this
+ %% cannot happen, so we bail if it does since that
+ %% indicates a code bug and we don't want to get stuck in
+ %% the retry loop.
rabbit_misc:with_exit_handler(
- fun () ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> E(not_found_or_absent_dirty(Name));
- false -> timer:sleep(25),
- with(Name, F, E)
- end
+ fun () -> false = rabbit_misc:is_process_alive(QPid),
+ timer:sleep(25),
+ with(Name, F, E)
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ca49573357..4f77009c47 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -148,53 +148,54 @@ drop_mirrors(QName, Nodes) ->
ok.
drop_mirror(QName, MirrorNode) ->
- rabbit_amqqueue:with(
- QName,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- {error, {queue_not_mirrored_on_node, MirrorNode}};
- [QPid] when SPids =:= [] ->
- {error, cannot_drop_only_mirror};
- [Pid] ->
- rabbit_log:info(
- "Dropping queue mirror on node ~p for ~s~n",
- [MirrorNode, rabbit_misc:rs(Name)]),
- exit(Pid, {shutdown, dropped}),
- {ok, dropped}
- end
- end).
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ {error, {queue_not_mirrored_on_node, MirrorNode}};
+ [QPid] when SPids =:= [] ->
+ {error, cannot_drop_only_mirror};
+ [Pid] ->
+ rabbit_log:info(
+ "Dropping queue mirror on node ~p for ~s~n",
+ [MirrorNode, rabbit_misc:rs(Name)]),
+ exit(Pid, {shutdown, dropped}),
+ {ok, dropped}
+ end;
+ {error, not_found} = E ->
+ E
+ end.
add_mirrors(QName, Nodes, SyncMode) ->
[add_mirror(QName, Node, SyncMode) || Node <- Nodes],
ok.
add_mirror(QName, MirrorNode, SyncMode) ->
- rabbit_amqqueue:with(
- QName,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- start_child(Name, MirrorNode, Q, SyncMode);
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> {ok, already_mirrored};
- false -> start_child(Name, MirrorNode, Q, SyncMode)
- end
- end
- end).
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ start_child(Name, MirrorNode, Q, SyncMode);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> {ok, already_mirrored};
+ false -> start_child(Name, MirrorNode, Q, SyncMode)
+ end
+ end;
+ {error, not_found} = E ->
+ E
+ end.
start_child(Name, MirrorNode, Q, SyncMode) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(down),
- fun () ->
- rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
- end) of
- {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode);
- _ -> ok
- end.
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun () ->
+ {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]),
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;