summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rabbitmq-components.mk4
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_fifo.erl79
-rw-r--r--src/rabbit_fifo_client.erl11
-rw-r--r--src/rabbit_quorum_queue.erl27
-rw-r--r--src/rabbit_vm.erl16
-rw-r--r--test/quorum_queue_SUITE.erl213
7 files changed, 307 insertions, 50 deletions
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index 709d155040..5307809c8b 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -110,12 +110,12 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre
# all projects use the same versions. It avoids conflicts and makes it
# possible to work with rabbitmq-public-umbrella.
-dep_cowboy = hex 2.6.0
+dep_cowboy = hex 2.6.1
dep_cowlib = hex 2.7.0
dep_jsx = hex 2.9.0
dep_lager = hex 3.6.5
dep_ra = git https://github.com/rabbitmq/ra.git master
-dep_ranch = hex 1.7.0
+dep_ranch = hex 1.7.1
dep_ranch_proxy_protocol = hex 2.1.1
dep_recon = hex 2.3.6
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5127280871..f4876356ee 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -913,8 +913,11 @@ list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath),
State =/= crashed, is_local_to_node(QPid, node()) ].
-notify_policy_changed(#amqqueue{pid = QPid}) ->
- gen_server2:cast(QPid, policy_changed).
+notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+ gen_server2:cast(QPid, policy_changed);
+notify_policy_changed(#amqqueue{pid = QPid,
+ name = QName}) when ?IS_QUORUM(QPid) ->
+ rabbit_quorum_queue:policy_changed(QName, QPid).
consumers(#amqqueue{ pid = QPid }) ->
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79d4a3effc..c12a6ec464 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -207,19 +207,19 @@
-spec init(config()) -> {state(), ra_machine:effects()}.
init(#{name := Name} = Conf) ->
+ update_state(Conf, #state{name = Name}).
+
+update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
- #state{name = Name,
- dead_letter_handler = DLH,
- cancel_consumer_handler = CCH,
- become_leader_handler = BLH,
- metrics_handler = MH,
- shadow_copy_interval = SHI}.
-
-
+ State#state{dead_letter_handler = DLH,
+ cancel_consumer_handler = CCH,
+ become_leader_handler = BLH,
+ metrics_handler = MH,
+ shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection},
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspect
% and monitor the node
- Cons = maps:map(fun({_, P}, C) when node(P) =:= Node ->
- C#consumer{suspected_down = true};
- (_, C) -> C
- end, Cons0),
+ {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ {maps:put(K, C#consumer{suspected_down = true,
+ checked_out = #{}},
+ Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection},
_ ->
[{monitor, node, Node} | Effects0]
end,
- {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
+ {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -411,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0,
checkout(State2, Effects1);
apply(_, {nodeup, Node}, Effects0,
#state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -427,10 +434,26 @@ apply(_, {nodeup, Node}, Effects0,
(_, _, Acc) -> Acc
end, [], Enqs0),
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{suspected_down = false};
+ (_, E) -> E
+ end, Enqs0),
+ {Cons1, SQ, Effects} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when node(P) =:= Node ->
+ update_or_remove_sub(
+ ConsumerId, C#consumer{suspected_down = false},
+ CAcc, SQAcc, EAcc);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Effects0}, Cons0),
% TODO: avoid list concat
- {State0, Monitors ++ Effects0, ok};
+ checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
- {State, Effects, ok}.
+ {State, Effects, ok};
+apply(_, {update_state, Conf}, Effects, State) ->
+ {update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Custs,
@@ -583,9 +606,7 @@ cancel_consumer(ConsumerId,
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
- S = maps:fold(fun (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, S0, Checked0),
+ S = return_all(S0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
case maps:size(Cons) of
0 ->
@@ -788,6 +809,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = queue:in(MsgNum, Returns)}.
+return_all(State, Checked) ->
+ maps:fold(fun (_, {MsgNum, Msg}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -871,6 +896,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = true}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1289,6 +1316,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
ok.
+down_with_noconnection_returns_unack_test() ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#state.messages)),
+ ?assertEqual(0, queue:len(State0#state.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#state.messages)),
+ ?assertEqual(0, queue:len(State1#state.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(1, queue:len(State2a#state.returns)),
+ ok.
+
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c087e35fb2..c063ef9a17 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -21,7 +21,8 @@
handle_ra_event/3,
untracked_enqueue/2,
purge/1,
- cluster_name/1
+ cluster_name/1,
+ update_machine_state/2
]).
-include_lib("ra/include/ra.hrl").
@@ -375,6 +376,14 @@ purge(Node) ->
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
+update_machine_state(Node, Conf) ->
+ case ra:process_command(Node, {update_state, Conf}) of
+ {ok, ok, _} ->
+ ok;
+ Err ->
+ Err
+ end.
+
%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 70c3116c33..61c4858f40 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
+-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -101,8 +102,9 @@ init_state({Name, _}, QName) ->
{ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
- Nodes = [Leader | lists:delete(Leader, [{Name, N} || N <- Nodes0])],
- rabbit_fifo_client:init(qname_to_rname(QName), Nodes, SoftLimit,
+ Servers0 = [{Name, N} || N <- Nodes],
+ Servers = [Leader | lists:delete(Leader, Servers0)],
+ rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
@@ -150,12 +152,14 @@ declare(#amqqueue{name = QName,
-ra_machine(Q = #amqqueue{name = QName}) ->
- {module, rabbit_fifo,
- #{dead_letter_handler => dlx_mfa(Q),
- cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
- become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}}.
+ra_machine(Q) ->
+ {module, rabbit_fifo, ra_machine_config(Q)}.
+
+ra_machine_config(Q = #amqqueue{name = QName}) ->
+ #{dead_letter_handler => dlx_mfa(Q),
+ cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
+ become_leader_handler => {?MODULE, become_leader, [QName]},
+ metrics_handler => {?MODULE, update_metrics, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
Node = node(ChPid),
@@ -274,9 +278,10 @@ stop(VHost) ->
delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
+ Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
Msgs = quorum_messages(Name),
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], 120000) of
+ case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
@@ -412,6 +417,10 @@ maybe_delete_data_dir(UId) ->
ok
end.
+policy_changed(QName, Node) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 265bcb45e3..e495ab8677 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -35,7 +35,7 @@ memory() ->
{Sums, _Other} = sum_processes(
lists:append(All), distinguishers(), [memory]),
- [Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
@@ -69,7 +69,7 @@ memory() ->
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
+ - Qs - QsSlave - Qqs - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
[
%% Connections
@@ -81,6 +81,7 @@ memory() ->
%% Queues
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
%% Processes
{plugins, Plugins},
@@ -124,7 +125,7 @@ binary() ->
sets:add_element({Ptr, Sz}, Acc0)
end, Acc, Info)
end, distinguishers(), [{binary, sets:new()}]),
- [Other, Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Other, Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1)
|| Names <- [[other] | distinguished_interesting_sups()]],
@@ -134,6 +135,7 @@ binary() ->
{connection_other, ConnsOther},
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
{plugins, Plugins},
{mgmt_db, MgmtDbProc},
{msg_index, MsgIndexProc},
@@ -173,11 +175,16 @@ bytes(Words) -> try
end.
interesting_sups() ->
- [queue_sups(), conn_sups() | interesting_sups0()].
+ [queue_sups(), quorum_sups(), conn_sups() | interesting_sups0()].
queue_sups() ->
all_vhosts_children(rabbit_amqqueue_sup_sup).
+quorum_sups() ->
+    %% TODO: in the future not all ra servers may be queues and we need
+    %% some way to filter this
+ [ra_server_sup].
+
msg_stores() ->
all_vhosts_children(msg_store_transient)
++
@@ -229,6 +236,7 @@ distinguished_interesting_sups() ->
[
with(queue_sups(), master),
with(queue_sups(), slave),
+ quorum_sups(),
with(conn_sups(), reader),
with(conn_sups(), writer),
with(conn_sups(), channel),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 531b645774..14a3650a87 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -54,12 +54,16 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority
- ]},
+ consume_in_minority]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
quorum_cluster_size_7
+ ]},
+ {clustered_with_partitions, [], [
+ reconnect_consumer_and_publish,
+ reconnect_consumer_and_wait,
+ reconnect_consumer_and_wait_channel_down
]}
]}
].
@@ -100,6 +104,7 @@ all_tests() ->
dead_letter_to_classic_queue,
dead_letter_to_quorum_queue,
dead_letter_from_classic_to_quorum_queue,
+ dead_letter_policy,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
basic_cancel,
@@ -119,7 +124,9 @@ all_tests() ->
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
+ rabbit_ct_helpers:run_setup_steps(
+ Config,
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
@@ -128,6 +135,8 @@ init_per_group(clustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(unclustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
+init_per_group(clustered_with_partitions, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]);
init_per_group(Group, Config) ->
ClusterSize = case Group of
single_node -> 1;
@@ -139,7 +148,8 @@ init_per_group(Group, Config) ->
[{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
- Config2 = rabbit_ct_helpers:run_steps(Config1,
+ Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
+ Config2 = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:rpc(
@@ -159,10 +169,28 @@ end_per_group(clustered, Config) ->
Config;
end_per_group(unclustered, Config) ->
Config;
+end_per_group(clustered_with_partitions, Config) ->
+ Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
+init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{rmq_nodes_count, 3},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base},
+ {queue_name, Q}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]);
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
@@ -170,12 +198,18 @@ init_per_testcase(Testcase, Config) ->
Config2 = rabbit_ct_helpers:set_config(Config1,
[{queue_name, Q}
]),
- rabbit_ct_helpers:run_steps(Config2,
- rabbit_ct_client_helpers:setup_steps()).
+ rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
+end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase);
end_per_testcase(Testcase, Config) ->
catch delete_queues(),
Config1 = rabbit_ct_helpers:run_steps(
@@ -1066,22 +1100,47 @@ dead_letter_to_classic_queue(Config) ->
{<<"x-dead-letter-routing-key">>, longstr, CQ}
])),
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
+ test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ).
+
+test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) ->
+ publish(Ch, Source),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0),
- wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
- DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
+ DeliveryTag = consume(Ch, Source, false),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 1),
- wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = false}),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0),
- wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
- _ = consume(Ch, CQ, false).
+ case PolicySet of
+ true ->
+ wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, Destination, true);
+ false ->
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+dead_letter_policy(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ CQ = <<"classic-dead_letter_policy">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>,
+ [{<<"dead-letter-exchange">>, <<"">>},
+ {<<"dead-letter-routing-key">>, CQ}]),
+ RaName = ra_name(QQ),
+ test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>),
+ test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ).
dead_letter_to_quorum_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1589,6 +1648,7 @@ delete_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone
@@ -1610,6 +1670,7 @@ cleanup_data_dir(Config) ->
?assertExit({{shutdown,
{connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}),
?assert(filelib:is_dir(DataDir)),
?assertEqual(ok,
@@ -1617,6 +1678,132 @@ cleanup_data_dir(Config) ->
[])),
?assert(not filelib:is_dir(DataDir)).
+reconnect_consumer_and_publish(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 2),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait_channel_down(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_client_helpers:close_channel(ChF),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ %% Let's give it a few seconds to ensure it doesn't attempt to
+ %% deliver to the down channel - it shouldn't be monitored
+ %% at this time!
+ timer:sleep(5000),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
basic_recover(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),