diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2019-02-12 17:01:13 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-12 17:01:13 +0000 |
| commit | b0dfe9352f083607d5d7346bd3962ba6c50cc03a (patch) | |
| tree | 86dd3fa9004f9c79eda929ce16ac2c0e02fff0cf | |
| parent | 4e4887f48b9b92c68e85df232d7db29122d6845b (diff) | |
| parent | fb97086d7bd1c309803515789eb91b2e4e254de5 (diff) | |
| download | rabbitmq-server-git-b0dfe9352f083607d5d7346bd3962ba6c50cc03a.tar.gz | |
Merge pull request #1874 from rabbitmq/queues-testing
Generic queue testing
| -rw-r--r-- | src/rabbit_fifo.erl | 253 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 587 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 543 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 3 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 75 |
5 files changed, 796 insertions, 665 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 1fe0b38cd9..4f548568be 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -184,7 +184,7 @@ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data lifetime = once :: once | auto, - suspected_down = false :: boolean() + status = up :: up | suspected_down | cancelled }). -type consumer() :: #consumer{}. @@ -193,7 +193,7 @@ {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}], - suspected_down = false :: boolean() + status = up :: up | suspected_down }). -record(state, @@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(ConsumerId, State0, []), - % TODO: here we should really demonitor the pid but _only_ if it has no - % other consumers or enqueuers. leaving a monitor in place isn't harmful - % however + {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), checkout(Meta, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, @@ -466,26 +463,30 @@ apply(_, {down, ConsumerPid, noconnection}, % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the % process at some later point - {Cons, State, Effects1} = maps:fold( - fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - Credit = increase_credit(C, maps:size(Checked0)), - Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff), - {maps:put(K, C#consumer{suspected_down = true, - credit = Credit, - checked_out = #{}}, Co), - St, Eff1}; - (K, C, {Co, St, Eff}) -> - {maps:put(K, C, Co), St, Eff} - end, {#{}, State0, []}, Cons0), + {Cons, State, Effects1} = + maps:fold(fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0, Eff}) when (node(P) =:= Node) and + (C#consumer.status =/= cancelled)-> + St = return_all(St0, Checked0), + Credit = increase_credit(C, maps:size(Checked0)), + Eff1 = ConsumerUpdateActiveFun(St, K, C, false, + suspected_down, Eff), + {maps:put(K, + C#consumer{status = suspected_down, + credit = Credit, + checked_out = #{}}, Co), + St, Eff1}; + (K, C, {Co, St, Eff}) -> + {maps:put(K, C, Co), St, Eff} + end, {#{}, State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> - E#enqueuer{suspected_down = true}; + E#enqueuer{status = suspected_down}; (_, E) -> E end, Enqs0), % mark waiting consumers as suspected if necessary - WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true), + WaitingConsumers = update_waiting_consumer_status(Node, State0, + suspected_down), Effects2 = case maps:size(Cons) of 0 -> @@ -514,12 +515,12 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E) + cancel_consumer(ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -528,20 +529,20 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, || P <- suspected_pids_for(Node, State0)], % un-suspect waiting consumers when necessary - WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, - false), + WaitingConsumers = update_waiting_consumer_status(Node, State0, up), Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> - E#enqueuer{suspected_down = false}; + E#enqueuer{status = up}; (_, E) -> E end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when node(P) =:= Node -> + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), update_or_remove_sub( - ConsumerId, C#consumer{suspected_down = false}, + ConsumerId, C#consumer{status = up}, CAcc, SQAcc, EAcc1); (_, _, Acc) -> Acc @@ -587,26 +588,27 @@ handle_waiting_consumer_down(Pid, State = State0#state{waiting_consumers = StillUp}, {Effects, State}. -maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, - _Suspected) -> +update_waiting_consumer_status(_Node, #state{consumer_strategy = default}, + _Status) -> []; -maybe_mark_suspect_waiting_consumers(_Node, - #state{consumer_strategy = single_active, - waiting_consumers = []}, - _Suspected) -> +update_waiting_consumer_status(_Node, + #state{consumer_strategy = single_active, + waiting_consumers = []}, + _Status) -> []; -maybe_mark_suspect_waiting_consumers(Node, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers}, - Suspected) -> +update_waiting_consumer_status(Node, + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers}, + Status) -> [begin case node(P) of Node -> - {ConsumerId, Consumer#consumer{suspected_down = Suspected}}; + {ConsumerId, Consumer#consumer{status = Status}}; _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers]. + end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -739,12 +741,13 @@ query_consumers(#state{consumers = Consumers, consumer_strategy = ConsumerStrategy } = State) -> ActiveActivityStatusFun = case ConsumerStrategy of default -> - fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) -> - case SuspectedDown of - true -> - {false, suspected_down}; - false -> - {true, up} + fun(_ConsumerId, + #consumer{status = Status}) -> + case Status of + suspected_down -> + {false, Status}; + _ -> + {true, Status} end end; single_active -> @@ -758,18 +761,24 @@ query_consumers(#state{consumers = Consumers, end end end, - FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) -> - {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - Active, - ActivityStatus, - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)} - end, Consumers), + FromConsumers = maps:fold(fun (_, #consumer{status = cancelled}, Acc) -> + Acc; + ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + Active, + ActivityStatus, + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, Consumers), FromWaitingConsumers = - lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> + lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) -> + Acc; + ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), maps:put({Tag, Pid}, {Pid, Tag, @@ -854,42 +863,42 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #state{consumer_strategy = default} = State, Effects) -> + #state{consumer_strategy = default} = State, Effects, Reason) -> %% general case, single active consumer off - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumer_strategy = single_active, waiting_consumers = []} = State, - Effects) -> + Effects, Reason) -> %% single active consumer on, no consumers are waiting - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumers = Cons0, consumer_strategy = single_active, waiting_consumers = WaitingConsumers0} = State0, - Effects0) -> + Effects0, Reason) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of - {#consumer{checked_out = Checked0}, _} -> + {Consumer, Cons1} -> % The active consumer is to be removed % Cancel it - State1 = return_all(State0, Checked0), - Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0), + {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason), + Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), % Take another one from the waiting consumers and put it in consumers [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, - #state{service_queue = ServiceQueue} = State0, + #state{service_queue = ServiceQueue} = State1, ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), - State = State1#state{consumers = #{NewActiveConsumerId => - NewActiveConsumer}, + State = State1#state{consumers = maps:put(NewActiveConsumerId, + NewActiveConsumer, State1#state.consumers), service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - Effects2 = consumer_update_active_effects(State, NewActiveConsumerId, + Effects = consumer_update_active_effects(State, NewActiveConsumerId, NewActiveConsumer, true, - single_active, Effects1), - {State, Effects2}; + single_active, Effects2), + {State, Effects}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers @@ -914,23 +923,39 @@ consumer_update_active_effects(#state{queue_resource = QName }, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. -cancel_consumer0(ConsumerId, - #state{consumers = C0} = S0, Effects0) -> +cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) -> case maps:take(ConsumerId, C0) of - {#consumer{checked_out = Checked0}, Cons} -> - S = return_all(S0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, S, Effects0), - case maps:size(Cons) of + {Consumer, Cons1} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + Effects = cancel_consumer_effects(ConsumerId, S, Effects2), + case maps:size(S#state.consumers) of 0 -> - {S#state{consumers = Cons}, [{aux, inactive} | Effects]}; + {S, [{aux, inactive} | Effects]}; _ -> - {S#state{consumers = Cons}, Effects} + {S, Effects} end; error -> %% already removed: do nothing {S0, Effects0} end. +maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, + #state{consumers = C0, + service_queue = SQ0} = S0, Effects0, Reason) -> + case Reason of + consumer_cancel -> + {Cons, SQ, Effects1} = + update_or_remove_sub(ConsumerId, + Consumer#consumer{lifetime = once, + credit = 0, + status = cancelled}, + C0, SQ0, Effects0), + {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; + down -> + S1 = return_all(S0, Checked0), + {S1#state{consumers = Cons1}, Effects0} + end. + apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> Bytes = message_size(RawMsg), case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of @@ -1303,7 +1328,9 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); - {ok, #consumer{suspected_down = true}} -> + {ok, #consumer{status = cancelled}} -> + checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{status = suspected_down}} -> checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, @@ -1358,8 +1385,9 @@ update_or_remove_sub(ConsumerId, #consumer{lifetime = once, case maps:size(Checked) of 0 -> % we're done with this consumer - {maps:remove(ConsumerId, Cons), ServiceQueue, - [{demonitor, process, ConsumerId} | Effects]}; + % TODO: demonitor consumer pid but _only_ if there are no other + % monitors for this pid + {maps:remove(ConsumerId, Cons), ServiceQueue, Effects}; _ -> % there are unsettled items so need to keep around {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects} @@ -1379,7 +1407,6 @@ uniq_queue_in(Key, Queue) -> queue:in(Key, Queue) end. - update_consumer(ConsumerId, Meta, Spec, #state{consumer_strategy = default} = State0) -> %% general case, single active consumer off @@ -1553,18 +1580,18 @@ message_size(Msg) -> suspected_pids_for(Node, #state{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> - Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc) + Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; (_, _, Acc) -> Acc end, [], Cons0), - Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc) + Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; (_, _, Acc) -> Acc end, Cons, Enqs0), lists:foldl(fun({{_, P}, - #consumer{suspected_down = true}}, Acc) + #consumer{status = suspected_down}}, Acc) when node(P) =:= Node -> [P | Acc]; (_, Acc) -> Acc @@ -1814,10 +1841,11 @@ return_checked_out_test() -> {State0, [_, _]} = enq(1, 1, first, test_init(test)), {State1, [_Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, - {aux, active} | _ - ]} = check(Cid, 2, State0), - % return - {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1), + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event}, + {aux, active}]} = + apply(meta(3), make_return(Cid, [MsgId]), State1), ok. return_auto_checked_out_test() -> @@ -1844,15 +1872,19 @@ cancelled_checkout_out_test() -> {State00, [_, _]} = enq(1, 1, first, test_init(test)), {State0, [_]} = enq(2, 2, second, State00), {State1, _} = check_auto(Cid, 2, State0), - % cancelled checkout should return all pending messages to queue + % cancelled checkout should not return pending messages to queue {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), ?assertEqual(1, maps:size(State2#state.messages)), - ?assertEqual(1, lqueue:len(State2#state.returns)), + ?assertEqual(0, lqueue:len(State2#state.returns)), - {State3, {dequeue, {0, {_, first}}, _}, _} = + {State3, {dequeue, empty}} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), + %% settle + {State4, ok, _} = + apply(meta(4), make_settle(Cid, [0]), State3), + {_State, {dequeue, {_, {_, second}}, _}, _} = - apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3), + apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4), ok. down_with_noproc_consumer_returns_unsettled_test() -> @@ -1914,16 +1946,6 @@ down_with_noproc_enqueuer_is_cleaned_up_test() -> ?assert(0 =:= maps:size(State1#state.enqueuers)), ok. -completed_consumer_yields_demonitor_effect_test() -> - Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, - {State0, [_, _]} = enq(1, 1, second, test_init(test)), - {State1, [{monitor, process, _} | _]} = check(Cid, 2, State0), - {_, Effects} = settle(Cid, 3, 0, State1), - ?ASSERT_EFF({demonitor, _, _}, Effects), - % release cursor for empty queue - ?ASSERT_EFF({release_cursor, 3, _}, Effects), - ok. - discarded_message_without_dead_letter_handler_is_removed_test() -> Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), @@ -2299,7 +2321,10 @@ single_active_consumer_test() -> ?assertEqual(1, length(Effects1)), % cancelling the active consumer - {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + {State3, _, Effects2} = apply(meta(3), + #checkout{spec = cancel, + consumer_id = {<<"ctag1">>, self()}}, + State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#state.consumers)), ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), @@ -2402,15 +2427,16 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti State), NewState end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), % simulate node goes down {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), % all the waiting consumers should be suspected down ?assertEqual(3, length(State2#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> - ?assert(SuspectedDown) + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status == suspected_down) end, State2#state.waiting_consumers), % simulate node goes back up @@ -2418,8 +2444,8 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti % all the waiting consumers should be un-suspected ?assertEqual(3, length(State3#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> - ?assertNot(SuspectedDown) + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status /= suspected_down) end, State3#state.waiting_consumers), ok. @@ -2504,7 +2530,8 @@ query_consumers_test() -> State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), Consumers0 = State1#state.consumers, Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), - Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, + Consumer#consumer{status = suspected_down}, Consumers0), State2 = State1#state{consumers = Consumers1}, ?assertEqual(4, query_consumer_count(State2)), diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl new file mode 100644 index 0000000000..1290c97b23 --- /dev/null +++ b/test/queue_parallel_SUITE.erl @@ -0,0 +1,587 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +-module(queue_parallel_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 30000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + AllTests = [publish, + consume, + consume_first_empty, + consume_from_empty_queue, + consume_and_autoack, + subscribe, + subscribe_with_autoack, + consume_and_ack, + consume_and_multiple_ack, + subscribe_and_ack, + subscribe_and_multiple_ack, + subscribe_and_requeue_multiple_nack, + subscribe_and_nack, + subscribe_and_requeue_nack, + subscribe_and_multiple_nack, + consume_and_requeue_nack, + consume_and_nack, + consume_and_requeue_multiple_nack, + consume_and_multiple_nack, + basic_cancel, + purge, + basic_recover, + delete_immediately_by_resource + ], + [ + {parallel_tests, [], + [ + {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +publish(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume_first_empty(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + rabbit_ct_client_helpers:close_channel(Ch). + +consume_from_empty_queue(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName). + +consume_and_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, true, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = false, + prefetch_count = 10})), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + + subscribe(Ch, QName, false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + + %% validate we can retrieve the consumers + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + [Consumer] = lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers), + ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), + ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), + ?assertEqual(true, proplists:get_value(ack_required, Consumer)), + ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), + ?assertEqual([], proplists:get_value(arguments, Consumer)), + + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +subscribe_with_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + subscribe(Ch, QName, true), + receive_basic_deliver(false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + receive_basic_deliver(true), + receive_basic_deliver(true), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +consume_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]). + +consume_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]). + +consume_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +subscribe_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +%% TODO test with single active +basic_cancel(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers)), + publish(Ch, QName, [<<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) + after 5000 -> + exit(basic_deliver_timeout) + end. + +purge(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]). + +basic_recover(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +delete_immediately_by_pid_fails(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_pid_succeeds(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*ok.*", [{capture, none}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_resource(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)])."], + ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd)), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). + +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + consume(Ch, QName, false, Payloads). + +consume(Ch, QName, NoAck, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, + no_ack = NoAck}), + DTag + end || Payload <- Payloads]. + +consume_empty(Ch, QName) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{queue = QName})). + +subscribe(Ch, Queue, NoAck) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = <<"ctag">>}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end. + +receive_basic_deliver(Redelivered) -> + receive + {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> + ok + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 02afba97c5..7d78558739 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -93,26 +93,7 @@ all_tests() -> restart_queue, restart_all_types, stop_start_rabbit_app, - publish, publish_and_restart, - consume, - consume_first_empty, - consume_from_empty_queue, - consume_and_autoack, - subscribe, - subscribe_with_autoack, - consume_and_ack, - consume_and_multiple_ack, - subscribe_and_ack, - subscribe_and_multiple_ack, - consume_and_requeue_nack, - consume_and_requeue_multiple_nack, - subscribe_and_requeue_nack, - subscribe_and_requeue_multiple_nack, - consume_and_nack, - consume_and_multiple_nack, - subscribe_and_nack, - subscribe_and_multiple_nack, subscribe_should_fail_when_global_qos_true, dead_letter_to_classic_queue, dead_letter_to_quorum_queue, @@ -120,14 +101,10 @@ all_tests() -> dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, - basic_cancel, - purge, sync_queue, cancel_sync_queue, - basic_recover, idempotent_recover, vhost_with_quorum_queue_is_deleted, - delete_immediately, delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count, @@ -612,18 +589,6 @@ stop_start_rabbit_app(Config) -> amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}), delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]). -publish(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - publish(Ch, QQ), - Name = ra_name(QQ), - wait_for_messages_ready(Servers, Name, 1), - wait_for_messages_pending_ack(Servers, Name, 0). - publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), @@ -660,41 +625,6 @@ publish_and_restart(Config) -> wait_for_messages_ready(Servers, RaName, 2), wait_for_messages_pending_ack(Servers, RaName, 0). -consume(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_first_empty(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - consume_empty(Ch, QQ, false), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - rabbit_ct_client_helpers:close_channel(Ch). - consume_in_minority(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -711,63 +641,6 @@ consume_in_minority(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false})). -consume_and_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, true), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_from_empty_queue(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - consume_empty(Ch, QQ, false). - -subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - qos(Ch, 10, false), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - %% validate we can retrieve the consumers - [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), - ct:pal("Consumer ~p", [Consumer]), - ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), - ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), - ?assertEqual(true, proplists:get_value(ack_required, Consumer)), - ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), - ?assertEqual([], proplists:get_value(arguments, Consumer)), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -789,338 +662,6 @@ subscribe_should_fail_when_global_qos_true(Config) -> end, ok. -subscribe_with_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, true), - receive_basic_deliver(false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -consume_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - receive_basic_deliver(true), - receive_basic_deliver(true), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - dead_letter_to_classic_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1500,51 +1041,6 @@ delete_declare(Config) -> wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_cancel(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - [] = rpc:call(Server, ets, tab2list, [consumer_created]) - after 5000 -> - exit(basic_deliver_timeout) - end. - -purge(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - _DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), - wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages_ready(Servers, RaName, 0). - sync_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1922,45 +1418,6 @@ reconnect_consumer_and_wait_channel_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_recover(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -delete_immediately(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}], - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, Args)), - - Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], - {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), - ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), - - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - amqp_channel:call(Ch, #'queue.declare'{queue = QQ, - durable = true, - passive = true, - auto_delete = false, - arguments = Args})). - delete_immediately_by_resource(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 293a597d14..ab1f44ab27 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -395,7 +395,8 @@ cancel_checkout(Config) -> {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), + {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), + {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), ok. credit(Config) -> diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 6071aeb5a5..0b12f54c0b 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -33,12 +33,14 @@ groups() -> all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks, amqp_exclusive_consume_fails_on_exclusive_consumer_queue ]}, {quorum_queue, [], [ all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, - fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ ]} ]. @@ -131,7 +133,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], - {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2), + {ok, {MessagesPerConsumer1, _}} = wait_for_messages(MessageCount div 2), FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)), ?assertEqual(1, length(FirstActiveConsumerInList)), @@ -141,8 +143,8 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], - - {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1), + + {ok, {MessagesPerConsumer2, _}} = wait_for_messages(MessageCount div 2 - 1), SecondActiveConsumerInList = maps:keys(maps:filter( fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end, MessagesPerConsumer2) @@ -153,7 +155,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}), amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), - wait_for_messages(1), + ?assertMatch({ok, _}, wait_for_messages(1)), LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))), @@ -171,6 +173,54 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> amqp_connection:close(C), ok. +fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks(Config) -> + %% Let's ensure that although the consumer is cancelled we still keep the unacked + %% messages and accept acknowledgments on them. + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + Consumers0 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers0)), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg1">>, <<"msg2">>]], + + {CTag, DTag1} = receive_deliver(), + {_CTag, DTag2} = receive_deliver(), + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), + + receive + #'basic.cancel_ok'{consumer_tag = CTag} -> + ok + end, + Consumers1 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers1)), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg3">>, <<"msg4">>]], + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"0">>, <<"4">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag2}), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + amqp_connection:close(C), + ok. + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) -> {C, Ch} = connection_and_channel(Config), {C1, Ch1} = connection_and_channel(Config), @@ -276,13 +326,13 @@ wait_for_messages(ExpectedCount) -> wait_for_messages(ExpectedCount, {}). wait_for_messages(0, State) -> - State; -wait_for_messages(ExpectedCount, _) -> + {ok, State}; +wait_for_messages(ExpectedCount, State) -> receive {message, {MessagesPerConsumer, MessageCount}} -> wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) after 5000 -> - throw(message_waiting_timeout) + {missing, ExpectedCount, State} end. wait_for_cancel_ok() -> @@ -292,3 +342,12 @@ wait_for_cancel_ok() -> after 5000 -> throw(consumer_cancel_ok_timeout) end. + +receive_deliver() -> + receive + {#'basic.deliver'{consumer_tag = CTag, + delivery_tag = DTag}, _} -> + {CTag, DTag} + after 5000 -> + exit(deliver_timeout) + end. |
