summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-06-07 19:53:04 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-06-07 19:53:04 +0300
commit23ce0c8fa3150a737d6c59f60ed99a72f33d2844 (patch)
tree53fbbe009ad4f14ffeebd3f0210127515f0faa04
parent7ce71c3f5fd9d13949caa907343b0443762f67bf (diff)
parentdef400e81db176b348e8ffc2574e47d8585e7fb1 (diff)
downloadrabbitmq-server-git-23ce0c8fa3150a737d6c59f60ed99a72f33d2844.tar.gz
Merge branch 'master' into rabbitmq-server-1767-protocol-specific-ctx-in-authn-authz
-rw-r--r--rabbitmq-components.mk2
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl61
-rw-r--r--src/rabbit_channel.erl13
-rw-r--r--src/rabbit_fifo_client.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_msg_store.erl4
-rw-r--r--src/rabbit_policies.erl2
-rw-r--r--src/rabbit_vhost_sup_sup.erl18
-rw-r--r--src/unconfirmed_messages.erl43
-rw-r--r--test/confirms_rejects_SUITE.erl48
-rw-r--r--test/dead_lettering_SUITE.erl33
-rw-r--r--test/priority_queue_SUITE.erl24
-rw-r--r--test/quorum_queue_SUITE.erl11
-rw-r--r--test/simple_ha_SUITE.erl23
-rw-r--r--test/vhost_SUITE.erl10
16 files changed, 224 insertions, 81 deletions
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index 0ff2ad0253..a1817e3196 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -114,7 +114,7 @@ dep_cowboy = hex 2.6.1
dep_cowlib = hex 2.7.0
dep_jsx = hex 2.9.0
dep_lager = hex 3.6.10
-dep_prometheus = git https://github.com/deadtrickster/prometheus.erl v4.3.0
+dep_prometheus = hex 4.4.0
dep_ra = git https://github.com/rabbitmq/ra.git master
dep_ranch = hex 1.7.1
dep_recon = hex 2.5.0
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9e94dd8f27..85c647ae8c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -761,7 +761,9 @@ check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
check_overflow({longstr, Val}, _Args) ->
- case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of
+ case lists:member(Val, [<<"drop-head">>,
+ <<"reject-publish">>,
+ <<"reject-publish-dlx">>]) of
true -> ok;
false -> {error, invalid_overflow}
end;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b3f89b7ef0..2185d7c95f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -82,7 +82,7 @@
%% max length in bytes, if configured
max_bytes,
%% an action to perform if queue is to be over a limit,
- %% can be either drop-head (default) or reject-publish
+ %% can be either drop-head (default), reject-publish or reject-publish-dlx
overflow,
%% when policies change, this version helps queue
%% determine what previously scheduled/set up state to ignore,
@@ -163,7 +163,7 @@ init_state(Q) ->
has_had_consumers = false,
consumers = rabbit_queue_consumers:new(),
senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
+ msg_id_to_channel = #{},
status = running,
args_policy_version = 0,
overflow = 'drop-head',
@@ -261,7 +261,7 @@ recovery_barrier(BarrierPid) ->
-spec init_with_backing_queue_state
(amqqueue:amqqueue(), atom(), tuple(), any(),
- [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
+ [rabbit_types:delivery()], pmon:pmon(), maps:map()) ->
#q{}.
init_with_backing_queue_state(Q, BQ, BQS,
@@ -599,16 +599,26 @@ confirm_messages(MsgIds, MTC) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
- case gb_trees:lookup(MsgId, MTC0) of
- {value, {SenderPid, MsgSeqNo}} ->
- {rabbit_misc:gb_trees_cons(SenderPid,
- MsgSeqNo, CMs),
- gb_trees:delete(MsgId, MTC0)};
+ case maps:get(MsgId, MTC0, none) of
none ->
- {CMs, MTC0}
+ {CMs, MTC0};
+ {SenderPid, MsgSeqNo} ->
+ {maps:update_with(SenderPid,
+ fun(MsgSeqNos) ->
+ [MsgSeqNo | MsgSeqNos]
+ end,
+ [MsgSeqNo],
+ CMs),
+ maps:remove(MsgId, MTC0)}
+
end
- end, {gb_trees:empty(), MTC}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
+ end, {#{}, MTC}, MsgIds),
+ maps:fold(
+ fun(Pid, MsgSeqNos, _) ->
+ rabbit_misc:confirm_to_sender(Pid, MsgSeqNos)
+ end,
+ ok,
+ CMs),
MTC1.
send_or_record_confirm(#delivery{confirm = false}, State) ->
@@ -622,7 +632,7 @@ send_or_record_confirm(#delivery{confirm = true,
State = #q{q = Q,
msg_id_to_channel = MTC})
when ?amqqueue_is_durable(Q) ->
- MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
+ MTC1 = maps:put(MsgId, {SenderPid, MsgSeqNo}, MTC),
{eventually, State#q{msg_id_to_channel = MTC1}};
send_or_record_confirm(#delivery{confirm = true,
sender = SenderPid,
@@ -704,12 +714,25 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
Delivered,
State = #q{overflow = Overflow,
backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ dlx = DLX,
+ dlx_routing_key = RK}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
+ {true, 'reject-publish-dlx'} ->
+ %% Publish to DLX
+ with_dlx(
+ DLX,
+ fun (X) ->
+ QName = qname(State),
+ rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
+ end,
+ fun () -> ok end),
+ %% Drop publish and nack to publisher
+ send_reject_publish(Delivery, Delivered, State);
_ ->
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State1 = State#q{backing_queue_state = BQS1},
@@ -766,6 +789,8 @@ maybe_drop_head(State = #q{max_length = undefined,
{false, State};
maybe_drop_head(State = #q{overflow = 'reject-publish'}) ->
{false, State};
+maybe_drop_head(State = #q{overflow = 'reject-publish-dlx'}) ->
+ {false, State};
maybe_drop_head(State = #q{overflow = 'drop-head'}) ->
maybe_drop_head(false, State).
@@ -786,14 +811,18 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
end.
send_reject_publish(#delivery{confirm = true,
- sender = SenderPid,
- msg_seq_no = MsgSeqNo} = Delivery,
+ sender = SenderPid,
+ flow = Flow,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message{id = MsgId}},
_Delivered,
State = #q{ backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
- {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}),
+
+ MTC1 = maps:remove(MsgId, MTC),
+ BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
send_reject_publish(#delivery{confirm = false},
_Delivered, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cf61ae243f..f5c9e8dfce 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -189,6 +189,7 @@
messages_unconfirmed,
messages_uncommitted,
acks_uncommitted,
+ pending_raft_commands,
prefetch_count,
global_prefetch_count,
state,
@@ -2241,10 +2242,11 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) ->
%% does not exist in unconfirmed messages.
%% Neither does the 'ignore' atom, so it's a reasonable fallback.
QName = maps:get(QRef, QNames, ignore),
- {MXs, UC1} =
+ {ConfirmMXs, RejectMXs, UC1} =
unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
+ State1 = record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}),
+ record_rejects(RejectMXs, State1).
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
@@ -2371,6 +2373,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(pending_raft_commands, #ch{queue_states = QS}) ->
+ pending_raft_commands(QS);
i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state();
i(state, #ch{cfg = #conf{state = State}}) -> State;
i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C;
@@ -2386,6 +2390,11 @@ i(reductions, _State) ->
i(Item, _) ->
throw({bad_argument, Item}).
+pending_raft_commands(QStates) ->
+ maps:fold(fun (_, V, Acc) ->
+ Acc + rabbit_fifo_client:pending_size(V)
+ end, 0, QStates).
+
name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index a3c241aff2..136800cc99 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -39,6 +39,7 @@
purge/1,
cluster_name/1,
update_machine_state/2,
+ pending_size/1,
stat/1
]).
@@ -409,6 +410,10 @@ purge(Node) ->
Err
end.
+-spec pending_size(state()) -> non_neg_integer().
+pending_size(#state{pending = Pend}) ->
+ maps:size(Pend).
+
-spec stat(ra_server_id()) ->
{ok, non_neg_integer(), non_neg_integer()}
| {error | timeout, term()}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f7a122f98a..22df1751e5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -717,10 +717,10 @@ promote_me(From, #state { q = Q0,
QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
- gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ maps:put(MsgId, {ChPid, MsgSeqNo}, MTC0);
(_Msgid, _Status, MTC0) ->
MTC0
- end, gb_trees:empty(), MS),
+ end, #{}, MS),
Deliveries = [promote_delivery(Delivery) ||
{_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e3b23cfbca..5271d503eb 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1971,7 +1971,7 @@ cleanup_after_file_deletion(File,
%%----------------------------------------------------------------------------
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
- {ok, deletion_thunk()} | {defer, non_neg_integer()}.
+ {ok, deletion_thunk()} | {defer, [non_neg_integer()]}.
combine_files(Source, Destination,
State = #gc_state { file_summary_ets = FileSummaryEts }) ->
@@ -2073,7 +2073,7 @@ do_combine_files(SourceSummary, DestinationSummary,
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
safe_file_delete_fun(Source, Dir, FileHandlesEts).
--spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}.
+-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, [non_neg_integer()]}.
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 7878bed02d..c4f4226448 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -131,6 +131,8 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
ok;
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
ok;
+validate_policy0(<<"overflow">>, <<"reject-publish-dlx">>) ->
+ ok;
validate_policy0(<<"overflow">>, Value) ->
{error, "~p is not a valid overflow value", [Value]};
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 46a2b485f2..d9d6091c39 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -30,6 +30,7 @@
save_vhost_process/2]).
-export([delete_on_all_nodes/1, start_on_all_nodes/1]).
-export([is_vhost_alive/1]).
+-export([check/0]).
%% Internal
-export([stop_and_delete_vhost/1]).
@@ -260,3 +261,20 @@ vhost_restart_strategy() ->
transient -> transient;
permanent -> permanent
end.
+
+check() ->
+ VHosts = rabbit_vhost:list(),
+ lists:filter(
+ fun(V) ->
+ case rabbit_vhost_sup_sup:get_vhost_sup(V) of
+ {ok, Sup} ->
+ MsgStores = [Pid || {Name, Pid, _, _} <- supervisor:which_children(Sup),
+ lists:member(Name, [msg_store_persistent,
+ msg_store_transient])],
+ not is_vhost_alive(V) orelse (not lists:all(fun(P) ->
+ erlang:is_process_alive(P)
+ end, MsgStores));
+ {error, _} ->
+ true
+ end
+ end, VHosts).
diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl
index 63a504a239..0a4b533448 100644
--- a/src/unconfirmed_messages.erl
+++ b/src/unconfirmed_messages.erl
@@ -33,7 +33,6 @@
-export([new/0,
insert/5,
- confirm_msg_ref/4,
confirm_multiple_msg_ref/4,
forget_ref/2,
@@ -112,27 +111,22 @@ insert(MsgId, QueueNames, QueueRefs, XName,
error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC})
end.
-%% Confirms a message on behalf of the given queue. If it was the last queue (ref)
-%% on the waiting list, returns 'confirmed' and performs the necessary cleanup.
--spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
- {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
-confirm_msg_ref(MsgId, QueueName, QueueRef,
- #unconfirmed{reverse = Reverse} = UC) ->
- remove_msg_ref(confirm, MsgId, QueueName, QueueRef,
- UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}).
-
+%% Confirms messages on behalf of the given queue. If it was the last queue (ref)
+%% on the waiting list, returns the message id and exchange name
+%% and performs the necessary cleanup.
-spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
- {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
+ {[{msg_id(), exchange_name()}], [{msg_id(), exchange_name()}], ?MODULE()}.
confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef,
#unconfirmed{reverse = Reverse} = UC0) ->
lists:foldl(
- fun(MsgId, {C, UC}) ->
+ fun(MsgId, {C, R, UC}) ->
case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of
- {{confirmed, V}, UC1} -> {[V | C], UC1};
- {not_confirmed, UC1} -> {C, UC1}
+ {{confirmed, V}, UC1} -> {[V | C], R, UC1};
+ {{rejected, V}, UC1} -> {C, [V | R], UC1};
+ {not_confirmed, UC1} -> {C, R, UC1}
end
end,
- {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
+ {[], [], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
MsgIds).
%% Removes all messages for a queue.
@@ -179,14 +173,15 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever
{Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}.
reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})),
- lists:foldl(fun(MsgId, {R, UC}) ->
- case reject_msg(MsgId, UC) of
- {not_confirmed, UC1} -> {R, UC1};
- {{rejected, V}, UC1} -> {[V | R], UC1}
- end
- end,
- {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
- MsgIds).
+ lists:foldl(
+ fun(MsgId, {R, UC}) ->
+ case reject_msg(MsgId, UC) of
+ {not_confirmed, UC1} -> {R, UC1};
+ {{rejected, V}, UC1} -> {[V | R], UC1}
+ end
+ end,
+ {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
+ MsgIds).
%% Returns a smallest message id.
-spec smallest(?MODULE()) -> msg_id().
@@ -238,7 +233,7 @@ remove_multiple_from_reverse(Refs, MsgIds, Reverse0) ->
Reverse0,
Refs).
--spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
+-spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name() | 'ignore', queue_ref(), ?MODULE()) ->
{{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed,
?MODULE()}.
remove_msg_ref(Confirm, MsgId, QueueName, QueueRef,
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index 6b2133b8ff..402bca8737 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -11,13 +11,17 @@ all() ->
].
groups() ->
+ OverflowTests = [
+ confirms_rejects_conflict,
+ policy_resets_to_default
+ ],
[
{parallel_tests, [parallel], [
- confirms_rejects_conflict,
- policy_resets_to_default,
- dead_queue_rejects,
- mixed_dead_alive_queues_reject
- ]}
+ {overflow_reject_publish_dlx, [parallel], OverflowTests},
+ {overflow_reject_publish, [parallel], OverflowTests},
+ dead_queue_rejects,
+ mixed_dead_alive_queues_reject
+ ]}
].
init_per_suite(Config) ->
@@ -28,6 +32,14 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(overflow_reject_publish, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {overflow, <<"reject-publish">>}
+ ]);
+init_per_group(overflow_reject_publish_dlx, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {overflow, <<"reject-publish-dlx">>}
+ ]);
init_per_group(Group, Config) ->
ClusterSize = 2,
Config1 = rabbit_ct_helpers:set_config(Config, [
@@ -38,6 +50,10 @@ init_per_group(Group, Config) ->
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
+end_per_group(overflow_reject_publish, _Config) ->
+ ok;
+end_per_group(overflow_reject_publish_dlx, _Config) ->
+ ok;
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
@@ -60,7 +76,9 @@ init_per_testcase(Testcase, Config)
end_per_testcase(policy_resets_to_default = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"policy_resets_to_default">>}),
+ XOverflow = ?config(overflow, Config),
+ QueueName = <<"policy_resets_to_default", "_", XOverflow/binary>>,
+ amqp_channel:call(Ch, #'queue.delete'{queue = QueueName}),
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
Conn = ?config(conn, Config),
@@ -70,7 +88,9 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}),
+ XOverflow = ?config(overflow, Config),
+ QueueName = <<"confirms_rejects_conflict", "_", XOverflow/binary>>,
+ amqp_channel:call(Ch, #'queue.delete'{queue = QueueName}),
end_per_testcase0(Testcase, Config);
end_per_testcase(dead_queue_rejects = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -187,15 +207,15 @@ confirms_rejects_conflict(Config) ->
false = Conn =:= Conn1,
false = Ch =:= Ch1,
- QueueName = <<"confirms_rejects_conflict">>,
-
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
+ XOverflow = ?config(overflow, Config),
+ QueueName = <<"confirms_rejects_conflict", "_", XOverflow/binary>>,
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
durable = true,
- arguments = [{<<"x-max-length">>,long,12},
- {<<"x-overflow">>,longstr,<<"reject-publish">>}]
+ arguments = [{<<"x-max-length">>, long, 12},
+ {<<"x-overflow">>, longstr, XOverflow}]
}),
%% Consume 3 messages at once. Do that often.
Consume = fun Consume() ->
@@ -238,12 +258,14 @@ confirms_rejects_conflict(Config) ->
policy_resets_to_default(Config) ->
Conn = ?config(conn, Config),
+
{ok, Ch} = amqp_connection:open_channel(Conn),
- QueueName = <<"policy_resets_to_default">>,
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
+ XOverflow = ?config(overflow, Config),
+ QueueName = <<"policy_resets_to_default", "_", XOverflow/binary>>,
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
durable = true
}),
@@ -251,7 +273,7 @@ policy_resets_to_default(Config) ->
rabbit_ct_broker_helpers:set_policy(
Config, 0,
QueueName, QueueName, <<"queues">>,
- [{<<"max-length">>, MaxLength}, {<<"overflow">>, <<"reject-publish">>}]),
+ [{<<"max-length">>, MaxLength}, {<<"overflow">>, XOverflow}]),
timer:sleep(1000),
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index 6fe2a3a522..fe5e91c8ed 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -60,13 +60,15 @@ groups() ->
{dead_letter_tests, [],
[
{classic_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
+ dead_letter_max_length_reject_publish_dlx,
dead_letter_routing_key_cycle_ttl,
dead_letter_headers_reason_expired,
dead_letter_headers_reason_expired_per_message]},
{mirrored_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
- dead_letter_routing_key_cycle_ttl,
- dead_letter_headers_reason_expired,
- dead_letter_headers_reason_expired_per_message]},
+ dead_letter_max_length_reject_publish_dlx,
+ dead_letter_routing_key_cycle_ttl,
+ dead_letter_headers_reason_expired,
+ dead_letter_headers_reason_expired_per_message]},
{quorum_queue, [parallel], DeadLetterTests}
]}
].
@@ -381,6 +383,31 @@ dead_letter_max_length_drop_head(Config) ->
_ = consume(Ch, DLXQName, [P1, P2]),
consume_empty(Ch, DLXQName).
+%% Another strategy: reject-publish-dlx
+dead_letter_max_length_reject_publish_dlx(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName,
+ [{<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject-publish-dlx">>}]),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the first one from the queue (max-length = 1)
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, QName, [P1]),
+ consume_empty(Ch, QName),
+ %% Consume the dropped ones from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]),
+ _ = consume(Ch, DLXQName, [P2, P3]),
+ consume_empty(Ch, DLXQName).
+
%% Dead letter exchange does not have to be declared when the queue is declared, but it should
%% exist by the time messages need to be dead-lettered; if it is missing then, the messages will
%% be silently dropped.
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 5bac8482fa..f7e20a9fe6 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -33,7 +33,8 @@ groups() ->
{cluster_size_2, [], [
ackfold,
drop,
- reject,
+ {overflow_reject_publish, [], [reject]},
+ {overflow_reject_publish_dlx, [], [reject]},
dropwhile_fetchwhile,
info_head_message_timestamp,
matching,
@@ -87,8 +88,20 @@ init_per_group(cluster_size_3, Config) ->
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
-
+ rabbit_ct_client_helpers:setup_steps());
+init_per_group(overflow_reject_publish, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {overflow, <<"reject-publish">>}
+ ]);
+init_per_group(overflow_reject_publish_dlx, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {overflow, <<"reject-publish-dlx">>}
+ ]).
+
+end_per_group(overflow_reject_publish, _Config) ->
+ ok;
+end_per_group(overflow_reject_publish_dlx, _Config) ->
+ ok;
end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
@@ -334,9 +347,10 @@ drop(Config) ->
reject(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Q = <<"reject-queue">>,
+ XOverflow = ?config(overflow, Config),
+ Q = <<"reject-queue-", XOverflow/binary>>,
declare(Ch, Q, [{<<"x-max-length">>, long, 4},
- {<<"x-overflow">>, longstr, <<"reject-publish">>}
+ {<<"x-overflow">>, longstr, XOverflow}
| arguments(3)]),
publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
%% First 4 messages are published, all others are discarded.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index c23b7ac85e..1d9789fe89 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -323,11 +323,12 @@ declare_invalid_args(Config) ->
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-max-priority">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
+ [?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-overflow">>, longstr, XOverflow}]))
+ || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index 20012b09c8..b2caff86a9 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -30,6 +30,11 @@ all() ->
].
groups() ->
+ RejectTests = [
+ rejects_survive_stop,
+ rejects_survive_sigkill,
+ rejects_survive_policy
+ ],
[
{cluster_size_2, [], [
rapid_redeclare,
@@ -45,9 +50,8 @@ groups() ->
confirms_survive_stop,
confirms_survive_sigkill,
confirms_survive_policy,
- rejects_survive_stop,
- rejects_survive_sigkill,
- rejects_survive_policy
+ {overflow_reject_publish, [], RejectTests},
+ {overflow_reject_publish_dlx, [], RejectTests}
]}
].
@@ -69,6 +73,14 @@ init_per_group(cluster_size_2, Config) ->
init_per_group(cluster_size_3, Config) ->
rabbit_ct_helpers:set_config(Config, [
{rmq_nodes_count, 3}
+ ]);
+init_per_group(overflow_reject_publish, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {overflow, <<"reject-publish">>}
+ ]);
+init_per_group(overflow_reject_publish_dlx, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {overflow, <<"reject-publish-dlx">>}
]).
end_per_group(_, Config) ->
@@ -227,12 +239,13 @@ rejects_survive(Config, DeathFun) ->
Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B),
%% declare the queue on the master, mirrored to the two slaves
- Queue = <<"test_rejects">>,
+ XOverflow = ?config(overflow, Config),
+ Queue = <<"test_rejects", "_", XOverflow/binary>>,
amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue,
auto_delete = false,
durable = true,
arguments = [{<<"x-max-length">>, long, 1},
- {<<"x-overflow">>, longstr, <<"reject-publish">>}]}),
+ {<<"x-overflow">>, longstr, XOverflow}]}),
Payload = <<"there can be only one">>,
amqp_channel:call(Node1Channel,
#'basic.publish'{routing_key = Queue},
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index 5e598d9f90..123bf741f0 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -318,7 +318,9 @@ node_starts_with_dead_vhosts(Config) ->
false = rabbit_ct_broker_helpers:rpc(Config, 1,
rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
true = rabbit_ct_broker_helpers:rpc(Config, 1,
- rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]).
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]),
+ [VHost1] = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, check, []).
node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
VHost1 = <<"vhost1">>,
@@ -331,6 +333,8 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
true = rabbit_ct_broker_helpers:rpc(Config, 1,
rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]),
+ [] = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, check, []),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
{ok, Chan} = amqp_connection:open_channel(Conn),
@@ -373,7 +377,9 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
false = rabbit_ct_broker_helpers:rpc(Config, 1,
rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
true = rabbit_ct_broker_helpers:rpc(Config, 1,
- rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]).
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]),
+ [VHost1] = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, check, []).
vhost_creation_idempotency(Config) ->
VHost = <<"idempotency-test">>,