summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_control_main.erl7
-rw-r--r--src/rabbit_mirror_queue_sync.erl26
4 files changed, 43 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3169948bab..c49d5bee08 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1, sync/2]).
+-export([start_mirroring/1, stop_mirroring/1, sync/2, cancel_sync/2]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -175,6 +175,8 @@
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
-spec(sync/2 :: (binary(), rabbit_types:vhost()) ->
'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(cancel_sync/2 :: (binary(), rabbit_types:vhost()) ->
+ 'ok' | rabbit_types:error('not_mirrored')).
-endif.
@@ -599,6 +601,13 @@ sync(QNameBin, VHostBin) ->
E -> E
end.
+cancel_sync(QNameBin, VHostBin) ->
+ QName = rabbit_misc:r(VHostBin, queue, QNameBin),
+ case lookup(QName) of
+ {ok, #amqqueue{pid = QPid}} -> delegate_call(QPid, cancel_sync_mirrors);
+ E -> E
+ end.
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5d9da20491..26c0edbe26 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1188,6 +1188,10 @@ handle_call(sync_mirrors, _From,
handle_call(sync_mirrors, _From, State) ->
reply({error, not_mirrored}, State);
+%% By definition if we get this message here we do not have to do anything.
+handle_call(cancel_sync_mirrors, _From, State) ->
+ reply({error, not_syncing}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index b4272555f2..12096ff5f9 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -51,6 +51,7 @@
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
{sync_queue, [?VHOST_DEF]},
+ {cancel_sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -287,6 +288,12 @@ action(sync_queue, Node, [Queue], Opts, Inform) ->
rpc_call(Node, rabbit_amqqueue, sync,
[list_to_binary(Queue), list_to_binary(VHost)]);
+action(cancel_sync_queue, Node, [Queue], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Stopping synchronising queue ~s in ~s", [Queue, VHost]),
+ rpc_call(Node, rabbit_amqqueue, cancel_sync,
+ [list_to_binary(Queue), list_to_binary(VHost)]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 457f658bf7..53f18c5b30 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -92,6 +92,14 @@ master_send({Syncer, Ref, Log, InfoPush, Parent}, I, Last, Msg, MsgProps) ->
ok
end,
receive
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> unlink(Syncer),
+ Syncer ! {cancel, Ref},
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end,
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
{next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
{cont, Acc};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
@@ -124,8 +132,16 @@ syncer(Ref, Log, MPid, SPids) ->
SPidsMRefs1 -> MPid ! {ready, self()},
Log("~p to sync", [[rabbit_misc:pid_to_string(S) ||
{S, _} <- SPidsMRefs1]]),
- SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
- foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3)
+ case syncer_loop({Ref, MPid}, SPidsMRefs1) of
+ {done, SPidsMRefs2} ->
+ foreach_slave(SPidsMRefs2, Ref,
+ fun sync_send_complete/3);
+ cancelled ->
+ %% We don't tell the slaves we will die
+ %% - so when we do they interpret that
+ %% as a failure, which is what we want.
+ ok
+ end
end.
syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
@@ -135,11 +151,13 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref),
[begin
credit_flow:send(SPid),
- SPid ! {sync_msg, Ref, Msg, MsgProps}
+ SPid ! Msg
end || {SPid, _} <- SPidsMRefs1],
syncer_loop(Args, SPidsMRefs1);
+ {cancel, Ref} ->
+ cancelled;
{done, Ref} ->
- SPidsMRefs
+ {done, SPidsMRefs}
end.
wait_for_credit(SPidsMRefs, Ref) ->