summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-11-22 11:32:29 +0000
committerDiana Corbacho <diana@rabbitmq.com>2018-11-22 11:32:29 +0000
commit612ed35994feb626e1fbc28c15bc09158dd5d19f (patch)
treee9f6f72ffe064f9d0bafc23903a8aa75fe8d5b81
parent644b0eca35e4e8d8dbecb4232b0b37870dcf6ed1 (diff)
parentaf68fc873ef7b34eba922cedac0e62b99adf1dd0 (diff)
downloadrabbitmq-server-git-612ed35994feb626e1fbc28c15bc09158dd5d19f.tar.gz
Merge branch 'master' into qq-dlx-policy
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_channel.erl19
-rw-r--r--src/rabbit_dead_letter.erl11
-rw-r--r--src/rabbit_fifo.erl3
-rw-r--r--src/rabbit_quorum_queue.erl98
-rw-r--r--src/rabbit_variable_queue.erl10
-rw-r--r--src/rabbit_vhost_dead_letter.erl132
-rw-r--r--src/rabbit_vhost_process.erl1
-rw-r--r--test/quorum_queue_SUITE.erl29
10 files changed, 121 insertions, 202 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1be5637437..e19914d5ab 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,6 +30,7 @@
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
-export([list_down/1, count/1, list_names/0, list_local_names/0]).
+-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
@@ -124,6 +125,7 @@
-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
-spec list_names() -> [rabbit_amqqueue:name()].
-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
+-spec list_by_type(atom()) -> [rabbit_types:amqqueue()].
-spec info_keys() -> rabbit_types:info_keys().
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) ->
@@ -741,6 +743,15 @@ list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed, is_local_to_node(QPid, node())].
+list_by_type(Type) ->
+ {atomic, Qs} =
+ mnesia:sync_transaction(
+ fun () ->
+ mnesia:match_object(rabbit_durable_queue,
+ #amqqueue{_ = '_', type = Type}, read)
+ end),
+ Qs.
+
list_local_followers() ->
[ Q#amqqueue.name
|| #amqqueue{state = State, type = quorum, pid = {_, Leader},
@@ -1468,7 +1479,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow,
lists:foldl(
fun({{Name, _} = Pid, QName}, QStates) ->
QState0 = get_quorum_state(Pid, QName, QStates),
- case rabbit_quorum_queue:deliver(Confirm, Delivery, QState0) of
+ case rabbit_quorum_queue:deliver(Confirm, Delivery,
+ QState0) of
{ok, QState} ->
maps:put(Name, QState, QStates);
{slow, QState} ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d9f1711401..c2dab3da6f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -954,13 +954,11 @@ dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) ->
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
backing_queue_state = BQS,
- backing_queue = BQ,
- q = #amqqueue{ name = Resource } }) ->
- #resource{virtual_host = VHost} = Resource,
+ backing_queue = BQ}) ->
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
- rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, [{Reason, Msg}]),
+ rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a8f3f88c26..1b74b655f5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1811,7 +1811,7 @@ internal_reject(Requeue, Acked, Limiter,
ok = notify_limiter(Limiter, Acked),
State#ch{queue_states = QueueStates}.
-record_sent(ConsumerTag, AckRequired,
+record_sent(Type, Tag, AckRequired,
Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
@@ -1819,12 +1819,11 @@ record_sent(ConsumerTag, AckRequired,
user = #user{username = Username},
conn_name = ConnName,
channel = ChannelNum}) ->
- ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of
- {_, true} when is_integer(ConsumerTag) -> get;
- {_, false} when is_integer(ConsumerTag) -> get_no_ack;
- %% Authentic consumer tag, this is a delivery
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
+ ?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of
+ {get, true} -> get;
+ {get, false} -> get_no_ack;
+ {deliver, true} -> deliver;
+ {deliver, false} -> deliver_no_ack
end, State),
case Redelivered of
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
@@ -1832,7 +1831,7 @@ record_sent(ConsumerTag, AckRequired,
end,
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}},
UAMQ);
false -> UAMQ
end,
@@ -2457,7 +2456,7 @@ handle_deliver(ConsumerTag, AckRequired,
ok = rabbit_writer:send_command(WriterPid, Deliver, Content)
end,
rabbit_basic:maybe_gc_large_msg(Content),
- record_sent(ConsumerTag, AckRequired, Msg, State).
+ record_sent(deliver, ConsumerTag, AckRequired, Msg, State).
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
Msg = {QName, QPid, _MsgId, Redelivered,
@@ -2473,7 +2472,7 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
message_count = MessageCount},
Content),
State1 = track_delivering_queue(NoAck, QPid, QName, State),
- {noreply, record_sent(DeliveryTag, not(NoAck), Msg, State1)}.
+ {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
init_queue_cleanup_timer(State) ->
{ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 06691a29ad..bf77f2d832 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -16,7 +16,7 @@
-module(rabbit_dead_letter).
--export([publish/6]).
+-export([publish/5]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -26,20 +26,17 @@
-type reason() :: 'expired' | 'rejected' | 'maxlen'.
-spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(),
- 'undefined' | binary(), rabbit_amqqueue:name(),
- #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
+ 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
%%----------------------------------------------------------------------------
-publish(Msg, Reason, X, RK, QName, QueueStates0) ->
+publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
{Queues, Cycles} = detect_cycles(Reason, DLMsg,
rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- {_, _, QueueStates} = rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues),
- Delivery, QueueStates0),
- QueueStates.
+ rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery).
make_msg(Msg = #basic_message{content = Content,
exchange_name = Exchange,
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 21f77fc872..2bd652d302 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -392,8 +392,7 @@ apply(_, {down, ConsumerPid, noconnection},
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
- % remove any enqueuer for the same pid
- % TODO: if there are any pending enqueuers these should be enqueued
+ % Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
{#enqueuer{pending = Pend}, Enqs} ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index cdb1b48451..385df48c86 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -23,7 +23,7 @@
-export([credit/4]).
-export([purge/1]).
-export([stateless_deliver/2, deliver/3]).
--export([dead_letter_publish/5]).
+-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
-export([cancel_consumer_handler/3, cancel_consumer/3]).
@@ -35,6 +35,7 @@
-export([delete_member/3]).
-export([requeue/3]).
-export([policy_changed/2]).
+-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -104,8 +105,8 @@ init_state({Name, _}, QName) ->
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
-handle_event({ra_event, From, Evt}, FState) ->
- rabbit_fifo_client:handle_ra_event(From, Evt, FState).
+handle_event({ra_event, From, Evt}, QState) ->
+ rabbit_fifo_client:handle_ra_event(From, Evt, QState).
declare(#amqqueue{name = QName,
durable = Durable,
@@ -162,9 +163,10 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
% QName = queue_name(Name),
case Node == node() of
true -> cancel_consumer(QName, ChPid, ConsumerTag);
- false -> rabbit_misc:rpc_call(Node, rabbit_quorum_queue,
- cancel_consumer,
- [QName, ChPid, ConsumerTag])
+ false ->
+ rpc:cast(Node, rabbit_quorum_queue,
+ cancel_consumer,
+ [QName, ChPid, ConsumerTag])
end.
cancel_consumer(QName, ChPid, ConsumerTag) ->
@@ -243,7 +245,7 @@ recover(Queues) ->
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
ok -> ok;
Err ->
- rabbit_log:warning("recover: Quorum queue ~w could not"
+ rabbit_log:warning("recover: quorum queue ~w could not"
" be started ~w", [Name, Err]),
ok
end;
@@ -254,7 +256,7 @@ recover(Queues) ->
ok;
Err ->
%% catch all clause to avoid causing the vhost not to start
- rabbit_log:warning("recover: Quorum queue ~w could not be "
+ rabbit_log:warning("recover: quorum queue ~w could not be "
"restarted ~w", [Name, Err]),
ok
end,
@@ -284,7 +286,7 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
end,
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]),
{ok, Msgs};
- {error, {no_more_nodes_to_try, Errs}} = Err ->
+ {error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
@@ -294,7 +296,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
rabbit_core_metrics:queue_deleted(QName),
{ok, Msgs};
false ->
- Err
+ rabbit_misc:protocol_error(
+ internal_error,
+ "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p",
+ [rabbit_misc:rs(QName), Errs])
end
end.
@@ -305,19 +310,19 @@ delete_immediately({Name, _} = QPid) ->
rabbit_core_metrics:queue_deleted(QName),
ok.
-ack(CTag, MsgIds, FState) ->
- rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, FState).
+ack(CTag, MsgIds, QState) ->
+ rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState).
-reject(true, CTag, MsgIds, FState) ->
- rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, FState);
-reject(false, CTag, MsgIds, FState) ->
- rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, FState).
+reject(true, CTag, MsgIds, QState) ->
+ rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState);
+reject(false, CTag, MsgIds, QState) ->
+ rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, QState).
credit(CTag, Credit, Drain, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
- CTag0, FState0) ->
+ CTag0, QState0) ->
CTag = quorum_ctag(CTag0),
Settlement = case NoAck of
true ->
@@ -325,12 +330,12 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
false ->
unsettled
end,
- case rabbit_fifo_client:dequeue(CTag, Settlement, FState0) of
- {ok, empty, FState} ->
- {ok, empty, FState};
- {ok, {MsgId, {MsgHeader, Msg}}, FState} ->
+ case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
+ {ok, empty, QState} ->
+ {ok, empty, QState};
+ {ok, {MsgId, {MsgHeader, Msg}}, QState} ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
- {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, FState};
+ {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}
end.
@@ -351,19 +356,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
ConsumerPrefetchCount, Args),
{ok, QState}.
-basic_cancel(ConsumerTag, ChPid, OkMsg, FState0) ->
+basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
maybe_send_reply(ChPid, OkMsg),
- rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), FState0).
+ rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0).
stateless_deliver(ServerId, Delivery) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId],
Delivery#delivery.message).
-deliver(false, Delivery, FState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.message, FState0);
-deliver(true, Delivery, FState0) ->
+deliver(false, Delivery, QState0) ->
+ rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
+deliver(true, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, FState0).
+ Delivery#delivery.message, QState0).
info(Q) ->
info(Q, [name, durable, auto_delete, arguments, pid, state, messages,
@@ -386,8 +391,28 @@ stat(_Q) ->
purge(Node) ->
rabbit_fifo_client:purge(Node).
-requeue(ConsumerTag, MsgIds, FState) ->
- rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState).
+requeue(ConsumerTag, MsgIds, QState) ->
+ rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).
+
+cleanup_data_dir() ->
+ Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
+ <- rabbit_amqqueue:list_by_type(quorum),
+ lists:member(node(), Nodes)],
+ Registered = ra_directory:list_registered(),
+ [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
+ not lists:member(Name, Names)],
+ ok.
+
+maybe_delete_data_dir(UId) ->
+ Dir = ra_env:server_data_dir(UId),
+ {ok, Config} = ra_log:read_config(Dir),
+ case maps:get(machine, Config) of
+ {module, rabbit_fifo, _} ->
+ ra_lib:recursive_delete(Dir),
+ ra_directory:unregister_name(UId);
+ _ ->
+ ok
+ end.
policy_changed(QName, Node) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
@@ -505,11 +530,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
end.
%%----------------------------------------------------------------------------
-dlx_mfa(#amqqueue{name = Resource} = Q) ->
- #resource{virtual_host = VHost} = Resource,
+dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
- {?MODULE, dead_letter_publish, [VHost, DLX, DLXRKey, Q#amqqueue.name]}.
+ {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
init_dlx(undefined, _Q) ->
undefined;
@@ -527,10 +551,12 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
{PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal)
end.
-dead_letter_publish(_, undefined, _, _, _) ->
+dead_letter_publish(undefined, _, _, _) ->
ok;
-dead_letter_publish(VHost, X, RK, QName, ReasonMsgs) ->
- rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, ReasonMsgs).
+dead_letter_publish(X, RK, QName, ReasonMsgs) ->
+ {ok, Exchange} = rabbit_exchange:lookup(X),
+ [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
+ || {Reason, Msg} <- ReasonMsgs].
%% TODO escape hack
qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f91f8a648e..83738b7e52 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -488,12 +488,10 @@ start(VHost, DurableQueues) ->
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- start_dead_letter_process(VHost),
start_msg_store(VHost, ClientRefs, StartFunState),
{ok, AllTerms}.
stop(VHost) ->
- ok = rabbit_vhost_dead_letter:stop(VHost),
ok = stop_msg_store(VHost),
ok = rabbit_queue_index:stop(VHost).
@@ -517,14 +515,6 @@ do_start_msg_store(VHost, Type, Refs, StartFunState) ->
exit({error, Error})
end.
-start_dead_letter_process(VHost) ->
- case rabbit_vhost_dead_letter:start(VHost) of
- {ok, _} ->
- rabbit_log:info("Started dead letter process for vhost '~s'~n", [VHost]);
- Err ->
- exit(Err)
- end.
-
abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
diff --git a/src/rabbit_vhost_dead_letter.erl b/src/rabbit_vhost_dead_letter.erl
deleted file mode 100644
index b4dede5e19..0000000000
--- a/src/rabbit_vhost_dead_letter.erl
+++ /dev/null
@@ -1,132 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(rabbit_vhost_dead_letter).
-
--include("rabbit.hrl").
-
--behaviour(gen_server).
-
--export([start/1, start_link/0]).
--export([stop/1]).
--export([publish/5]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--record(state, {queue_states,
- queue_cleanup_timer}).
-
-start(VHost) ->
- case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
- {ok, VHostSup} ->
- supervisor2:start_child(VHostSup,
- {rabbit_vhost_dead_letter,
- {rabbit_vhost_dead_letter, start_link, []},
- transient, ?WORKER_WAIT, worker, [rabbit_vhost_dead_letter]});
- {error, {no_such_vhost, VHost}} = E ->
- rabbit_log:error("Failed to start a dead letter process for vhost ~s: vhost no"
- " longer exists!", [VHost]),
- E
- end.
-
-stop(VHost) ->
- case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
- {ok, VHostSup} ->
- ok = supervisor2:terminate_child(VHostSup, rabbit_vhost_dead_letter),
- ok = supervisor2:delete_child(VHostSup, rabbit_vhost_dead_letter);
- {error, {no_such_vhost, VHost}} ->
- rabbit_log:error("Failed to stop a dead letter process for vhost ~s: "
- "vhost no longer exists!", [VHost]),
-
- ok
- end.
-
-publish(VHost, X, RK, QName, ReasonMsgs) ->
- case vhost_dead_letter_pid(VHost) of
- no_pid ->
- %% TODO what to do???
- ok;
- Pid ->
- gen_server:cast(Pid, {publish, X, RK, QName, ReasonMsgs})
- end.
-
-vhost_dead_letter_pid(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
- case supervisor2:find_child(VHostSup, rabbit_vhost_dead_letter) of
- [Pid] -> Pid;
- [] -> no_pid
- end.
-
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
-
-init([]) ->
- {ok, init_queue_cleanup_timer(#state{queue_states = #{}})}.
-
-handle_call(_Req, _From, State) ->
- {reply, ok, State}.
-
-handle_cast({publish, X, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State)
- when is_record(X, exchange) ->
- QueueStates = batch_publish(X, RK, QName, ReasonMsgs, QueueStates0),
- {noreply, State#state{queue_states = QueueStates}};
-handle_cast({publish, DLX, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State) ->
- QueueStates =
- case rabbit_exchange:lookup(DLX) of
- {ok, X} ->
- batch_publish(X, RK, QName, ReasonMsgs, QueueStates0);
- {error, not_found} ->
- QueueStates0
- end,
- {noreply, State#state{queue_states = QueueStates}}.
-
-handle_info({ra_event, {Name, _}, _} = Evt,
- #state{queue_states = QueueStates} = State0) ->
- FState0 = maps:get(Name, QueueStates),
- case rabbit_quorum_queue:handle_event(Evt, FState0) of
- {_, _, _, FState1} ->
- {noreply,
- State0#state{queue_states = maps:put(Name, FState1, QueueStates)}};
- eol ->
- {noreply,
- State0#state{queue_states = maps:remove(Name, QueueStates)}}
- end;
-handle_info(queue_cleanup, State = #state{queue_states = QueueStates0}) ->
- QueueStates = maps:filter(fun(Name, _) ->
- QName = rabbit_quorum_queue:queue_name(Name),
- case rabbit_amqqueue:lookup(QName) of
- [] ->
- false;
- _ ->
- true
- end
- end, QueueStates0),
- {noreply, init_queue_cleanup_timer(State#state{queue_states = QueueStates})};
-handle_info(_I, State) ->
- {noreply, State}.
-
-terminate(_, _) -> ok.
-
-code_change(_, State, _) -> {ok, State}.
-
-batch_publish(X, RK, QName, ReasonMsgs, QueueStates) ->
- lists:foldl(fun({Reason, Msg}, Acc) ->
- rabbit_dead_letter:publish(Msg, Reason, X, RK, QName, Acc)
- end, QueueStates, ReasonMsgs).
-
-init_queue_cleanup_timer(State) ->
- {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
- State#state{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}.
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index 043b8b9608..487308c25d 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -57,6 +57,7 @@ init([VHost]) ->
rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
Interval = interval(),
timer:send_interval(Interval, check_vhost),
+ true = erlang:garbage_collect(),
{ok, VHost}
catch _:Reason ->
rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 4a175af6e2..731a7277c1 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -36,6 +36,7 @@ groups() ->
{cluster_size_2, [], [add_member]}
]},
{clustered, [], [
+ {cluster_size_2, [], [cleanup_data_dir]},
{cluster_size_2, [], [add_member_not_running,
add_member_classic,
add_member_already_a_member,
@@ -1612,6 +1613,34 @@ delete_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+cleanup_data_dir(Config) ->
+ %% This test is slow, but also checks that we handle properly errors when
+ %% trying to delete a queue in minority. A case clause there had gone
+ %% previously unnoticed.
+
+ [Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(100),
+
+ [{_, UId}] = rpc:call(Server1, ra_directory, list_registered, []),
+ DataDir = rpc:call(Server1, ra_env, server_data_dir, [UId]),
+ ?assert(filelib:is_dir(DataDir)),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ ?assertExit({{shutdown,
+ {connection_closing, {server_initiated_close, 541, _}}}, _},
+ amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ ?assert(filelib:is_dir(DataDir)),
+
+ ?assertEqual(ok,
+ rpc:call(Server1, rabbit_quorum_queue, cleanup_data_dir,
+ [])),
+ ?assert(not filelib:is_dir(DataDir)).
+
basic_recover(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),