summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-14 14:09:32 +0100
committerGitHub <noreply@github.com>2018-12-14 14:09:32 +0100
commit685f8ea30c0654f9264ec3b25a9ccceb1a2e9a3e (patch)
tree454777c224844e7a8440b37cf5340edf924bec95 /test
parentb914cdb7f539343d13eef4522608559d69edd828 (diff)
parent2376e9b465c5bac53f388fc25e83265ddce95919 (diff)
downloadrabbitmq-server-git-685f8ea30c0654f9264ec3b25a9ccceb1a2e9a3e.tar.gz
Merge pull request #1801 from rabbitmq/qq-list-consumers
Implement consumer listing for quorum queues
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl9
-rw-r--r--test/rabbit_fifo_SUITE.erl50
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl101
3 files changed, 101 insertions, 59 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 523506b0e0..c94fb9ddab 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -709,6 +709,7 @@ subscribe(Config) ->
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ qos(Ch, 10, false),
RaName = ra_name(QQ),
publish(Ch, QQ),
wait_for_messages_ready(Servers, RaName, 1),
@@ -717,6 +718,14 @@ subscribe(Config) ->
receive_basic_deliver(false),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 1),
+ %% validate we can retrieve the consumers
+ [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ct:pal("Consumer ~p", [Consumer]),
+ ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
+ ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
+ ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
+ ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
+ ?assertEqual([], proplists:get_value(arguments, Consumer)),
rabbit_ct_client_helpers:close_channel(Ch),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 56608e9af3..3263a733a9 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -4,6 +4,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
all() ->
[
@@ -50,11 +51,16 @@ end_per_group(_, Config) ->
Config.
init_per_testcase(TestCase, Config) ->
+ meck:new(rabbit_quorum_queue, [passthrough]),
+ meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
+ fun (_, _) -> ok end),
ra_server_sup:remove_all(),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
+ ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),
[
- {cluster_name, TestCase},
+ {cluster_name, ClusterName},
{uid, atom_to_binary(TestCase, utf8)},
{node_id, {TestCase, node()}},
{uid2, atom_to_binary(ServerName2, utf8)},
@@ -63,6 +69,10 @@ init_per_testcase(TestCase, Config) ->
{node_id3, {ServerName3, node()}}
| Config].
+end_per_testcase(_, Config) ->
+ meck:unload(),
+ Config.
+
basics(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
@@ -70,7 +80,7 @@ basics(Config) ->
CustomerTag = UId,
ok = start_cluster(ClusterName, [ServerId]),
FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, FState0),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0),
ra_log_wal:force_roll_over(ra_log_wal),
% create segment the segment will trigger a snapshot
@@ -167,7 +177,7 @@ duplicate_delivery(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
Fun = fun Loop(S0) ->
receive
@@ -201,7 +211,7 @@ usage(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
{ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
{_, _, _} = process_ra_events(F3, 50),
@@ -256,7 +266,7 @@ detects_lost_delivery(Config) ->
F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
{_, _, F0} = process_ra_events(F00, 100),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
% lose first delivery
@@ -285,7 +295,8 @@ returns_after_down(Config) ->
Self = self(),
_Pid = spawn(fun () ->
F = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, F),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
+ undefined, F),
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
@@ -327,8 +338,11 @@ handles_reject_notification(Config) ->
CId = {UId1, self()},
ok = start_cluster(ClusterName, [ServerId1, ServerId2]),
- _ = ra:process_command(ServerId1, {checkout,
- {auto, 10, simple_prefetch}, CId}),
+ _ = ra:process_command(ServerId1,
+ rabbit_fifo:make_checkout(
+ CId,
+ {auto, 10, simple_prefetch},
+ #{})),
% reverse order - should try the first node in the list first
F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]),
{ok, F1} = rabbit_fifo_client:enqueue(one, F0),
@@ -346,20 +360,21 @@ discard(Config) ->
ServerId = ?config(node_id, Config),
UId = ?config(uid, Config),
ClusterName = ?config(cluster_name, Config),
- Conf = #{cluster_name => ClusterName,
+ Conf = #{cluster_name => ClusterName#resource.name,
id => ServerId,
uid => UId,
log_init_args => #{data_dir => PrivDir, uid => UId},
initial_member => [],
machine => {module, rabbit_fifo,
- #{dead_letter_handler =>
+ #{queue_resource => discard,
+ dead_letter_handler =>
{?MODULE, dead_letter_handler, [self()]}}}},
_ = ra:start_server(Conf),
ok = ra:trigger_election(ServerId),
_ = ra:members(ServerId),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
F3 = discard_next_delivery(F2, 500),
{ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
@@ -380,7 +395,7 @@ cancel_checkout(Config) ->
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, F1),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
{ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
@@ -395,7 +410,7 @@ credit(Config) ->
{ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
{_, _, F3} = process_ra_events(F2, [], 250),
%% checkout with 0 prefetch
- {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, F3),
+ {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3),
%% assert no deliveries
{_, _, F5} = process_ra_events0(F4, [], [], 250,
fun
@@ -464,7 +479,7 @@ test_queries(Config) ->
receive stop -> ok end
end),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, F0),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
fun rabbit_fifo:query_messages_ready/1),
?assertEqual(1, maps:size(Ready)),
@@ -485,7 +500,7 @@ dead_letter_handler(Pid, Msgs) ->
Pid ! {dead_letter, Msgs}.
dequeue(Config) ->
- ClusterName = ?config(priv_dir, Config),
+ ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
UId = ?config(uid, Config),
Tag = UId,
@@ -611,14 +626,15 @@ validate_process_down(Name, Num) ->
end.
start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
- {ok, Started, _} = ra:start_cluster(ClusterName,
+ {ok, Started, _} = ra:start_cluster(ClusterName#resource.name,
{module, rabbit_fifo, RaFifoConfig},
ServerIds),
?assertEqual(length(Started), length(ServerIds)),
ok.
start_cluster(ClusterName, ServerIds) ->
- start_cluster(ClusterName, ServerIds, #{}).
+ start_cluster(ClusterName, ServerIds, #{name => some_name,
+ queue_resource => ClusterName}).
flush() ->
receive
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 48e7b9aa7f..1c1ab42a9e 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -64,14 +64,14 @@ scenario1(_Config) ->
E = c:pid(0,6720,1),
Commands = [
- {checkout,{auto,2,simple_prefetch},C1},
- {enqueue,E,1,msg1},
- {enqueue,E,2,msg2},
- {checkout,cancel,C1}, %% both on returns queue
- {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2
- {return,[0],C2}, %% E1 in returns, E2 with C2
- {return,[1],C2}, %% E2 in returns E1 with C2
- {settle,[2],C2} %% E2 with C2
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,1,msg1),
+ make_enqueue(E,2,msg2),
+ make_checkout(C1, cancel), %% both on returns queue
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_return(C2, [0]), %% E1 in returns, E2 with C2
+ make_return(C2, [1]), %% E2 in returns E1 with C2
+ make_settle(C2, [2]) %% E2 with C2
],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
@@ -80,13 +80,13 @@ scenario2(_Config) ->
C1 = {<<>>, c:pid(0,346,1)},
C2 = {<<>>,c:pid(0,379,1)},
E = c:pid(0,327,1),
- Commands = [{checkout,{auto,1,simple_prefetch},C1},
- {enqueue,E,1,msg1},
- {checkout,cancel,C1},
- {enqueue,E,2,msg2},
- {checkout,{auto,1,simple_prefetch},C2},
- {settle,[0],C1},
- {settle,[0],C2}
+ Commands = [make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,msg1),
+ make_checkout(C1, cancel),
+ make_enqueue(E,2,msg2),
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_settle(C1, [0]),
+ make_settle(C2, [0])
],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
@@ -94,22 +94,24 @@ scenario2(_Config) ->
scenario3(_Config) ->
C1 = {<<>>, c:pid(0,179,1)},
E = c:pid(0,176,1),
- Commands = [{checkout,{auto,2,simple_prefetch},C1},
- {enqueue,E,1,msg1},
- {return,[0],C1},
- {enqueue,E,2,msg2},
- {enqueue,E,3,msg3},
- {settle,[1],C1},
- {settle,[2],C1}],
+ Commands = [make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,1,msg1),
+ make_return(C1, [0]),
+ make_enqueue(E,2,msg2),
+ make_enqueue(E,3,msg3),
+ make_settle(C1, [1]),
+ make_settle(C1, [2])
+ ],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
scenario4(_Config) ->
C1 = {<<>>, c:pid(0,179,1)},
E = c:pid(0,176,1),
-Commands = [{checkout,{auto,1,simple_prefetch},C1},
- {enqueue,E,1,msg},
- {settle,[0],C1}],
+ Commands = [make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,msg),
+ make_settle(C1, [0])
+ ],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
@@ -167,6 +169,7 @@ checkout_gen(Pid) ->
-record(t, {state = rabbit_fifo:init(#{name => proper,
+ queue_resource => blah,
shadow_copy_interval => 1})
:: rabbit_fifo:state(),
index = 1 :: non_neg_integer(), %% raft index
@@ -201,7 +204,7 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = {enqueue, Pid, MsgSeq, msg},
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg),
case When of
enqueue ->
do_apply(Cmd, T#t{enqueuers = Enqs});
@@ -218,7 +221,7 @@ handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) ->
end, Cons0)) of
[CId | _] ->
Cons = maps:remove(CId, Cons0),
- Cmd = {checkout, cancel, CId},
+ Cmd = rabbit_fifo:make_checkout(CId, cancel, #{}),
do_apply(Cmd, T#t{consumers = Cons});
_ ->
T
@@ -230,7 +233,13 @@ handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) ->
T;
_ ->
Cons = maps:put(CId, ok, Cons0),
- Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId},
+ Cmd = rabbit_fifo:make_checkout(CId,
+ {auto, Prefetch, simple_prefetch},
+ #{ack => true,
+ prefetch => Prefetch,
+ username => <<"user">>,
+ args => []}),
+
do_apply(Cmd, T#t{consumers = Cons})
end;
handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) ->
@@ -253,14 +262,19 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
case queue:out(Effs) of
{{value, {settle, MsgIds, CId}}, Q} ->
- do_apply({Settlement, MsgIds, CId}, T#t{effects = Q});
- {{value, {enqueue, _, _, _} = Cmd}, Q} ->
+ Cmd = case Settlement of
+ settle -> rabbit_fifo:make_settle(CId, MsgIds);
+ return -> rabbit_fifo:make_return(CId, MsgIds);
+ discard -> rabbit_fifo:make_discard(CId, MsgIds)
+ end,
+ do_apply(Cmd, T#t{effects = Q});
+ {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue ->
do_apply(Cmd, T#t{effects = Q});
_ ->
T
end;
handle_op(purge, T) ->
- do_apply(purge, T).
+ do_apply(rabbit_fifo:make_purge(), T).
do_apply(Cmd, #t{effects = Effs, index = Index, state = S0,
log = Log} = T) ->
@@ -275,7 +289,7 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
MsgIds = [I || {I, _} <- Msgs],
%% always make settle commands by default
%% they can be changed depending on the input event later
- Cmd = {settle, MsgIds, {CTag, P}},
+ Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds),
enq_effs(Rem, queue:in(Cmd, Q));
enq_effs([_ | Rem], Q) ->
% ct:pal("enq_effs dropping ~w~n", [E]),
@@ -310,18 +324,8 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
- % L = case Filtered of
- % [] -> undefined;
- % _ ->lists:last(Filtered)
- % end,
-
- % ct:pal("running from snapshot: ~b to ~w"
- % "~n~p~n",
- % [SnapIdx, L, SnapState]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
- % [Name, SnapIdx, S, State, SnapState, Filtered]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -342,7 +346,20 @@ run_log(InitState, Entries) ->
test_init(Name) ->
rabbit_fifo:init(#{name => Name,
+ queue_resource => blah,
shadow_copy_interval => 0,
metrics_handler => {?MODULE, metrics_handler, []}}).
meta(Idx) ->
#{index => Idx, term => 1}.
+
+make_checkout(Cid, Spec) ->
+ rabbit_fifo:make_checkout(Cid, Spec, #{}).
+
+make_enqueue(Pid, Seq, Msg) ->
+ rabbit_fifo:make_enqueue(Pid, Seq, Msg).
+
+make_settle(Cid, MsgIds) ->
+ rabbit_fifo:make_settle(Cid, MsgIds).
+
+make_return(Cid, MsgIds) ->
+ rabbit_fifo:make_return(Cid, MsgIds).