diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-06-07 19:53:04 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-06-07 19:53:04 +0300 |
| commit | 23ce0c8fa3150a737d6c59f60ed99a72f33d2844 (patch) | |
| tree | 53fbbe009ad4f14ffeebd3f0210127515f0faa04 | |
| parent | 7ce71c3f5fd9d13949caa907343b0443762f67bf (diff) | |
| parent | def400e81db176b348e8ffc2574e47d8585e7fb1 (diff) | |
| download | rabbitmq-server-git-23ce0c8fa3150a737d6c59f60ed99a72f33d2844.tar.gz | |
Merge branch 'master' into rabbitmq-server-1767-protocol-specific-ctx-in-authn-authz
| -rw-r--r-- | rabbitmq-components.mk | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 18 | ||||
| -rw-r--r-- | src/unconfirmed_messages.erl | 43 | ||||
| -rw-r--r-- | test/confirms_rejects_SUITE.erl | 48 | ||||
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 33 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 24 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 11 | ||||
| -rw-r--r-- | test/simple_ha_SUITE.erl | 23 | ||||
| -rw-r--r-- | test/vhost_SUITE.erl | 10 |
16 files changed, 224 insertions, 81 deletions
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 0ff2ad0253..a1817e3196 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -114,7 +114,7 @@ dep_cowboy = hex 2.6.1 dep_cowlib = hex 2.7.0 dep_jsx = hex 2.9.0 dep_lager = hex 3.6.10 -dep_prometheus = git https://github.com/deadtrickster/prometheus.erl v4.3.0 +dep_prometheus = hex 4.4.0 dep_ra = git https://github.com/rabbitmq/ra.git master dep_ranch = hex 1.7.1 dep_recon = hex 2.5.0 diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9e94dd8f27..85c647ae8c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -761,7 +761,9 @@ check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. check_overflow({longstr, Val}, _Args) -> - case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of + case lists:member(Val, [<<"drop-head">>, + <<"reject-publish">>, + <<"reject-publish-dlx">>]) of true -> ok; false -> {error, invalid_overflow} end; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3f89b7ef0..2185d7c95f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -82,7 +82,7 @@ %% max length in bytes, if configured max_bytes, %% an action to perform if queue is to be over a limit, - %% can be either drop-head (default) or reject-publish + %% can be either drop-head (default), reject-publish or reject-publish-dlx overflow, %% when policies change, this version helps queue %% determine what previously scheduled/set up state to ignore, @@ -163,7 +163,7 @@ init_state(Q) -> has_had_consumers = false, consumers = rabbit_queue_consumers:new(), senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), + msg_id_to_channel = #{}, status = running, args_policy_version = 0, overflow = 'drop-head', @@ -261,7 +261,7 @@ recovery_barrier(BarrierPid) -> -spec init_with_backing_queue_state (amqqueue:amqqueue(), atom(), tuple(), any(), - [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> + [rabbit_types:delivery()], pmon:pmon(), maps:map()) -> #q{}. init_with_backing_queue_state(Q, BQ, BQS, @@ -599,16 +599,26 @@ confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> - case gb_trees:lookup(MsgId, MTC0) of - {value, {SenderPid, MsgSeqNo}} -> - {rabbit_misc:gb_trees_cons(SenderPid, - MsgSeqNo, CMs), - gb_trees:delete(MsgId, MTC0)}; + case maps:get(MsgId, MTC0, none) of none -> - {CMs, MTC0} + {CMs, MTC0}; + {SenderPid, MsgSeqNo} -> + {maps:update_with(SenderPid, + fun(MsgSeqNos) -> + [MsgSeqNo | MsgSeqNos] + end, + [MsgSeqNo], + CMs), + maps:remove(MsgId, MTC0)} + end - end, {gb_trees:empty(), MTC}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), + end, {#{}, MTC}, MsgIds), + maps:fold( + fun(Pid, MsgSeqNos, _) -> + rabbit_misc:confirm_to_sender(Pid, MsgSeqNos) + end, + ok, + CMs), MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> @@ -622,7 +632,7 @@ send_or_record_confirm(#delivery{confirm = true, State = #q{q = Q, msg_id_to_channel = MTC}) when ?amqqueue_is_durable(Q) -> - MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), + MTC1 = maps:put(MsgId, {SenderPid, MsgSeqNo}, MTC), {eventually, State#q{msg_id_to_channel = MTC1}}; send_or_record_confirm(#delivery{confirm = true, sender = SenderPid, @@ -704,12 +714,25 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, Delivered, State = #q{overflow = Overflow, backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + dlx = DLX, + dlx_routing_key = RK}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); + {true, 'reject-publish-dlx'} -> + %% Publish to DLX + with_dlx( + DLX, + fun (X) -> + QName = qname(State), + rabbit_dead_letter:publish(Message, maxlen, X, RK, QName) + end, + fun () -> ok end), + %% Drop publish and nack to publisher + send_reject_publish(Delivery, Delivered, State); _ -> {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State1 = State#q{backing_queue_state = BQS1}, @@ -766,6 +789,8 @@ maybe_drop_head(State = #q{max_length = undefined, {false, State}; maybe_drop_head(State = #q{overflow = 'reject-publish'}) -> {false, State}; +maybe_drop_head(State = #q{overflow = 'reject-publish-dlx'}) -> + {false, State}; maybe_drop_head(State = #q{overflow = 'drop-head'}) -> maybe_drop_head(false, State). @@ -786,14 +811,18 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, end. send_reject_publish(#delivery{confirm = true, - sender = SenderPid, - msg_seq_no = MsgSeqNo} = Delivery, + sender = SenderPid, + flow = Flow, + msg_seq_no = MsgSeqNo, + message = #basic_message{id = MsgId}}, _Delivered, State = #q{ backing_queue = BQ, backing_queue_state = BQS, msg_id_to_channel = MTC}) -> - {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}), + + MTC1 = maps:remove(MsgId, MTC), + BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; send_reject_publish(#delivery{confirm = false}, _Delivered, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cf61ae243f..f5c9e8dfce 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -189,6 +189,7 @@ messages_unconfirmed, messages_uncommitted, acks_uncommitted, + pending_raft_commands, prefetch_count, global_prefetch_count, state, @@ -2241,10 +2242,11 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) -> %% does not exist in unconfirmed messages. %% Neither does the 'ignore' atom, so it's a reasonable fallback. QName = maps:get(QRef, QNames, ignore), - {MXs, UC1} = + {ConfirmMXs, RejectMXs, UC1} = unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. - record_confirms(MXs, State#ch{unconfirmed = UC1}). + State1 = record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}), + record_rejects(RejectMXs, State1). send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; @@ -2371,6 +2373,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(pending_raft_commands, #ch{queue_states = QS}) -> + pending_raft_commands(QS); i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state(); i(state, #ch{cfg = #conf{state = State}}) -> State; i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C; @@ -2386,6 +2390,11 @@ i(reductions, _State) -> i(Item, _) -> throw({bad_argument, Item}). +pending_raft_commands(QStates) -> + maps:fold(fun (_, V, Acc) -> + Acc + rabbit_fifo_client:pending_size(V) + end, 0, QStates). + name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index a3c241aff2..136800cc99 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -39,6 +39,7 @@ purge/1, cluster_name/1, update_machine_state/2, + pending_size/1, stat/1 ]). @@ -409,6 +410,10 @@ purge(Node) -> Err end. +-spec pending_size(state()) -> non_neg_integer(). +pending_size(#state{pending = Pend}) -> + maps:size(Pend). + -spec stat(ra_server_id()) -> {ok, non_neg_integer(), non_neg_integer()} | {error | timeout, term()}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f7a122f98a..22df1751e5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -717,10 +717,10 @@ promote_me(From, #state { q = Q0, QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> - gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + maps:put(MsgId, {ChPid, MsgSeqNo}, MTC0); (_Msgid, _Status, MTC0) -> MTC0 - end, gb_trees:empty(), MS), + end, #{}, MS), Deliveries = [promote_delivery(Delivery) || {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ), Delivery <- queue:to_list(PubQ)], diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e3b23cfbca..5271d503eb 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1971,7 +1971,7 @@ cleanup_after_file_deletion(File, %%---------------------------------------------------------------------------- -spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) -> - {ok, deletion_thunk()} | {defer, non_neg_integer()}. + {ok, deletion_thunk()} | {defer, [non_neg_integer()]}. combine_files(Source, Destination, State = #gc_state { file_summary_ets = FileSummaryEts }) -> @@ -2073,7 +2073,7 @@ do_combine_files(SourceSummary, DestinationSummary, gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), safe_file_delete_fun(Source, Dir, FileHandlesEts). --spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}. +-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, [non_neg_integer()]}. delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 7878bed02d..c4f4226448 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -131,6 +131,8 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) -> ok; validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; +validate_policy0(<<"overflow">>, <<"reject-publish-dlx">>) -> + ok; validate_policy0(<<"overflow">>, Value) -> {error, "~p is not a valid overflow value", [Value]}; diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 46a2b485f2..d9d6091c39 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -30,6 +30,7 @@ save_vhost_process/2]). -export([delete_on_all_nodes/1, start_on_all_nodes/1]). -export([is_vhost_alive/1]). +-export([check/0]). %% Internal -export([stop_and_delete_vhost/1]). @@ -260,3 +261,20 @@ vhost_restart_strategy() -> transient -> transient; permanent -> permanent end. + +check() -> + VHosts = rabbit_vhost:list(), + lists:filter( + fun(V) -> + case rabbit_vhost_sup_sup:get_vhost_sup(V) of + {ok, Sup} -> + MsgStores = [Pid || {Name, Pid, _, _} <- supervisor:which_children(Sup), + lists:member(Name, [msg_store_persistent, + msg_store_transient])], + not is_vhost_alive(V) orelse (not lists:all(fun(P) -> + erlang:is_process_alive(P) + end, MsgStores)); + {error, _} -> + true + end + end, VHosts). diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl index 63a504a239..0a4b533448 100644 --- a/src/unconfirmed_messages.erl +++ b/src/unconfirmed_messages.erl @@ -33,7 +33,6 @@ -export([new/0, insert/5, - confirm_msg_ref/4, confirm_multiple_msg_ref/4, forget_ref/2, @@ -112,27 +111,22 @@ insert(MsgId, QueueNames, QueueRefs, XName, error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC}) end. -%% Confirms a message on behalf of the given queue. If it was the last queue (ref) -%% on the waiting list, returns 'confirmed' and performs the necessary cleanup. --spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> - {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. -confirm_msg_ref(MsgId, QueueName, QueueRef, - #unconfirmed{reverse = Reverse} = UC) -> - remove_msg_ref(confirm, MsgId, QueueName, QueueRef, - UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}). - +%% Confirms messages on behalf of the given queue. If it was the last queue (ref) +%% on the waiting list, returns message id and excahnge name +%% and performs the necessary cleanup. -spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> - {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. + {[{msg_id(), exchange_name()}], [{msg_id(), exchange_name()}], ?MODULE()}. confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef, #unconfirmed{reverse = Reverse} = UC0) -> lists:foldl( - fun(MsgId, {C, UC}) -> + fun(MsgId, {C, R, UC}) -> case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of - {{confirmed, V}, UC1} -> {[V | C], UC1}; - {not_confirmed, UC1} -> {C, UC1} + {{confirmed, V}, UC1} -> {[V | C], R, UC1}; + {{rejected, V}, UC1} -> {C, [V | R], UC1}; + {not_confirmed, UC1} -> {C, R, UC1} end end, - {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, + {[], [], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, MsgIds). %% Removes all messages for a queue. @@ -179,14 +173,15 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever {Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}. reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) -> MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})), - lists:foldl(fun(MsgId, {R, UC}) -> - case reject_msg(MsgId, UC) of - {not_confirmed, UC1} -> {R, UC1}; - {{rejected, V}, UC1} -> {[V | R], UC1} - end - end, - {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, - MsgIds). + lists:foldl( + fun(MsgId, {R, UC}) -> + case reject_msg(MsgId, UC) of + {not_confirmed, UC1} -> {R, UC1}; + {{rejected, V}, UC1} -> {[V | R], UC1} + end + end, + {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, + MsgIds). %% Returns a smallest message id. -spec smallest(?MODULE()) -> msg_id(). @@ -238,7 +233,7 @@ remove_multiple_from_reverse(Refs, MsgIds, Reverse0) -> Reverse0, Refs). --spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name(), queue_ref(), ?MODULE()) -> +-spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name() | 'ignore', queue_ref(), ?MODULE()) -> {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. remove_msg_ref(Confirm, MsgId, QueueName, QueueRef, diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index 6b2133b8ff..402bca8737 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -11,13 +11,17 @@ all() -> ]. groups() -> + OverflowTests = [ + confirms_rejects_conflict, + policy_resets_to_default + ], [ {parallel_tests, [parallel], [ - confirms_rejects_conflict, - policy_resets_to_default, - dead_queue_rejects, - mixed_dead_alive_queues_reject - ]} + {overflow_reject_publish_dlx, [parallel], OverflowTests}, + {overflow_reject_publish, [parallel], OverflowTests}, + dead_queue_rejects, + mixed_dead_alive_queues_reject + ]} ]. init_per_suite(Config) -> @@ -28,6 +32,14 @@ end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(overflow_reject_publish, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish">>} + ]); +init_per_group(overflow_reject_publish_dlx, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish-dlx">>} + ]); init_per_group(Group, Config) -> ClusterSize = 2, Config1 = rabbit_ct_helpers:set_config(Config, [ @@ -38,6 +50,10 @@ init_per_group(Group, Config) -> rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). +end_per_group(overflow_reject_publish, _Config) -> + ok; +end_per_group(overflow_reject_publish_dlx, _Config) -> + ok; end_per_group(_Group, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ @@ -60,7 +76,9 @@ init_per_testcase(Testcase, Config) end_per_testcase(policy_resets_to_default = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"policy_resets_to_default">>}), + XOverflow = ?config(overflow, Config), + QueueName = <<"policy_resets_to_default", "_", XOverflow/binary>>, + amqp_channel:call(Ch, #'queue.delete'{queue = QueueName}), rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), Conn = ?config(conn, Config), @@ -70,7 +88,9 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}), + XOverflow = ?config(overflow, Config), + QueueName = <<"confirms_rejects_conflict", "_", XOverflow/binary>>, + amqp_channel:call(Ch, #'queue.delete'{queue = QueueName}), end_per_testcase0(Testcase, Config); end_per_testcase(dead_queue_rejects = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -187,15 +207,15 @@ confirms_rejects_conflict(Config) -> false = Conn =:= Conn1, false = Ch =:= Ch1, - QueueName = <<"confirms_rejects_conflict">>, - amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), + XOverflow = ?config(overflow, Config), + QueueName = <<"confirms_rejects_conflict", "_", XOverflow/binary>>, amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, durable = true, - arguments = [{<<"x-max-length">>,long,12}, - {<<"x-overflow">>,longstr,<<"reject-publish">>}] + arguments = [{<<"x-max-length">>, long, 12}, + {<<"x-overflow">>, longstr, XOverflow}] }), %% Consume 3 messages at once. Do that often. Consume = fun Consume() -> @@ -238,12 +258,14 @@ confirms_rejects_conflict(Config) -> policy_resets_to_default(Config) -> Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), - QueueName = <<"policy_resets_to_default">>, amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), + XOverflow = ?config(overflow, Config), + QueueName = <<"policy_resets_to_default", "_", XOverflow/binary>>, amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, durable = true }), @@ -251,7 +273,7 @@ policy_resets_to_default(Config) -> rabbit_ct_broker_helpers:set_policy( Config, 0, QueueName, QueueName, <<"queues">>, - [{<<"max-length">>, MaxLength}, {<<"overflow">>, <<"reject-publish">>}]), + [{<<"max-length">>, MaxLength}, {<<"overflow">>, XOverflow}]), timer:sleep(1000), diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index 6fe2a3a522..fe5e91c8ed 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -60,13 +60,15 @@ groups() -> {dead_letter_tests, [], [ {classic_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl, + dead_letter_max_length_reject_publish_dlx, dead_letter_routing_key_cycle_ttl, dead_letter_headers_reason_expired, dead_letter_headers_reason_expired_per_message]}, {mirrored_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl, - dead_letter_routing_key_cycle_ttl, - dead_letter_headers_reason_expired, - dead_letter_headers_reason_expired_per_message]}, + dead_letter_max_length_reject_publish_dlx, + dead_letter_routing_key_cycle_ttl, + dead_letter_headers_reason_expired, + dead_letter_headers_reason_expired_per_message]}, {quorum_queue, [parallel], DeadLetterTests} ]} ]. @@ -381,6 +383,31 @@ dead_letter_max_length_drop_head(Config) -> _ = consume(Ch, DLXQName, [P1, P2]), consume_empty(Ch, DLXQName). +%% Another strategy: reject-publish-dlx +dead_letter_max_length_reject_publish_dlx(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + + declare_dead_letter_queues(Ch, Config, QName, DLXQName, + [{<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject-publish-dlx">>}]), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + %% Consume the first one from the queue (max-length = 1) + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, QName, [P1]), + consume_empty(Ch, QName), + %% Consume the dropped ones from the dead letter queue + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]), + _ = consume(Ch, DLXQName, [P2, P3]), + consume_empty(Ch, DLXQName). + %% Dead letter exchange does not have to be declared when the queue is declared, but it should %% exist by the time messages need to be dead-lettered; if it is missing then, the messages will %% be silently dropped. diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 5bac8482fa..f7e20a9fe6 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -33,7 +33,8 @@ groups() -> {cluster_size_2, [], [ ackfold, drop, - reject, + {overflow_reject_publish, [], [reject]}, + {overflow_reject_publish_dlx, [], [reject]}, dropwhile_fetchwhile, info_head_message_timestamp, matching, @@ -87,8 +88,20 @@ init_per_group(cluster_size_3, Config) -> ]), rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - + rabbit_ct_client_helpers:setup_steps()); +init_per_group(overflow_reject_publish, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish">>} + ]); +init_per_group(overflow_reject_publish_dlx, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish-dlx">>} + ]). + +end_per_group(overflow_reject_publish, _Config) -> + ok; +end_per_group(overflow_reject_publish_dlx, _Config) -> + ok; end_per_group(_Group, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ @@ -334,9 +347,10 @@ drop(Config) -> reject(Config) -> {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Q = <<"reject-queue">>, + XOverflow = ?config(overflow, Config), + Q = <<"reject-queue-", XOverflow/binary>>, declare(Ch, Q, [{<<"x-max-length">>, long, 4}, - {<<"x-overflow">>, longstr, <<"reject-publish">>} + {<<"x-overflow">>, longstr, XOverflow} | arguments(3)]), publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), %% First 4 messages are published, all others are discarded. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index c23b7ac85e..1d9789fe89 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -323,11 +323,12 @@ declare_invalid_args(Config) -> LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-max-priority">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, <<"reject-publish">>}])), + [?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-overflow">>, longstr, XOverflow}])) + || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index 20012b09c8..b2caff86a9 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -30,6 +30,11 @@ all() -> ]. groups() -> + RejectTests = [ + rejects_survive_stop, + rejects_survive_sigkill, + rejects_survive_policy + ], [ {cluster_size_2, [], [ rapid_redeclare, @@ -45,9 +50,8 @@ groups() -> confirms_survive_stop, confirms_survive_sigkill, confirms_survive_policy, - rejects_survive_stop, - rejects_survive_sigkill, - rejects_survive_policy + {overflow_reject_publish, [], RejectTests}, + {overflow_reject_publish_dlx, [], RejectTests} ]} ]. @@ -69,6 +73,14 @@ init_per_group(cluster_size_2, Config) -> init_per_group(cluster_size_3, Config) -> rabbit_ct_helpers:set_config(Config, [ {rmq_nodes_count, 3} + ]); +init_per_group(overflow_reject_publish, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish">>} + ]); +init_per_group(overflow_reject_publish_dlx, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish-dlx">>} ]). end_per_group(_, Config) -> @@ -227,12 +239,13 @@ rejects_survive(Config, DeathFun) -> Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B), %% declare the queue on the master, mirrored to the two slaves - Queue = <<"test_rejects">>, + XOverflow = ?config(overflow, Config), + Queue = <<"test_rejects", "_", XOverflow/binary>>, amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue, auto_delete = false, durable = true, arguments = [{<<"x-max-length">>, long, 1}, - {<<"x-overflow">>, longstr, <<"reject-publish">>}]}), + {<<"x-overflow">>, longstr, XOverflow}]}), Payload = <<"there can be only one">>, amqp_channel:call(Node1Channel, #'basic.publish'{routing_key = Queue}, diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index 5e598d9f90..123bf741f0 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -318,7 +318,9 @@ node_starts_with_dead_vhosts(Config) -> false = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]), true = rabbit_ct_broker_helpers:rpc(Config, 1, - rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]). + rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]), + [VHost1] = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, check, []). node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> VHost1 = <<"vhost1">>, @@ -331,6 +333,8 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]), true = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]), + [] = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, check, []), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1), {ok, Chan} = amqp_connection:open_channel(Conn), @@ -373,7 +377,9 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> false = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]), true = rabbit_ct_broker_helpers:rpc(Config, 1, - rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]). + rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]), + [VHost1] = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, check, []). vhost_creation_idempotency(Config) -> VHost = <<"idempotency-test">>, |
