diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-14 14:09:32 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-12-14 14:09:32 +0100 |
| commit | 685f8ea30c0654f9264ec3b25a9ccceb1a2e9a3e (patch) | |
| tree | 454777c224844e7a8440b37cf5340edf924bec95 /test | |
| parent | b914cdb7f539343d13eef4522608559d69edd828 (diff) | |
| parent | 2376e9b465c5bac53f388fc25e83265ddce95919 (diff) | |
| download | rabbitmq-server-git-685f8ea30c0654f9264ec3b25a9ccceb1a2e9a3e.tar.gz | |
Merge pull request #1801 from rabbitmq/qq-list-consumers
Implement consumer listing for quorum queues
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 9 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 50 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 101 |
3 files changed, 101 insertions, 59 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 523506b0e0..c94fb9ddab 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -709,6 +709,7 @@ subscribe(Config) -> ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + qos(Ch, 10, false), RaName = ra_name(QQ), publish(Ch, QQ), wait_for_messages_ready(Servers, RaName, 1), @@ -717,6 +718,14 @@ subscribe(Config) -> receive_basic_deliver(false), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 1), + %% validate we can retrieve the consumers + [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ct:pal("Consumer ~p", [Consumer]), + ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), + ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), + ?assertEqual(true, proplists:get_value(ack_required, Consumer)), + ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), + ?assertEqual([], proplists:get_value(arguments, Consumer)), rabbit_ct_client_helpers:close_channel(Ch), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 56608e9af3..3263a733a9 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -4,6 +4,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). all() -> [ @@ -50,11 +51,16 @@ end_per_group(_, Config) -> Config. init_per_testcase(TestCase, Config) -> + meck:new(rabbit_quorum_queue, [passthrough]), + meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end), + meck:expect(rabbit_quorum_queue, cancel_consumer_handler, + fun (_, _) -> ok end), ra_server_sup:remove_all(), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), + ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), [ - {cluster_name, TestCase}, + {cluster_name, ClusterName}, {uid, atom_to_binary(TestCase, utf8)}, {node_id, {TestCase, node()}}, {uid2, atom_to_binary(ServerName2, utf8)}, @@ -63,6 +69,10 @@ init_per_testcase(TestCase, Config) -> {node_id3, {ServerName3, node()}} | Config]. +end_per_testcase(_, Config) -> + meck:unload(), + Config. + basics(Config) -> ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), @@ -70,7 +80,7 @@ basics(Config) -> CustomerTag = UId, ok = start_cluster(ClusterName, [ServerId]), FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, FState0), + {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0), ra_log_wal:force_roll_over(ra_log_wal), % create segment the segment will trigger a snapshot @@ -167,7 +177,7 @@ duplicate_delivery(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), Fun = fun Loop(S0) -> receive @@ -201,7 +211,7 @@ usage(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), {_, _, _} = process_ra_events(F3, 50), @@ -256,7 +266,7 @@ detects_lost_delivery(Config) -> F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), {_, _, F0} = process_ra_events(F00, 100), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), % lose first delivery @@ -285,7 +295,8 @@ returns_after_down(Config) -> Self = self(), _Pid = spawn(fun () -> F = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, F), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, + undefined, F), Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, @@ -327,8 +338,11 @@ handles_reject_notification(Config) -> CId = {UId1, self()}, ok = start_cluster(ClusterName, [ServerId1, ServerId2]), - _ = ra:process_command(ServerId1, {checkout, - {auto, 10, simple_prefetch}, CId}), + _ = ra:process_command(ServerId1, + rabbit_fifo:make_checkout( + CId, + {auto, 10, simple_prefetch}, + #{})), % reverse order - should try the first node in the list first F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]), {ok, F1} = rabbit_fifo_client:enqueue(one, F0), @@ -346,20 +360,21 @@ discard(Config) -> ServerId = ?config(node_id, Config), UId = ?config(uid, Config), ClusterName = ?config(cluster_name, Config), - Conf = #{cluster_name => ClusterName, + Conf = #{cluster_name => ClusterName#resource.name, id => ServerId, uid => UId, log_init_args => #{data_dir => PrivDir, uid => UId}, initial_member => [], machine => {module, rabbit_fifo, - #{dead_letter_handler => + #{queue_resource => discard, + dead_letter_handler => {?MODULE, dead_letter_handler, [self()]}}}}, _ = ra:start_server(Conf), ok = ra:trigger_election(ServerId), _ = ra:members(ServerId), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), F3 = discard_next_delivery(F2, 500), {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), @@ -380,7 +395,7 @@ cancel_checkout(Config) -> ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, F1), + {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), {ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), @@ -395,7 +410,7 @@ credit(Config) -> {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), {_, _, F3} = process_ra_events(F2, [], 250), %% checkout with 0 prefetch - {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, F3), + {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3), %% assert no deliveries {_, _, F5} = process_ra_events0(F4, [], [], 250, fun @@ -464,7 +479,7 @@ test_queries(Config) -> receive stop -> ok end end), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, F0), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), {ok, {_, Ready}, _} = ra:local_query(ServerId, fun rabbit_fifo:query_messages_ready/1), ?assertEqual(1, maps:size(Ready)), @@ -485,7 +500,7 @@ dead_letter_handler(Pid, Msgs) -> Pid ! {dead_letter, Msgs}. dequeue(Config) -> - ClusterName = ?config(priv_dir, Config), + ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), UId = ?config(uid, Config), Tag = UId, @@ -611,14 +626,15 @@ validate_process_down(Name, Num) -> end. start_cluster(ClusterName, ServerIds, RaFifoConfig) -> - {ok, Started, _} = ra:start_cluster(ClusterName, + {ok, Started, _} = ra:start_cluster(ClusterName#resource.name, {module, rabbit_fifo, RaFifoConfig}, ServerIds), ?assertEqual(length(Started), length(ServerIds)), ok. start_cluster(ClusterName, ServerIds) -> - start_cluster(ClusterName, ServerIds, #{}). + start_cluster(ClusterName, ServerIds, #{name => some_name, + queue_resource => ClusterName}). flush() -> receive diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 48e7b9aa7f..1c1ab42a9e 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -64,14 +64,14 @@ scenario1(_Config) -> E = c:pid(0,6720,1), Commands = [ - {checkout,{auto,2,simple_prefetch},C1}, - {enqueue,E,1,msg1}, - {enqueue,E,2,msg2}, - {checkout,cancel,C1}, %% both on returns queue - {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 - {return,[0],C2}, %% E1 in returns, E2 with C2 - {return,[1],C2}, %% E2 in returns E1 with C2 - {settle,[2],C2} %% E2 with C2 + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,1,msg1), + make_enqueue(E,2,msg2), + make_checkout(C1, cancel), %% both on returns queue + make_checkout(C2, {auto,1,simple_prefetch}), + make_return(C2, [0]), %% E1 in returns, E2 with C2 + make_return(C2, [1]), %% E2 in returns E1 with C2 + make_settle(C2, [2]) %% E2 with C2 ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. @@ -80,13 +80,13 @@ scenario2(_Config) -> C1 = {<<>>, c:pid(0,346,1)}, C2 = {<<>>,c:pid(0,379,1)}, E = c:pid(0,327,1), - Commands = [{checkout,{auto,1,simple_prefetch},C1}, - {enqueue,E,1,msg1}, - {checkout,cancel,C1}, - {enqueue,E,2,msg2}, - {checkout,{auto,1,simple_prefetch},C2}, - {settle,[0],C1}, - {settle,[0],C2} + Commands = [make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,msg1), + make_checkout(C1, cancel), + make_enqueue(E,2,msg2), + make_checkout(C2, {auto,1,simple_prefetch}), + make_settle(C1, [0]), + make_settle(C2, [0]) ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. @@ -94,22 +94,24 @@ scenario2(_Config) -> scenario3(_Config) -> C1 = {<<>>, c:pid(0,179,1)}, E = c:pid(0,176,1), - Commands = [{checkout,{auto,2,simple_prefetch},C1}, - {enqueue,E,1,msg1}, - {return,[0],C1}, - {enqueue,E,2,msg2}, - {enqueue,E,3,msg3}, - {settle,[1],C1}, - {settle,[2],C1}], + Commands = [make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,1,msg1), + make_return(C1, [0]), + make_enqueue(E,2,msg2), + make_enqueue(E,3,msg3), + make_settle(C1, [1]), + make_settle(C1, [2]) + ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. scenario4(_Config) -> C1 = {<<>>, c:pid(0,179,1)}, E = c:pid(0,176,1), -Commands = [{checkout,{auto,1,simple_prefetch},C1}, - {enqueue,E,1,msg}, - {settle,[0],C1}], + Commands = [make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,msg), + make_settle(C1, [0]) + ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. @@ -167,6 +169,7 @@ checkout_gen(Pid) -> -record(t, {state = rabbit_fifo:init(#{name => proper, + queue_resource => blah, shadow_copy_interval => 1}) :: rabbit_fifo:state(), index = 1 :: non_neg_integer(), %% raft index @@ -201,7 +204,7 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, _ -> Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), MsgSeq = maps:get(Pid, Enqs), - Cmd = {enqueue, Pid, MsgSeq, msg}, + Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg), case When of enqueue -> do_apply(Cmd, T#t{enqueuers = Enqs}); @@ -218,7 +221,7 @@ handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) -> end, Cons0)) of [CId | _] -> Cons = maps:remove(CId, Cons0), - Cmd = {checkout, cancel, CId}, + Cmd = rabbit_fifo:make_checkout(CId, cancel, #{}), do_apply(Cmd, T#t{consumers = Cons}); _ -> T @@ -230,7 +233,13 @@ handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) -> T; _ -> Cons = maps:put(CId, ok, Cons0), - Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId}, + Cmd = rabbit_fifo:make_checkout(CId, + {auto, Prefetch, simple_prefetch}, + #{ack => true, + prefetch => Prefetch, + username => <<"user">>, + args => []}), + do_apply(Cmd, T#t{consumers = Cons}) end; handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) -> @@ -253,14 +262,19 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) -> handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> case queue:out(Effs) of {{value, {settle, MsgIds, CId}}, Q} -> - do_apply({Settlement, MsgIds, CId}, T#t{effects = Q}); - {{value, {enqueue, _, _, _} = Cmd}, Q} -> + Cmd = case Settlement of + settle -> rabbit_fifo:make_settle(CId, MsgIds); + return -> rabbit_fifo:make_return(CId, MsgIds); + discard -> rabbit_fifo:make_discard(CId, MsgIds) + end, + do_apply(Cmd, T#t{effects = Q}); + {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue -> do_apply(Cmd, T#t{effects = Q}); _ -> T end; handle_op(purge, T) -> - do_apply(purge, T). + do_apply(rabbit_fifo:make_purge(), T). do_apply(Cmd, #t{effects = Effs, index = Index, state = S0, log = Log} = T) -> @@ -275,7 +289,7 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> MsgIds = [I || {I, _} <- Msgs], %% always make settle commands by default %% they can be changed depending on the input event later - Cmd = {settle, MsgIds, {CTag, P}}, + Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds), enq_effs(Rem, queue:in(Cmd, Q)); enq_effs([_ | Rem], Q) -> % ct:pal("enq_effs dropping ~w~n", [E]), @@ -310,18 +324,8 @@ run_snapshot_test0(Name, Commands) -> Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), - % L = case Filtered of - % [] -> undefined; - % _ ->lists:last(Filtered) - % end, - - % ct:pal("running from snapshot: ~b to ~w" - % "~n~p~n", - % [SnapIdx, L, SnapState]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", - % [Name, SnapIdx, S, State, SnapState, Filtered]), ?assertEqual(State, S) end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. @@ -342,7 +346,20 @@ run_log(InitState, Entries) -> test_init(Name) -> rabbit_fifo:init(#{name => Name, + queue_resource => blah, shadow_copy_interval => 0, metrics_handler => {?MODULE, metrics_handler, []}}). meta(Idx) -> #{index => Idx, term => 1}. + +make_checkout(Cid, Spec) -> + rabbit_fifo:make_checkout(Cid, Spec, #{}). + +make_enqueue(Pid, Seq, Msg) -> + rabbit_fifo:make_enqueue(Pid, Seq, Msg). + +make_settle(Cid, MsgIds) -> + rabbit_fifo:make_settle(Cid, MsgIds). + +make_return(Cid, MsgIds) -> + rabbit_fifo:make_return(Cid, MsgIds). |
