diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 83 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
9 files changed, 102 insertions, 57 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ea31ec1349..b1c95338c2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -145,16 +145,16 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, State, Deliveries). -terminate(shutdown, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate(_Reason, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate(Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> rabbit_event:notify( queue_deleted, [{pid, self()}]), - BQS1 = BQ:delete_and_terminate(BQS), + BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. rabbit_amqqueue:internal_delete(qname(State)), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index addaabc584..217ad3eb5b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -49,11 +49,11 @@ behaviour_info(callbacks) -> {init, 4}, %% Called on queue shutdown when queue isn't being deleted. - {terminate, 1}, + {terminate, 2}, %% Called when the queue is terminating and needs to delete all %% its content. - {delete_and_terminate, 1}, + {delete_and_terminate, 2}, %% Remove all messages in the queue, but not messages which have %% been fetched and are pending acks. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1140a2f0fd..b4b6255ebd 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -244,6 +244,12 @@ action(add_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) -> rpc_call(Node, rabbit_mirror_queue_misc, add_slave, [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]); +action(drop_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) -> + Inform("Dropping mirror of queue ~p on node ~p~n", [Queue, MirrorNode]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + rpc_call(Node, rabbit_mirror_queue_misc, drop_slave, + [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]); + action(list_exchanges, Node, Args, Opts, Inform) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 99de1b18ac..9bd8565f6e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_master). --export([init/4, terminate/1, delete_and_terminate/1, +-export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, @@ -106,17 +106,28 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> ack_msg_id = dict:new(), known_senders = sets:from_list(KS) }. -terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> +terminate({shutdown, dropped} = Reason, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + %% Backing queue termination - this node has been explicitly + %% dropped. Normally, non-durable queues would be tidied up on + %% startup, but there's a possibility that we will be added back + %% in without this node being restarted. Thus we must do the full + %% blown delete_and_terminate now, but only locally: we do not + %% broadcast delete_and_terminate. + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }; +terminate(Reason, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this %% node. Thus just let some other slave take over. - State #state { backing_queue_state = BQ:terminate(BQS) }. + State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. -delete_and_terminate(State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, delete_and_terminate), - State #state { backing_queue_state = BQ:delete_and_terminate(BQS), +delete_and_terminate(Reason, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }. purge(State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5f180c5eae..046d338056 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -16,7 +16,8 @@ -module(rabbit_mirror_queue_misc). --export([remove_from_queue/2, add_slave/2, add_slave/3, on_node_up/0]). +-export([remove_from_queue/2, on_node_up/0, + drop_slave/2, drop_slave/3, add_slave/2, add_slave/3]). -include("rabbit.hrl"). @@ -59,36 +60,6 @@ remove_from_queue(QueueName, DeadPids) -> end end). -add_slave(VHostPath, QueueName, MirrorNode) -> - add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). - -add_slave(Queue, MirrorNode) -> - rabbit_amqqueue:with( - Queue, - fun (#amqqueue { arguments = Args, name = Name, - pid = QPid, mirror_pids = MPids } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of - undefined -> - ok; - _ -> - case [MirrorNode || Pid <- [QPid | MPids], - node(Pid) =:= MirrorNode] of - [] -> - Result = - rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]), - rabbit_log:info("Adding slave node for ~s: ~p~n", - [rabbit_misc:rs(Name), Result]), - case Result of - {ok, _Pid} -> ok; - _ -> Result - end; - [_] -> - {error, queue_already_mirrored_on_node} - end - end - end). - on_node_up() -> Qs = rabbit_misc:execute_mnesia_transaction( @@ -113,3 +84,53 @@ on_node_up() -> end), [add_slave(Q, node()) || Q <- Qs], ok. + +drop_slave(VHostPath, QueueName, MirrorNode) -> + drop_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). + +drop_slave(Queue, MirrorNode) -> + if_mirrored_queue( + Queue, + fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids }) -> + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid | MPids] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + rabbit_log:info("Dropping slave node on ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), + exit(Pid, {shutdown, dropped}), + ok + end + end). + +add_slave(VHostPath, QueueName, MirrorNode) -> + add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). + +add_slave(Queue, MirrorNode) -> + if_mirrored_queue( + Queue, + fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids } = Q) -> + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of + [] -> Result = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info( + "Adding slave node for ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, Result]), + case Result of + {ok, _Pid} -> ok; + _ -> Result + end; + [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} + end + end). + +if_mirrored_queue(Queue, Fun) -> + rabbit_amqqueue:with( + Queue, fun (#amqqueue { arguments = Args } = Q) -> + case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + undefined -> ok; + _ -> Fun(Q) + end + end). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c7ff44807c..666687a527 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -226,6 +226,9 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> noreply(local_sender_death(ChPid, State)); +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -238,6 +241,10 @@ terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. ok; +terminate({shutdown, dropped} = R, #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + %% See rabbit_mirror_queue_master:terminate/2 + BQ:delete_and_terminate(R, BQS); terminate(Reason, #state { q = Q, gm = GM, backing_queue = BQ, @@ -839,10 +846,10 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = dict:erase(ChPid, KS) } end}; -process_instruction(delete_and_terminate, +process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:delete_and_terminate(BQS), + BQ:delete_and_terminate(Reason, BQS), {stop, State #state { backing_queue_state = undefined }}. msg_ids_to_acktags(MsgIds, MA) -> diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index 25ee1fd0a7..2ce5941ea9 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -40,7 +40,7 @@ start() -> {ok, _} = - supervisor:start_child( + supervisor2:start_child( rabbit_sup, {rabbit_mirror_queue_slave_sup, {rabbit_mirror_queue_slave_sup, start_link, []}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1a37cdfffa..3f4aa54e7f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) -> {delta, {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), + _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. test_variable_queue() -> @@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count + Count, VQ3), {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), @@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> {_Guids, VQ4} = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2336,7 +2336,7 @@ test_queue_recover() -> VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), + _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8ac3ad43f7..a167cca0c5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/4, terminate/1, delete_and_terminate/1, +-export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, PersistentClient, TransientClient). -terminate(State) -> +terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = @@ -473,7 +473,7 @@ terminate(State) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(State) -> +delete_and_terminate(_Reason, State) -> %% TODO: there is no need to interact with qi at all - which we do %% as part of 'purge' and 'remove_pending_ack', other than %% deleting it. |
