diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-14 18:09:30 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-14 18:09:30 +0100 |
| commit | b4d354546d44799f9cdab979c02eecb4245e67ad (patch) | |
| tree | 91f036e9ee19aa726c9af1d93c95014ecd3dae25 /test | |
| parent | 6bec963773f5687c9fa24a7ce902d2bc9fa272c2 (diff) | |
| download | rabbitmq-server-git-b4d354546d44799f9cdab979c02eecb4245e67ad.tar.gz | |
Support single active consumer in quorum queue
Uses a buffer list for non-active consumers. The active consumer is
stored in the usual consumers structure, so the logic around servicing
consumers is kept the same.
[#162582065]
Fixes #1799
Diffstat (limited to 'test')
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 116 |
1 files changed, 90 insertions, 26 deletions
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 9670744b2b..53bc6cde7e 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -24,16 +24,22 @@ all() -> [ - {group, default} + {group, classic_queue}, {group, quorum_queue} ]. groups() -> [ - {default, [], [ + {classic_queue, [], [ all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, amqp_exclusive_consume_fails_on_exclusive_consumer_queue + ]}, + {quorum_queue, [], [ + all_messages_go_to_one_consumer, + fallback_to_another_consumer_when_first_one_is_cancelled, + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ ]} ]. @@ -48,18 +54,37 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(classic_queue, Config) -> + [{single_active_consumer_queue_declare, + #'queue.declare'{arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"classic">>} + ], + auto_delete = true} + } | Config]; +init_per_group(quorum_queue, Config) -> + [{single_active_consumer_queue_declare, + #'queue.declare'{arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"quorum">>} + ], + durable = true, exclusive = false, auto_delete = false} + } | Config]. + +end_per_group(_, Config) -> + Config. init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). - end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). all_messages_go_to_one_consumer(Config) -> {C, Ch} = connection_and_channel(Config), - Q = queue_declare(Ch), + Q = queue_declare(Ch, Config), NbMessages = 5, ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), #'basic.consume_ok'{consumer_tag = CTag1} = @@ -85,36 +110,52 @@ all_messages_go_to_one_consumer(Config) -> fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {C, Ch} = connection_and_channel(Config), - Q = queue_declare(Ch), + Q = queue_declare(Ch, Config), NbMessages = 10, ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), #'basic.consume_ok'{consumer_tag = CTag1} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), #'basic.consume_ok'{consumer_tag = CTag2} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), - #'basic.consume_ok'{consumer_tag = _CTag3} = + #'basic.consume_ok'{consumer_tag = CTag3} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)], - #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}), + {MessagesPerConsumer1, _} = wait_for_messages(NbMessages div 2), + FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, MessageCount) -> MessageCount > 0 end, MessagesPerConsumer1)), + ?assertEqual(1, length(FirstActiveConsumerInList)), + + FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList), + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}), + + {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)], - #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag2}), + {MessagesPerConsumer2, _} = wait_for_messages(NbMessages div 2 - 1), + SecondActiveConsumerInList = maps:keys(maps:filter( + fun(CTag, MessageCount) -> MessageCount > 0 andalso CTag /= FirstActiveConsumer end, + MessagesPerConsumer2) + ), + ?assertEqual(1, length(SecondActiveConsumerInList)), + SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList), + + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}), amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), + wait_for_messages(1), + + LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))), receive {consumer_done, {MessagesPerConsumer, MessageCount}} -> ?assertEqual(NbMessages, MessageCount), ?assertEqual(3, maps:size(MessagesPerConsumer)), - ?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer)), - Counts = maps:values(MessagesPerConsumer), - ?assert(lists:member(NbMessages div 2, Counts)), - ?assert(lists:member(NbMessages div 2 - 1, Counts)), - ?assert(lists:member(1, Counts)) + ?assertEqual(NbMessages div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)), + ?assertEqual(NbMessages div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)), + ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer)) after 1000 -> throw(failed) end, @@ -127,7 +168,7 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config {C1, Ch1} = connection_and_channel(Config), {C2, Ch2} = connection_and_channel(Config), {C3, Ch3} = connection_and_channel(Config), - Q = queue_declare(Ch), + Q = queue_declare(Ch, Config), NbMessages = 10, Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2}]), Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]), @@ -168,7 +209,7 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) -> {C, Ch} = connection_and_channel(Config), - Q = queue_declare(Ch), + Q = queue_declare(Ch, Config), ?assertExit( {{shutdown, {server_initiated_close, 403, _}}, _}, amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true}) @@ -181,9 +222,8 @@ connection_and_channel(Config) -> {ok, Ch} = amqp_connection:open_channel(C), {C, Ch}. -queue_declare(Channel) -> - Declare = #'queue.declare'{arguments = [{"x-single-active-consumer", bool, true}], - auto_delete = true}, +queue_declare(Channel, Config) -> + Declare = ?config(single_active_consumer_queue_declare, Config), #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare), Q. @@ -198,11 +238,12 @@ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) -> {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), MessageCount + 1}}; {#'basic.deliver'{consumer_tag = CTag}, _Content} -> - consume({Parent, - {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), - MessageCount + 1}, - CountDown - 1}); - #'basic.cancel_ok'{} -> + NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), + MessageCount + 1}, + Parent ! {message, NewState}, + consume({Parent, NewState, CountDown - 1}); + #'basic.cancel_ok'{consumer_tag = CTag} -> + Parent ! {cancel_ok, CTag}, consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}); _ -> consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) @@ -216,7 +257,30 @@ consume_results() -> {consumer_done, {MessagesPerConsumer, MessageCount}} -> {MessagesPerConsumer, MessageCount}; {consumer_timeout, {MessagesPerConsumer, MessageCount}} -> - {MessagesPerConsumer, MessageCount} + {MessagesPerConsumer, MessageCount}; + _ -> + consume_results() after 1000 -> throw(failed) + end. + +wait_for_messages(ExpectedCount) -> + wait_for_messages(ExpectedCount, {}). + +wait_for_messages(0, State) -> + State; +wait_for_messages(ExpectedCount, _) -> + receive + {message, {MessagesPerConsumer, MessageCount}} -> + wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) + after 5000 -> + throw(message_waiting_timeout) + end. + +wait_for_cancel_ok() -> + receive + {cancel_ok, CTag} -> + {cancel_ok, CTag} + after 5000 -> + throw(consumer_cancel_ok_timeout) end.
\ No newline at end of file |
