diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-07 12:15:21 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-07 12:15:21 +0000 |
| commit | 7dbbd35b10310218ce8acf7c5e5af76086f8207a (patch) | |
| tree | 3be8ad91887807e3e7225344b6334bd27c1c0bfc | |
| parent | 5d3d29173995d6181764246685d5f30c469de5b8 (diff) | |
| parent | a30b391098fe92ae34b80440a842ce1eb26ca608 (diff) | |
| download | rabbitmq-server-git-7dbbd35b10310218ce8acf7c5e5af76086f8207a.tar.gz | |
Merge bug26002
| -rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 79 |
2 files changed, 47 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 346d57a8e6..2b86435d28 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -354,14 +354,14 @@ with(Name, F, E) -> {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do - %% with the QPid. + %% with the QPid. F() should be written s.t. that this + %% cannot happen, so we bail if it does since that + %% indicates a code bug and we don't want to get stuck in + %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> - case rabbit_misc:is_process_alive(QPid) of - true -> E(not_found_or_absent_dirty(Name)); - false -> timer:sleep(25), - with(Name, F, E) - end + fun () -> false = rabbit_misc:is_process_alive(QPid), + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ca49573357..4f77009c47 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -148,53 +148,54 @@ drop_mirrors(QName, Nodes) -> ok. drop_mirror(QName, MirrorNode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - {error, {queue_not_mirrored_on_node, MirrorNode}}; - [QPid] when SPids =:= [] -> - {error, cannot_drop_only_mirror}; - [Pid] -> - rabbit_log:info( - "Dropping queue mirror on node ~p for ~s~n", - [MirrorNode, rabbit_misc:rs(Name)]), - exit(Pid, {shutdown, dropped}), - {ok, dropped} - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid] when SPids =:= [] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + rabbit_log:info( + "Dropping queue mirror on node ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), + exit(Pid, {shutdown, dropped}), + {ok, dropped} + end; + {error, not_found} = E -> + E + end. add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. add_mirror(QName, MirrorNode, SyncMode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q, SyncMode); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q, SyncMode) - end - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q, SyncMode); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> {ok, already_mirrored}; + false -> start_child(Name, MirrorNode, Q, SyncMode) + end + end; + {error, not_found} = E -> + E + end. start_child(Name, MirrorNode, Q, SyncMode) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(down), - fun () -> - rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) - end) of - {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode); - _ -> ok - end. + rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun () -> + {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode) + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; |
