summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2019-02-12 17:01:13 +0000
committerGitHub <noreply@github.com>2019-02-12 17:01:13 +0000
commitb0dfe9352f083607d5d7346bd3962ba6c50cc03a (patch)
tree86dd3fa9004f9c79eda929ce16ac2c0e02fff0cf
parent4e4887f48b9b92c68e85df232d7db29122d6845b (diff)
parentfb97086d7bd1c309803515789eb91b2e4e254de5 (diff)
downloadrabbitmq-server-git-b0dfe9352f083607d5d7346bd3962ba6c50cc03a.tar.gz
Merge pull request #1874 from rabbitmq/queues-testing
Generic queue testing
-rw-r--r--src/rabbit_fifo.erl253
-rw-r--r--test/queue_parallel_SUITE.erl587
-rw-r--r--test/quorum_queue_SUITE.erl543
-rw-r--r--test/rabbit_fifo_SUITE.erl3
-rw-r--r--test/single_active_consumer_SUITE.erl75
5 files changed, 796 insertions, 665 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 1fe0b38cd9..4f548568be 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -184,7 +184,7 @@
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
- suspected_down = false :: boolean()
+ status = up :: up | suspected_down | cancelled
}).
-type consumer() :: #consumer{}.
@@ -193,7 +193,7 @@
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
- suspected_down = false :: boolean()
+ status = up :: up | suspected_down
}).
-record(state,
@@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
- {State, Effects} = cancel_consumer(ConsumerId, State0, []),
- % TODO: here we should really demonitor the pid but _only_ if it has no
- % other consumers or enqueuers. leaving a monitor in place isn't harmful
- % however
+ {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel),
checkout(Meta, State, Effects);
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
@@ -466,26 +463,30 @@ apply(_, {down, ConsumerPid, noconnection},
% mark all consumers and enqueuers as suspected down
% and monitor the node so that we can find out the final state of the
% process at some later point
- {Cons, State, Effects1} = maps:fold(
- fun({_, P} = K,
- #consumer{checked_out = Checked0} = C,
- {Co, St0, Eff}) when node(P) =:= Node ->
- St = return_all(St0, Checked0),
- Credit = increase_credit(C, maps:size(Checked0)),
- Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
- {maps:put(K, C#consumer{suspected_down = true,
- credit = Credit,
- checked_out = #{}}, Co),
- St, Eff1};
- (K, C, {Co, St, Eff}) ->
- {maps:put(K, C, Co), St, Eff}
- end, {#{}, State0, []}, Cons0),
+ {Cons, State, Effects1} =
+ maps:fold(fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0, Eff}) when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled)->
+ St = return_all(St0, Checked0),
+ Credit = increase_credit(C, maps:size(Checked0)),
+ Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
+ suspected_down, Eff),
+ {maps:put(K,
+ C#consumer{status = suspected_down,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St, Eff1};
+ (K, C, {Co, St, Eff}) ->
+ {maps:put(K, C, Co), St, Eff}
+ end, {#{}, State0, []}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
- E#enqueuer{suspected_down = true};
+ E#enqueuer{status = suspected_down};
(_, E) -> E
end, Enqs0),
% mark waiting consumers as suspected if necessary
- WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true),
+ WaitingConsumers = update_waiting_consumer_status(Node, State0,
+ suspected_down),
Effects2 = case maps:size(Cons) of
0 ->
@@ -514,12 +515,12 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
{State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E)
+ cancel_consumer(ConsumerId, S, E, down)
end, {State2, Effects1}, DownConsumers),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -528,20 +529,20 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
|| P <- suspected_pids_for(Node, State0)],
% un-suspect waiting consumers when necessary
- WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0,
- false),
+ WaitingConsumers = update_waiting_consumer_status(Node, State0, up),
Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
- E#enqueuer{suspected_down = false};
+ E#enqueuer{status = up};
(_, E) -> E
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
- when node(P) =:= Node ->
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
update_or_remove_sub(
- ConsumerId, C#consumer{suspected_down = false},
+ ConsumerId, C#consumer{status = up},
CAcc, SQAcc, EAcc1);
(_, _, Acc) ->
Acc
@@ -587,26 +588,27 @@ handle_waiting_consumer_down(Pid,
State = State0#state{waiting_consumers = StillUp},
{Effects, State}.
-maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default},
- _Suspected) ->
+update_waiting_consumer_status(_Node, #state{consumer_strategy = default},
+ _Status) ->
[];
-maybe_mark_suspect_waiting_consumers(_Node,
- #state{consumer_strategy = single_active,
- waiting_consumers = []},
- _Suspected) ->
+update_waiting_consumer_status(_Node,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = []},
+ _Status) ->
[];
-maybe_mark_suspect_waiting_consumers(Node,
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers},
- Suspected) ->
+update_waiting_consumer_status(Node,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers},
+ Status) ->
[begin
case node(P) of
Node ->
- {ConsumerId, Consumer#consumer{suspected_down = Suspected}};
+ {ConsumerId, Consumer#consumer{status = Status}};
_ ->
{ConsumerId, Consumer}
end
- end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers].
+ end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers,
+ Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -739,12 +741,13 @@ query_consumers(#state{consumers = Consumers,
consumer_strategy = ConsumerStrategy } = State) ->
ActiveActivityStatusFun = case ConsumerStrategy of
default ->
- fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) ->
- case SuspectedDown of
- true ->
- {false, suspected_down};
- false ->
- {true, up}
+ fun(_ConsumerId,
+ #consumer{status = Status}) ->
+ case Status of
+ suspected_down ->
+ {false, Status};
+ _ ->
+ {true, Status}
end
end;
single_active ->
@@ -758,18 +761,24 @@ query_consumers(#state{consumers = Consumers,
end
end
end,
- FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) ->
- {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
- {Pid, Tag,
- maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined),
- Active,
- ActivityStatus,
- maps:get(args, Meta, []),
- maps:get(username, Meta, undefined)}
- end, Consumers),
+ FromConsumers = maps:fold(fun (_, #consumer{status = cancelled}, Acc) ->
+ Acc;
+ ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
+ {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, Consumers),
FromWaitingConsumers =
- lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
+ lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) ->
+ Acc;
+ ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
{Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
maps:put({Tag, Pid},
{Pid, Tag,
@@ -854,42 +863,42 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #state{consumer_strategy = default} = State, Effects) ->
+ #state{consumer_strategy = default} = State, Effects, Reason) ->
%% general case, single active consumer off
- cancel_consumer0(ConsumerId, State, Effects);
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#state{consumer_strategy = single_active,
waiting_consumers = []} = State,
- Effects) ->
+ Effects, Reason) ->
%% single active consumer on, no consumers are waiting
- cancel_consumer0(ConsumerId, State, Effects);
+ cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#state{consumers = Cons0,
consumer_strategy = single_active,
waiting_consumers = WaitingConsumers0} = State0,
- Effects0) ->
+ Effects0, Reason) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
- {#consumer{checked_out = Checked0}, _} ->
+ {Consumer, Cons1} ->
% The active consumer is to be removed
% Cancel it
- State1 = return_all(State0, Checked0),
- Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0),
+ {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason),
+ Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1),
% Take another one from the waiting consumers and put it in consumers
[{NewActiveConsumerId, NewActiveConsumer}
| RemainingWaitingConsumers] = WaitingConsumers0,
- #state{service_queue = ServiceQueue} = State0,
+ #state{service_queue = ServiceQueue} = State1,
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
NewActiveConsumer,
ServiceQueue),
- State = State1#state{consumers = #{NewActiveConsumerId =>
- NewActiveConsumer},
+ State = State1#state{consumers = maps:put(NewActiveConsumerId,
+ NewActiveConsumer, State1#state.consumers),
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- Effects2 = consumer_update_active_effects(State, NewActiveConsumerId,
+ Effects = consumer_update_active_effects(State, NewActiveConsumerId,
NewActiveConsumer, true,
- single_active, Effects1),
- {State, Effects2};
+ single_active, Effects2),
+ {State, Effects};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
@@ -914,23 +923,39 @@ consumer_update_active_effects(#state{queue_resource = QName },
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
-cancel_consumer0(ConsumerId,
- #state{consumers = C0} = S0, Effects0) ->
+cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) ->
case maps:take(ConsumerId, C0) of
- {#consumer{checked_out = Checked0}, Cons} ->
- S = return_all(S0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
- case maps:size(Cons) of
+ {Consumer, Cons1} ->
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
+ case maps:size(S#state.consumers) of
0 ->
- {S#state{consumers = Cons}, [{aux, inactive} | Effects]};
+ {S, [{aux, inactive} | Effects]};
_ ->
- {S#state{consumers = Cons}, Effects}
+ {S, Effects}
end;
error ->
%% already removed: do nothing
{S0, Effects0}
end.
+maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1,
+ #state{consumers = C0,
+ service_queue = SQ0} = S0, Effects0, Reason) ->
+ case Reason of
+ consumer_cancel ->
+ {Cons, SQ, Effects1} =
+ update_or_remove_sub(ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ status = cancelled},
+ C0, SQ0, Effects0),
+ {S0#state{consumers = Cons, service_queue = SQ}, Effects1};
+ down ->
+ S1 = return_all(S0, Checked0),
+ {S1#state{consumers = Cons1}, Effects0}
+ end.
+
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
Bytes = message_size(RawMsg),
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
@@ -1303,7 +1328,9 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
- {ok, #consumer{suspected_down = true}} ->
+ {ok, #consumer{status = cancelled}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{status = suspected_down}} ->
checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
@@ -1358,8 +1385,9 @@ update_or_remove_sub(ConsumerId, #consumer{lifetime = once,
case maps:size(Checked) of
0 ->
% we're done with this consumer
- {maps:remove(ConsumerId, Cons), ServiceQueue,
- [{demonitor, process, ConsumerId} | Effects]};
+ % TODO: demonitor consumer pid but _only_ if there are no other
+ % monitors for this pid
+ {maps:remove(ConsumerId, Cons), ServiceQueue, Effects};
_ ->
% there are unsettled items so need to keep around
{maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}
@@ -1379,7 +1407,6 @@ uniq_queue_in(Key, Queue) ->
queue:in(Key, Queue)
end.
-
update_consumer(ConsumerId, Meta, Spec,
#state{consumer_strategy = default} = State0) ->
%% general case, single active consumer off
@@ -1553,18 +1580,18 @@ message_size(Msg) ->
suspected_pids_for(Node, #state{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
- Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc)
+ Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, [], Cons0),
- Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc)
+ Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, Cons, Enqs0),
lists:foldl(fun({{_, P},
- #consumer{suspected_down = true}}, Acc)
+ #consumer{status = suspected_down}}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, Acc) -> Acc
@@ -1814,10 +1841,11 @@ return_checked_out_test() ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active} | _
- ]} = check(Cid, 2, State0),
- % return
- {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event},
+ {aux, active}]} =
+ apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1844,15 +1872,19 @@ cancelled_checkout_out_test() ->
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
{State0, [_]} = enq(2, 2, second, State00),
{State1, _} = check_auto(Cid, 2, State0),
- % cancelled checkout should return all pending messages to queue
+ % cancelled checkout should not return pending messages to queue
{State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
?assertEqual(1, maps:size(State2#state.messages)),
- ?assertEqual(1, lqueue:len(State2#state.returns)),
+ ?assertEqual(0, lqueue:len(State2#state.returns)),
- {State3, {dequeue, {0, {_, first}}, _}, _} =
+ {State3, {dequeue, empty}} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
+ %% settle
+ {State4, ok, _} =
+ apply(meta(4), make_settle(Cid, [0]), State3),
+
{_State, {dequeue, {_, {_, second}}, _}, _} =
- apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
+ apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4),
ok.
down_with_noproc_consumer_returns_unsettled_test() ->
@@ -1914,16 +1946,6 @@ down_with_noproc_enqueuer_is_cleaned_up_test() ->
?assert(0 =:= maps:size(State1#state.enqueuers)),
ok.
-completed_consumer_yields_demonitor_effect_test() ->
- Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
- {State0, [_, _]} = enq(1, 1, second, test_init(test)),
- {State1, [{monitor, process, _} | _]} = check(Cid, 2, State0),
- {_, Effects} = settle(Cid, 3, 0, State1),
- ?ASSERT_EFF({demonitor, _, _}, Effects),
- % release cursor for empty queue
- ?ASSERT_EFF({release_cursor, 3, _}, Effects),
- ok.
-
discarded_message_without_dead_letter_handler_is_removed_test() ->
Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -2299,7 +2321,10 @@ single_active_consumer_test() ->
?assertEqual(1, length(Effects1)),
% cancelling the active consumer
- {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ {State3, _, Effects2} = apply(meta(3),
+ #checkout{spec = cancel,
+ consumer_id = {<<"ctag1">>, self()}},
+ State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
@@ -2402,15 +2427,16 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
State),
NewState
end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ State1 = lists:foldl(AddConsumer, State0,
+ [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
% simulate node goes down
{State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
% all the waiting consumers should be suspected down
?assertEqual(3, length(State2#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) ->
- ?assert(SuspectedDown)
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status == suspected_down)
end, State2#state.waiting_consumers),
% simulate node goes back up
@@ -2418,8 +2444,8 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
% all the waiting consumers should be un-suspected
?assertEqual(3, length(State3#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) ->
- ?assertNot(SuspectedDown)
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status /= suspected_down)
end, State3#state.waiting_consumers),
ok.
@@ -2504,7 +2530,8 @@ query_consumers_test() ->
State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
Consumers0 = State1#state.consumers,
Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
- Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0),
+ Consumers1 = maps:put({<<"ctag2">>, self()},
+ Consumer#consumer{status = suspected_down}, Consumers0),
State2 = State1#state{consumers = Consumers1},
?assertEqual(4, query_consumer_count(State2)),
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
new file mode 100644
index 0000000000..1290c97b23
--- /dev/null
+++ b/test/queue_parallel_SUITE.erl
@@ -0,0 +1,587 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+-module(queue_parallel_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT, 30000).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ AllTests = [publish,
+ consume,
+ consume_first_empty,
+ consume_from_empty_queue,
+ consume_and_autoack,
+ subscribe,
+ subscribe_with_autoack,
+ consume_and_ack,
+ consume_and_multiple_ack,
+ subscribe_and_ack,
+ subscribe_and_multiple_ack,
+ subscribe_and_requeue_multiple_nack,
+ subscribe_and_nack,
+ subscribe_and_requeue_nack,
+ subscribe_and_multiple_nack,
+ consume_and_requeue_nack,
+ consume_and_nack,
+ consume_and_requeue_multiple_nack,
+ consume_and_multiple_nack,
+ basic_cancel,
+ purge,
+ basic_recover,
+ delete_immediately_by_resource
+ ],
+ [
+ {parallel_tests, [],
+ [
+ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]);
+init_per_group(quorum_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
+ {queue_name_2, Q2}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+publish(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+consume(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+consume_first_empty(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ consume_empty(Ch, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, [<<"msg1">>]),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+consume_from_empty_queue(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ consume_empty(Ch, QName).
+
+consume_and_autoack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, true, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = false,
+ prefetch_count = 10})),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+
+ %% validate we can retrieve the consumers
+ Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ [Consumer] = lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ QName == Resource#resource.name
+ end, Consumers),
+ ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
+ ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
+ ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
+ ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
+ ?assertEqual([], proplists:get_value(arguments, Consumer)),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+subscribe_with_autoack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ subscribe(Ch, QName, true),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_multiple_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe_and_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_multiple_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_requeue_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ receive_basic_deliver(true),
+ receive_basic_deliver(true),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end
+ end.
+
+consume_and_requeue_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]).
+
+consume_and_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_requeue_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]).
+
+consume_and_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe_and_requeue_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end
+ end.
+
+subscribe_and_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+%% TODO test with single active
+basic_cancel(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ QName == Resource#resource.name
+ end, Consumers)),
+ publish(Ch, QName, [<<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]])
+ after 5000 ->
+ exit(basic_deliver_timeout)
+ end.
+
+purge(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
+ {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]).
+
+basic_recover(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+delete_immediately_by_pid_fails(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
+
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_pid_succeeds(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*ok.*", [{capture, none}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_resource(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)])."],
+ ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd)),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+declare_queue(Ch, Config, QName) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = Durable}).
+
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ consume(Ch, QName, false, Payloads).
+
+consume(Ch, QName, NoAck, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName,
+ no_ack = NoAck}),
+ DTag
+ end || Payload <- Payloads].
+
+consume_empty(Ch, QName) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})).
+
+subscribe(Ch, Queue, NoAck) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = <<"ctag">>},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+receive_basic_deliver(Redelivered) ->
+ receive
+ {#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
+ ok
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 02afba97c5..7d78558739 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -93,26 +93,7 @@ all_tests() ->
restart_queue,
restart_all_types,
stop_start_rabbit_app,
- publish,
publish_and_restart,
- consume,
- consume_first_empty,
- consume_from_empty_queue,
- consume_and_autoack,
- subscribe,
- subscribe_with_autoack,
- consume_and_ack,
- consume_and_multiple_ack,
- subscribe_and_ack,
- subscribe_and_multiple_ack,
- consume_and_requeue_nack,
- consume_and_requeue_multiple_nack,
- subscribe_and_requeue_nack,
- subscribe_and_requeue_multiple_nack,
- consume_and_nack,
- consume_and_multiple_nack,
- subscribe_and_nack,
- subscribe_and_multiple_nack,
subscribe_should_fail_when_global_qos_true,
dead_letter_to_classic_queue,
dead_letter_to_quorum_queue,
@@ -120,14 +101,10 @@ all_tests() ->
dead_letter_policy,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
- basic_cancel,
- purge,
sync_queue,
cancel_sync_queue,
- basic_recover,
idempotent_recover,
vhost_with_quorum_queue_is_deleted,
- delete_immediately,
delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count,
@@ -612,18 +589,6 @@ stop_start_rabbit_app(Config) ->
amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}),
delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]).
-publish(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- publish(Ch, QQ),
- Name = ra_name(QQ),
- wait_for_messages_ready(Servers, Name, 1),
- wait_for_messages_pending_ack(Servers, Name, 0).
-
publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -660,41 +625,6 @@ publish_and_restart(Config) ->
wait_for_messages_ready(Servers, RaName, 2),
wait_for_messages_pending_ack(Servers, RaName, 0).
-consume(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_first_empty(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- consume_empty(Ch, QQ, false),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, false),
- rabbit_ct_client_helpers:close_channel(Ch).
-
consume_in_minority(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -711,63 +641,6 @@ consume_in_minority(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false})).
-consume_and_autoack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, true),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_from_empty_queue(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- consume_empty(Ch, QQ, false).
-
-subscribe(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- qos(Ch, 10, false),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- %% validate we can retrieve the consumers
- [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
- ct:pal("Consumer ~p", [Consumer]),
- ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
- ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
- ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
- ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
- ?assertEqual([], proplists:get_value(arguments, Consumer)),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
subscribe_should_fail_when_global_qos_true(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -789,338 +662,6 @@ subscribe_should_fail_when_global_qos_true(Config) ->
end,
ok.
-subscribe_with_autoack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, true),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_multiple_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-subscribe_and_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-subscribe_and_multiple_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-consume_and_requeue_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = true}),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_requeue_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = true}),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-subscribe_and_requeue_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = true}),
- receive_basic_deliver(true),
- receive_basic_deliver(true),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag1,
- redelivered = true}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end
- end.
-
-subscribe_and_requeue_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = true}),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag1,
- redelivered = true}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end
- end.
-
-subscribe_and_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-subscribe_and_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
dead_letter_to_classic_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1500,51 +1041,6 @@ delete_declare(Config) ->
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0).
-basic_cancel(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- [] = rpc:call(Server, ets, tab2list, [consumer_created])
- after 5000 ->
- exit(basic_deliver_timeout)
- end.
-
-purge(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- wait_for_messages_ready(Servers, RaName, 0).
-
sync_queue(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1922,45 +1418,6 @@ reconnect_consumer_and_wait_channel_down(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
-basic_recover(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-delete_immediately(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, Args)),
-
- Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
- {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
- ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
-
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
- durable = true,
- passive = true,
- auto_delete = false,
- arguments = Args})).
-
delete_immediately_by_resource(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 293a597d14..ab1f44ab27 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -395,7 +395,8 @@ cancel_checkout(Config) ->
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
- {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
+ {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
+ {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
ok.
credit(Config) ->
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 6071aeb5a5..0b12f54c0b 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -33,12 +33,14 @@ groups() ->
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks,
amqp_exclusive_consume_fails_on_exclusive_consumer_queue
]},
{quorum_queue, [], [
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
- fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks
%% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
]}
].
@@ -131,7 +133,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
- {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
+ {ok, {MessagesPerConsumer1, _}} = wait_for_messages(MessageCount div 2),
FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)),
?assertEqual(1, length(FirstActiveConsumerInList)),
@@ -141,8 +143,8 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
-
- {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
+
+ {ok, {MessagesPerConsumer2, _}} = wait_for_messages(MessageCount div 2 - 1),
SecondActiveConsumerInList = maps:keys(maps:filter(
fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end,
MessagesPerConsumer2)
@@ -153,7 +155,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
- wait_for_messages(1),
+ ?assertMatch({ok, _}, wait_for_messages(1)),
LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
@@ -171,6 +173,54 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
amqp_connection:close(C),
ok.
+fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks(Config) ->
+ %% Let's ensure that although the consumer is cancelled we still keep the unacked
+ %% messages and accept acknowledgments on them.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ Consumers0 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ?assertMatch([_, _, _], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ Q == Resource#resource.name
+ end, Consumers0)),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg1">>, <<"msg2">>]],
+
+ {CTag, DTag1} = receive_deliver(),
+ {_CTag, DTag2} = receive_deliver(),
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
+
+ receive
+ #'basic.cancel_ok'{consumer_tag = CTag} ->
+ ok
+ end,
+ Consumers1 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ?assertMatch([_, _], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ Q == Resource#resource.name
+ end, Consumers1)),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg3">>, <<"msg4">>]],
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"0">>, <<"4">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag2}),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+
+ amqp_connection:close(C),
+ ok.
+
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
{C1, Ch1} = connection_and_channel(Config),
@@ -276,13 +326,13 @@ wait_for_messages(ExpectedCount) ->
wait_for_messages(ExpectedCount, {}).
wait_for_messages(0, State) ->
- State;
-wait_for_messages(ExpectedCount, _) ->
+ {ok, State};
+wait_for_messages(ExpectedCount, State) ->
receive
{message, {MessagesPerConsumer, MessageCount}} ->
wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
after 5000 ->
- throw(message_waiting_timeout)
+ {missing, ExpectedCount, State}
end.
wait_for_cancel_ok() ->
@@ -292,3 +342,12 @@ wait_for_cancel_ok() ->
after 5000 ->
throw(consumer_cancel_ok_timeout)
end.
+
+receive_deliver() ->
+ receive
+ {#'basic.deliver'{consumer_tag = CTag,
+ delivery_tag = DTag}, _} ->
+ {CTag, DTag}
+ after 5000 ->
+ exit(deliver_timeout)
+ end.