diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-11-22 11:32:29 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2018-11-22 11:32:29 +0000 |
| commit | 612ed35994feb626e1fbc28c15bc09158dd5d19f (patch) | |
| tree | e9f6f72ffe064f9d0bafc23903a8aa75fe8d5b81 | |
| parent | 644b0eca35e4e8d8dbecb4232b0b37870dcf6ed1 (diff) | |
| parent | af68fc873ef7b34eba922cedac0e62b99adf1dd0 (diff) | |
| download | rabbitmq-server-git-612ed35994feb626e1fbc28c15bc09158dd5d19f.tar.gz | |
Merge branch 'master' into qq-dlx-policy
| -rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_dead_letter.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 98 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_vhost_dead_letter.erl | 132 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl | 1 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 29 |
10 files changed, 121 insertions, 202 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1be5637437..e19914d5ab 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,6 +30,7 @@ emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). -export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). @@ -124,6 +125,7 @@ -spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. -spec list_names() -> [rabbit_amqqueue:name()]. -spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. +-spec list_by_type(atom()) -> [rabbit_types:amqqueue()]. -spec info_keys() -> rabbit_types:info_keys(). -spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). -spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> @@ -741,6 +743,15 @@ list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. +list_by_type(Type) -> + {atomic, Qs} = + mnesia:sync_transaction( + fun () -> + mnesia:match_object(rabbit_durable_queue, + #amqqueue{_ = '_', type = Type}, read) + end), + Qs. + list_local_followers() -> [ Q#amqqueue.name || #amqqueue{state = State, type = quorum, pid = {_, Leader}, @@ -1468,7 +1479,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow, lists:foldl( fun({{Name, _} = Pid, QName}, QStates) -> QState0 = get_quorum_state(Pid, QName, QStates), - case rabbit_quorum_queue:deliver(Confirm, Delivery, QState0) of + case rabbit_quorum_queue:deliver(Confirm, Delivery, + QState0) of {ok, QState} -> maps:put(Name, QState, QStates); {slow, QState} -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d9f1711401..c2dab3da6f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -954,13 +954,11 @@ dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) -> dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, backing_queue_state = BQS, - backing_queue = BQ, - q = #amqqueue{ name = Resource } }) -> - #resource{virtual_host = VHost} = Resource, + backing_queue = BQ}) -> QName = qname(State), {Res, Acks1, BQS1} = Fun(fun (Msg, AckTag, Acks) -> - rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, [{Reason, Msg}]), + rabbit_dead_letter:publish(Msg, Reason, X, RK, QName), [AckTag | Acks] end, [], BQS), {_Guids, BQS2} = BQ:ack(Acks1, BQS1), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a8f3f88c26..1b74b655f5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1811,7 +1811,7 @@ internal_reject(Requeue, Acked, Limiter, ok = notify_limiter(Limiter, Acked), State#ch{queue_states = QueueStates}. -record_sent(ConsumerTag, AckRequired, +record_sent(Type, Tag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, @@ -1819,12 +1819,11 @@ record_sent(ConsumerTag, AckRequired, user = #user{username = Username}, conn_name = ConnName, channel = ChannelNum}) -> - ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of - {_, true} when is_integer(ConsumerTag) -> get; - {_, false} when is_integer(ConsumerTag) -> get_no_ack; - %% Authentic consumer tag, this is a delivery - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack + ?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of + {get, true} -> get; + {get, false} -> get_no_ack; + {deliver, true} -> deliver; + {deliver, false} -> deliver_no_ack end, State), case Redelivered of true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); @@ -1832,7 +1831,7 @@ record_sent(ConsumerTag, AckRequired, end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, + true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}}, UAMQ); false -> UAMQ end, @@ -2457,7 +2456,7 @@ handle_deliver(ConsumerTag, AckRequired, ok = rabbit_writer:send_command(WriterPid, Deliver, Content) end, rabbit_basic:maybe_gc_large_msg(Content), - record_sent(ConsumerTag, AckRequired, Msg, State). + record_sent(deliver, ConsumerTag, AckRequired, Msg, State). handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Msg = {QName, QPid, _MsgId, Redelivered, @@ -2473,7 +2472,7 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, message_count = MessageCount}, Content), State1 = track_delivering_queue(NoAck, QPid, QName, State), - {noreply, record_sent(DeliveryTag, not(NoAck), Msg, State1)}. + {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. init_queue_cleanup_timer(State) -> {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 06691a29ad..bf77f2d832 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -16,7 +16,7 @@ -module(rabbit_dead_letter). --export([publish/6]). +-export([publish/5]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -26,20 +26,17 @@ -type reason() :: 'expired' | 'rejected' | 'maxlen'. -spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(), - 'undefined' | binary(), rabbit_amqqueue:name(), - #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. + 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. %%---------------------------------------------------------------------------- -publish(Msg, Reason, X, RK, QName, QueueStates0) -> +publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), {Queues, Cycles} = detect_cycles(Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - {_, _, QueueStates} = rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), - Delivery, QueueStates0), - QueueStates. + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery). make_msg(Msg = #basic_message{content = Content, exchange_name = Exchange, diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 21f77fc872..2bd652d302 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -392,8 +392,7 @@ apply(_, {down, ConsumerPid, noconnection}, apply(_, {down, Pid, _Info}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> - % remove any enqueuer for the same pid - % TODO: if there are any pending enqueuers these should be enqueued + % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of {#enqueuer{pending = Pend}, Enqs} -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index cdb1b48451..385df48c86 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -23,7 +23,7 @@ -export([credit/4]). -export([purge/1]). -export([stateless_deliver/2, deliver/3]). --export([dead_letter_publish/5]). +-export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). -export([cancel_consumer_handler/3, cancel_consumer/3]). @@ -35,6 +35,7 @@ -export([delete_member/3]). -export([requeue/3]). -export([policy_changed/2]). +-export([cleanup_data_dir/0]). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -104,8 +105,8 @@ init_state({Name, _}, QName) -> fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). -handle_event({ra_event, From, Evt}, FState) -> - rabbit_fifo_client:handle_ra_event(From, Evt, FState). +handle_event({ra_event, From, Evt}, QState) -> + rabbit_fifo_client:handle_ra_event(From, Evt, QState). declare(#amqqueue{name = QName, durable = Durable, @@ -162,9 +163,10 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> % QName = queue_name(Name), case Node == node() of true -> cancel_consumer(QName, ChPid, ConsumerTag); - false -> rabbit_misc:rpc_call(Node, rabbit_quorum_queue, - cancel_consumer, - [QName, ChPid, ConsumerTag]) + false -> + rpc:cast(Node, rabbit_quorum_queue, + cancel_consumer, + [QName, ChPid, ConsumerTag]) end. cancel_consumer(QName, ChPid, ConsumerTag) -> @@ -243,7 +245,7 @@ recover(Queues) -> case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of ok -> ok; Err -> - rabbit_log:warning("recover: Quorum queue ~w could not" + rabbit_log:warning("recover: quorum queue ~w could not" " be started ~w", [Name, Err]), ok end; @@ -254,7 +256,7 @@ recover(Queues) -> ok; Err -> %% catch all clause to avoid causing the vhost not to start - rabbit_log:warning("recover: Quorum queue ~w could not be " + rabbit_log:warning("recover: quorum queue ~w could not be " "restarted ~w", [Name, Err]), ok end, @@ -284,7 +286,7 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q end, rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]), {ok, Msgs}; - {error, {no_more_nodes_to_try, Errs}} = Err -> + {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; (_) -> false end, Errs) of @@ -294,7 +296,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q rabbit_core_metrics:queue_deleted(QName), {ok, Msgs}; false -> - Err + rabbit_misc:protocol_error( + internal_error, + "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p", + [rabbit_misc:rs(QName), Errs]) end end. @@ -305,19 +310,19 @@ delete_immediately({Name, _} = QPid) -> rabbit_core_metrics:queue_deleted(QName), ok. -ack(CTag, MsgIds, FState) -> - rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, FState). +ack(CTag, MsgIds, QState) -> + rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState). -reject(true, CTag, MsgIds, FState) -> - rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, FState); -reject(false, CTag, MsgIds, FState) -> - rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, FState). +reject(true, CTag, MsgIds, QState) -> + rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState); +reject(false, CTag, MsgIds, QState) -> + rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, QState). credit(CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState). basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, - CTag0, FState0) -> + CTag0, QState0) -> CTag = quorum_ctag(CTag0), Settlement = case NoAck of true -> @@ -325,12 +330,12 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, false -> unsettled end, - case rabbit_fifo_client:dequeue(CTag, Settlement, FState0) of - {ok, empty, FState} -> - {ok, empty, FState}; - {ok, {MsgId, {MsgHeader, Msg}}, FState} -> + case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of + {ok, empty, QState} -> + {ok, empty, QState}; + {ok, {MsgId, {MsgHeader, Msg}}, QState} -> IsDelivered = maps:is_key(delivery_count, MsgHeader), - {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, FState}; + {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState}; {timeout, _} -> {error, timeout} end. @@ -351,19 +356,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, ConsumerPrefetchCount, Args), {ok, QState}. -basic_cancel(ConsumerTag, ChPid, OkMsg, FState0) -> +basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> maybe_send_reply(ChPid, OkMsg), - rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), FState0). + rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0). stateless_deliver(ServerId, Delivery) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], Delivery#delivery.message). -deliver(false, Delivery, FState0) -> - rabbit_fifo_client:enqueue(Delivery#delivery.message, FState0); -deliver(true, Delivery, FState0) -> +deliver(false, Delivery, QState0) -> + rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0); +deliver(true, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, - Delivery#delivery.message, FState0). + Delivery#delivery.message, QState0). info(Q) -> info(Q, [name, durable, auto_delete, arguments, pid, state, messages, @@ -386,8 +391,28 @@ stat(_Q) -> purge(Node) -> rabbit_fifo_client:purge(Node). -requeue(ConsumerTag, MsgIds, FState) -> - rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState). +requeue(ConsumerTag, MsgIds, QState) -> + rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState). + +cleanup_data_dir() -> + Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} + <- rabbit_amqqueue:list_by_type(quorum), + lists:member(node(), Nodes)], + Registered = ra_directory:list_registered(), + [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, + not lists:member(Name, Names)], + ok. + +maybe_delete_data_dir(UId) -> + Dir = ra_env:server_data_dir(UId), + {ok, Config} = ra_log:read_config(Dir), + case maps:get(machine, Config) of + {module, rabbit_fifo, _} -> + ra_lib:recursive_delete(Dir), + ra_directory:unregister_name(UId); + _ -> + ok + end. policy_changed(QName, Node) -> {ok, Q} = rabbit_amqqueue:lookup(QName), @@ -505,11 +530,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) -> end. %%---------------------------------------------------------------------------- -dlx_mfa(#amqqueue{name = Resource} = Q) -> - #resource{virtual_host = VHost} = Resource, +dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q), DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q), - {?MODULE, dead_letter_publish, [VHost, DLX, DLXRKey, Q#amqqueue.name]}. + {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}. init_dlx(undefined, _Q) -> undefined; @@ -527,10 +551,12 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> {PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal) end. -dead_letter_publish(_, undefined, _, _, _) -> +dead_letter_publish(undefined, _, _, _) -> ok; -dead_letter_publish(VHost, X, RK, QName, ReasonMsgs) -> - rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, ReasonMsgs). +dead_letter_publish(X, RK, QName, ReasonMsgs) -> + {ok, Exchange} = rabbit_exchange:lookup(X), + [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName) + || {Reason, Msg} <- ReasonMsgs]. %% TODO escape hack qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f91f8a648e..83738b7e52 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -488,12 +488,10 @@ start(VHost, DurableQueues) -> Ref = proplists:get_value(persistent_ref, Terms), Ref =/= undefined end], - start_dead_letter_process(VHost), start_msg_store(VHost, ClientRefs, StartFunState), {ok, AllTerms}. stop(VHost) -> - ok = rabbit_vhost_dead_letter:stop(VHost), ok = stop_msg_store(VHost), ok = rabbit_queue_index:stop(VHost). @@ -517,14 +515,6 @@ do_start_msg_store(VHost, Type, Refs, StartFunState) -> exit({error, Error}) end. -start_dead_letter_process(VHost) -> - case rabbit_vhost_dead_letter:start(VHost) of - {ok, _} -> - rabbit_log:info("Started dead letter process for vhost '~s'~n", [VHost]); - Err -> - exit(Err) - end. - abbreviated_type(?TRANSIENT_MSG_STORE) -> transient; abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent. diff --git a/src/rabbit_vhost_dead_letter.erl b/src/rabbit_vhost_dead_letter.erl deleted file mode 100644 index b4dede5e19..0000000000 --- a/src/rabbit_vhost_dead_letter.erl +++ /dev/null @@ -1,132 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_vhost_dead_letter). - --include("rabbit.hrl"). - --behaviour(gen_server). - --export([start/1, start_link/0]). --export([stop/1]). --export([publish/5]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --record(state, {queue_states, - queue_cleanup_timer}). - -start(VHost) -> - case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of - {ok, VHostSup} -> - supervisor2:start_child(VHostSup, - {rabbit_vhost_dead_letter, - {rabbit_vhost_dead_letter, start_link, []}, - transient, ?WORKER_WAIT, worker, [rabbit_vhost_dead_letter]}); - {error, {no_such_vhost, VHost}} = E -> - rabbit_log:error("Failed to start a dead letter process for vhost ~s: vhost no" - " longer exists!", [VHost]), - E - end. - -stop(VHost) -> - case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of - {ok, VHostSup} -> - ok = supervisor2:terminate_child(VHostSup, rabbit_vhost_dead_letter), - ok = supervisor2:delete_child(VHostSup, rabbit_vhost_dead_letter); - {error, {no_such_vhost, VHost}} -> - rabbit_log:error("Failed to stop a dead letter process for vhost ~s: " - "vhost no longer exists!", [VHost]), - - ok - end. - -publish(VHost, X, RK, QName, ReasonMsgs) -> - case vhost_dead_letter_pid(VHost) of - no_pid -> - %% TODO what to do??? - ok; - Pid -> - gen_server:cast(Pid, {publish, X, RK, QName, ReasonMsgs}) - end. - -vhost_dead_letter_pid(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost), - case supervisor2:find_child(VHostSup, rabbit_vhost_dead_letter) of - [Pid] -> Pid; - [] -> no_pid - end. - -start_link() -> - gen_server:start_link(?MODULE, [], []). - -init([]) -> - {ok, init_queue_cleanup_timer(#state{queue_states = #{}})}. - -handle_call(_Req, _From, State) -> - {reply, ok, State}. - -handle_cast({publish, X, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State) - when is_record(X, exchange) -> - QueueStates = batch_publish(X, RK, QName, ReasonMsgs, QueueStates0), - {noreply, State#state{queue_states = QueueStates}}; -handle_cast({publish, DLX, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State) -> - QueueStates = - case rabbit_exchange:lookup(DLX) of - {ok, X} -> - batch_publish(X, RK, QName, ReasonMsgs, QueueStates0); - {error, not_found} -> - QueueStates0 - end, - {noreply, State#state{queue_states = QueueStates}}. - -handle_info({ra_event, {Name, _}, _} = Evt, - #state{queue_states = QueueStates} = State0) -> - FState0 = maps:get(Name, QueueStates), - case rabbit_quorum_queue:handle_event(Evt, FState0) of - {_, _, _, FState1} -> - {noreply, - State0#state{queue_states = maps:put(Name, FState1, QueueStates)}}; - eol -> - {noreply, - State0#state{queue_states = maps:remove(Name, QueueStates)}} - end; -handle_info(queue_cleanup, State = #state{queue_states = QueueStates0}) -> - QueueStates = maps:filter(fun(Name, _) -> - QName = rabbit_quorum_queue:queue_name(Name), - case rabbit_amqqueue:lookup(QName) of - [] -> - false; - _ -> - true - end - end, QueueStates0), - {noreply, init_queue_cleanup_timer(State#state{queue_states = QueueStates})}; -handle_info(_I, State) -> - {noreply, State}. - -terminate(_, _) -> ok. - -code_change(_, State, _) -> {ok, State}. - -batch_publish(X, RK, QName, ReasonMsgs, QueueStates) -> - lists:foldl(fun({Reason, Msg}, Acc) -> - rabbit_dead_letter:publish(Msg, Reason, X, RK, QName, Acc) - end, QueueStates, ReasonMsgs). - -init_queue_cleanup_timer(State) -> - {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), - State#state{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}. diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl index 043b8b9608..487308c25d 100644 --- a/src/rabbit_vhost_process.erl +++ b/src/rabbit_vhost_process.erl @@ -57,6 +57,7 @@ init([VHost]) -> rabbit_vhost_sup_sup:save_vhost_process(VHost, self()), Interval = interval(), timer:send_interval(Interval, check_vhost), + true = erlang:garbage_collect(), {ok, VHost} catch _:Reason -> rabbit_amqqueue:mark_local_durable_queues_stopped(VHost), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 4a175af6e2..731a7277c1 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -36,6 +36,7 @@ groups() -> {cluster_size_2, [], [add_member]} ]}, {clustered, [], [ + {cluster_size_2, [], [cleanup_data_dir]}, {cluster_size_2, [], [add_member_not_running, add_member_classic, add_member_already_a_member, @@ -1612,6 +1613,34 @@ delete_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). +cleanup_data_dir(Config) -> + %% This test is slow, but also checks that we handle properly errors when + %% trying to delete a queue in minority. A case clause there had gone + %% previously unnoticed. + + [Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(100), + + [{_, UId}] = rpc:call(Server1, ra_directory, list_registered, []), + DataDir = rpc:call(Server1, ra_env, server_data_dir, [UId]), + ?assert(filelib:is_dir(DataDir)), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + + ?assertExit({{shutdown, + {connection_closing, {server_initiated_close, 541, _}}}, _}, + amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + ?assert(filelib:is_dir(DataDir)), + + ?assertEqual(ok, + rpc:call(Server1, rabbit_quorum_queue, cleanup_data_dir, + [])), + ?assert(not filelib:is_dir(DataDir)). + basic_recover(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
