summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml43
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl27
-rw-r--r--src/rabbit_mirror_queue_misc.erl83
-rw-r--r--src/rabbit_mirror_queue_slave.erl11
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl6
11 files changed, 147 insertions, 59 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 628691589b..908ca97372 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1325,6 +1325,49 @@
</variablelist>
</refsect2>
+
+ <refsect2>
+ <title>Mirrored Queue Management</title>
+ <para>
+ Mirrored queues can have slaves dynamically added, and slaves
+ or the master dynamically dropped. Refer to the <ulink
+ url="http://www.rabbitmq.com/ha.html">High Availability
+ guide</ulink> for further details about mirrored queues in
+ general.
+ </para>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>add_queue_mirror</command> <arg choice="req"><replaceable>queue_name</replaceable></arg> <arg choice="req"><replaceable>node</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Attempts to add a mirror of the queue
+ <command>queue_name</command> on
+ <command>node</command>. This will only succeed if the
+ queue was declared a mirrored queue and if there is no
+ mirror of the queue already on the node. If it succeeds,
+ the new mirror will start off as an empty slave.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>drop_queue_mirror</command> <arg choice="req"><replaceable>queue_name</replaceable></arg> <arg choice="req"><replaceable>node</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Attempts to drop a mirror of the queue
+ <command>queue_name</command> on
+ <command>node</command>. This will only succeed if the
+ queue was declared a mirrored queue and if there is a
+ mirror of the queue already on the node. If the node
+ contains the master of the queue, a slave on some other
+ node will be promoted to become the new master. It is
+ not permitted to drop the only node of a mirrored-queue.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
</refsect1>
</refentry>
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1c2b94e267..295d90394f 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -32,8 +32,8 @@
-spec(stop/0 :: () -> 'ok').
-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
--spec(terminate/1 :: (state()) -> state()).
--spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(terminate/2 :: (any(), state()) -> state()).
+-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/4 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state()) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ea31ec1349..b1c95338c2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -145,16 +145,16 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
State, Deliveries).
-terminate(shutdown, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate(_Reason, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
- BQS1 = BQ:delete_and_terminate(BQS),
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
rabbit_amqqueue:internal_delete(qname(State)),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index addaabc584..217ad3eb5b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -49,11 +49,11 @@ behaviour_info(callbacks) ->
{init, 4},
%% Called on queue shutdown when queue isn't being deleted.
- {terminate, 1},
+ {terminate, 2},
%% Called when the queue is terminating and needs to delete all
%% its content.
- {delete_and_terminate, 1},
+ {delete_and_terminate, 2},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 1140a2f0fd..b4b6255ebd 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -244,6 +244,12 @@ action(add_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) ->
rpc_call(Node, rabbit_mirror_queue_misc, add_slave,
[VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]);
+action(drop_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) ->
+ Inform("Dropping mirror of queue ~p on node ~p~n", [Queue, MirrorNode]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ rpc_call(Node, rabbit_mirror_queue_misc, drop_slave,
+ [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]);
+
action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 99de1b18ac..9bd8565f6e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_master).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
@@ -106,17 +106,28 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
-terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+terminate({shutdown, dropped} = Reason,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ %% Backing queue termination - this node has been explicitly
+ %% dropped. Normally, non-durable queues would be tidied up on
+ %% startup, but there's a possibility that we will be added back
+ %% in without this node being restarted. Thus we must do the full
+ %% blown delete_and_terminate now, but only locally: we do not
+ %% broadcast delete_and_terminate.
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
+ set_delivered = 0 };
+terminate(Reason,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
%% node. Thus just let some other slave take over.
- State #state { backing_queue_state = BQ:terminate(BQS) }.
+ State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
-delete_and_terminate(State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, delete_and_terminate),
- State #state { backing_queue_state = BQ:delete_and_terminate(BQS),
+delete_and_terminate(Reason, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
purge(State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 5f180c5eae..046d338056 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,7 +16,8 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2, add_slave/2, add_slave/3, on_node_up/0]).
+-export([remove_from_queue/2, on_node_up/0,
+ drop_slave/2, drop_slave/3, add_slave/2, add_slave/3]).
-include("rabbit.hrl").
@@ -59,36 +60,6 @@ remove_from_queue(QueueName, DeadPids) ->
end
end).
-add_slave(VHostPath, QueueName, MirrorNode) ->
- add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
-
-add_slave(Queue, MirrorNode) ->
- rabbit_amqqueue:with(
- Queue,
- fun (#amqqueue { arguments = Args, name = Name,
- pid = QPid, mirror_pids = MPids } = Q) ->
- case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
- undefined ->
- ok;
- _ ->
- case [MirrorNode || Pid <- [QPid | MPids],
- node(Pid) =:= MirrorNode] of
- [] ->
- Result =
- rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]),
- rabbit_log:info("Adding slave node for ~s: ~p~n",
- [rabbit_misc:rs(Name), Result]),
- case Result of
- {ok, _Pid} -> ok;
- _ -> Result
- end;
- [_] ->
- {error, queue_already_mirrored_on_node}
- end
- end
- end).
-
on_node_up() ->
Qs =
rabbit_misc:execute_mnesia_transaction(
@@ -113,3 +84,53 @@ on_node_up() ->
end),
[add_slave(Q, node()) || Q <- Qs],
ok.
+
+drop_slave(VHostPath, QueueName, MirrorNode) ->
+ drop_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+
+drop_slave(Queue, MirrorNode) ->
+ if_mirrored_queue(
+ Queue,
+ fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids }) ->
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ {error, {queue_not_mirrored_on_node, MirrorNode}};
+ [QPid | MPids] ->
+ {error, cannot_drop_only_mirror};
+ [Pid] ->
+ rabbit_log:info("Dropping slave node on ~p for ~s~n",
+ [MirrorNode, rabbit_misc:rs(Name)]),
+ exit(Pid, {shutdown, dropped}),
+ ok
+ end
+ end).
+
+add_slave(VHostPath, QueueName, MirrorNode) ->
+ add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+
+add_slave(Queue, MirrorNode) ->
+ if_mirrored_queue(
+ Queue,
+ fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids } = Q) ->
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ [] -> Result = rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]),
+ rabbit_log:info(
+ "Adding slave node for ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, Result]),
+ case Result of
+ {ok, _Pid} -> ok;
+ _ -> Result
+ end;
+ [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ end
+ end).
+
+if_mirrored_queue(Queue, Fun) ->
+ rabbit_amqqueue:with(
+ Queue, fun (#amqqueue { arguments = Args } = Q) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> ok;
+ _ -> Fun(Q)
+ end
+ end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c7ff44807c..666687a527 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -226,6 +226,9 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
noreply(local_sender_death(ChPid, State));
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -238,6 +241,10 @@ terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
ok;
+terminate({shutdown, dropped} = R, #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ %% See rabbit_mirror_queue_master:terminate/2
+ BQ:delete_and_terminate(R, BQS);
terminate(Reason, #state { q = Q,
gm = GM,
backing_queue = BQ,
@@ -839,10 +846,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
-process_instruction(delete_and_terminate,
+process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- BQ:delete_and_terminate(BQS),
+ BQ:delete_and_terminate(Reason, BQS),
{stop, State #state { backing_queue_state = undefined }}.
msg_ids_to_acktags(MsgIds, MA) ->
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 25ee1fd0a7..2ce5941ea9 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -40,7 +40,7 @@
start() ->
{ok, _} =
- supervisor:start_child(
+ supervisor2:start_child(
rabbit_sup,
{rabbit_mirror_queue_slave_sup,
{rabbit_mirror_queue_slave_sup, start_link, []},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1a37cdfffa..3f4aa54e7f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) ->
{delta, {delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
test_variable_queue() ->
@@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2336,7 +2336,7 @@ test_queue_recover() ->
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
- _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8ac3ad43f7..a167cca0c5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
-terminate(State) ->
+terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -473,7 +473,7 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
+delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'remove_pending_ack', other than
%% deleting it.