diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-12-14 13:07:21 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-12-14 13:07:21 +0000 |
| commit | 8b1fafcca7fe80084f12b4740366c850e748de79 (patch) | |
| tree | e04105d9899544e87eef7351bfe48a2dbbb5a69a | |
| parent | 4b3c6121f420fbfadce4ca5202d53558bb0ff506 (diff) | |
| download | rabbitmq-server-git-8b1fafcca7fe80084f12b4740366c850e748de79.tar.gz | |
Cancel sync
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 20 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 26 |
5 files changed, 63 insertions, 5 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d18a0f9e49..56fce22713 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -477,6 +477,26 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> + </term> + <listitem> + <variablelist> + <varlistentry> + <term>queue</term> + <listitem> + <para> + The name of the queue to cancel synchronisation for. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Instructs a synchronising mirrored queue to stop + synchronising itself. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3169948bab..c49d5bee08 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1, sync/2]). +-export([start_mirroring/1, stop_mirroring/1, sync/2, cancel_sync/2]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -175,6 +175,8 @@ -spec(stop_mirroring/1 :: (pid()) -> 'ok'). -spec(sync/2 :: (binary(), rabbit_types:vhost()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(cancel_sync/2 :: (binary(), rabbit_types:vhost()) -> + 'ok' | rabbit_types:error('not_mirrored')). -endif. @@ -599,6 +601,13 @@ sync(QNameBin, VHostBin) -> E -> E end. +cancel_sync(QNameBin, VHostBin) -> + QName = rabbit_misc:r(VHostBin, queue, QNameBin), + case lookup(QName) of + {ok, #amqqueue{pid = QPid}} -> delegate_call(QPid, cancel_sync_mirrors); + E -> E + end. + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5d9da20491..26c0edbe26 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1188,6 +1188,10 @@ handle_call(sync_mirrors, _From, handle_call(sync_mirrors, _From, State) -> reply({error, not_mirrored}, State); +%% By definition if we get this message here we do not have to do anything. +handle_call(cancel_sync_mirrors, _From, State) -> + reply({error, not_syncing}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index b4272555f2..12096ff5f9 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -51,6 +51,7 @@ {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, {sync_queue, [?VHOST_DEF]}, + {cancel_sync_queue, [?VHOST_DEF]}, add_user, delete_user, @@ -287,6 +288,12 @@ action(sync_queue, Node, [Queue], Opts, Inform) -> rpc_call(Node, rabbit_amqqueue, sync, [list_to_binary(Queue), list_to_binary(VHost)]); +action(cancel_sync_queue, Node, [Queue], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Stopping synchronising queue ~s in ~s", [Queue, VHost]), + rpc_call(Node, rabbit_amqqueue, cancel_sync, + [list_to_binary(Queue), list_to_binary(VHost)]); + action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 457f658bf7..53f18c5b30 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -92,6 +92,14 @@ master_send({Syncer, Ref, Log, InfoPush, Parent}, I, Last, Msg, MsgProps) -> ok end, receive + {'$gen_call', From, + cancel_sync_mirrors} -> unlink(Syncer), + Syncer ! {cancel, Ref}, + receive {'EXIT', Syncer, _} -> ok + after 0 -> ok + end, + gen_server2:reply(From, ok), + {stop, cancelled}; {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, {cont, Acc}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; @@ -124,8 +132,16 @@ syncer(Ref, Log, MPid, SPids) -> SPidsMRefs1 -> MPid ! {ready, self()}, Log("~p to sync", [[rabbit_misc:pid_to_string(S) || {S, _} <- SPidsMRefs1]]), - SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1), - foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3) + case syncer_loop({Ref, MPid}, SPidsMRefs1) of + {done, SPidsMRefs2} -> + foreach_slave(SPidsMRefs2, Ref, + fun sync_send_complete/3); + cancelled -> + %% We don't tell the slaves we will die + %% - so when we do they interpret that + %% as a failure, which is what we want. + ok + end end. syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> @@ -135,11 +151,13 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref), [begin credit_flow:send(SPid), - SPid ! {sync_msg, Ref, Msg, MsgProps} + SPid ! Msg end || {SPid, _} <- SPidsMRefs1], syncer_loop(Args, SPidsMRefs1); + {cancel, Ref} -> + cancelled; {done, Ref} -> - SPidsMRefs + {done, SPidsMRefs} end. wait_for_credit(SPidsMRefs, Ref) -> |
