summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-14 18:09:30 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-14 18:09:30 +0100
commitb4d354546d44799f9cdab979c02eecb4245e67ad (patch)
tree91f036e9ee19aa726c9af1d93c95014ecd3dae25 /test
parent6bec963773f5687c9fa24a7ce902d2bc9fa272c2 (diff)
downloadrabbitmq-server-git-b4d354546d44799f9cdab979c02eecb4245e67ad.tar.gz
Support single active consumer in quorum queue
Uses a buffer list for non-active consumers. The active consumer is stored in the usual consumers structure, so the logic around servicing consumers is kept the same. [#162582065] Fixes #1799
Diffstat (limited to 'test')
-rw-r--r--test/single_active_consumer_SUITE.erl116
1 files changed, 90 insertions, 26 deletions
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 9670744b2b..53bc6cde7e 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -24,16 +24,22 @@
all() ->
[
- {group, default}
+ {group, classic_queue}, {group, quorum_queue}
].
groups() ->
[
- {default, [], [
+ {classic_queue, [], [
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
amqp_exclusive_consume_fails_on_exclusive_consumer_queue
+ ]},
+ {quorum_queue, [], [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled,
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
]}
].
@@ -48,18 +54,37 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps()).
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(classic_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"classic">>}
+ ],
+ auto_delete = true}
+ } | Config];
+init_per_group(quorum_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ],
+ durable = true, exclusive = false, auto_delete = false}
+ } | Config].
+
+end_per_group(_, Config) ->
+ Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
-
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
all_messages_go_to_one_consumer(Config) ->
{C, Ch} = connection_and_channel(Config),
- Q = queue_declare(Ch),
+ Q = queue_declare(Ch, Config),
NbMessages = 5,
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
@@ -85,36 +110,52 @@ all_messages_go_to_one_consumer(Config) ->
fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
- Q = queue_declare(Ch),
+ Q = queue_declare(Ch, Config),
NbMessages = 10,
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
#'basic.consume_ok'{consumer_tag = CTag2} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
- #'basic.consume_ok'{consumer_tag = _CTag3} =
+ #'basic.consume_ok'{consumer_tag = CTag3} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)],
- #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}),
+ {MessagesPerConsumer1, _} = wait_for_messages(NbMessages div 2),
+ FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, MessageCount) -> MessageCount > 0 end, MessagesPerConsumer1)),
+ ?assertEqual(1, length(FirstActiveConsumerInList)),
+
+ FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList),
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}),
+
+ {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)],
- #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag2}),
+ {MessagesPerConsumer2, _} = wait_for_messages(NbMessages div 2 - 1),
+ SecondActiveConsumerInList = maps:keys(maps:filter(
+ fun(CTag, MessageCount) -> MessageCount > 0 andalso CTag /= FirstActiveConsumer end,
+ MessagesPerConsumer2)
+ ),
+ ?assertEqual(1, length(SecondActiveConsumerInList)),
+ SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList),
+
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
+ wait_for_messages(1),
+
+ LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
receive
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
?assertEqual(NbMessages, MessageCount),
?assertEqual(3, maps:size(MessagesPerConsumer)),
- ?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer)),
- Counts = maps:values(MessagesPerConsumer),
- ?assert(lists:member(NbMessages div 2, Counts)),
- ?assert(lists:member(NbMessages div 2 - 1, Counts)),
- ?assert(lists:member(1, Counts))
+ ?assertEqual(NbMessages div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(NbMessages div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer))
after 1000 ->
throw(failed)
end,
@@ -127,7 +168,7 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
{C1, Ch1} = connection_and_channel(Config),
{C2, Ch2} = connection_and_channel(Config),
{C3, Ch3} = connection_and_channel(Config),
- Q = queue_declare(Ch),
+ Q = queue_declare(Ch, Config),
NbMessages = 10,
Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2}]),
Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]),
@@ -168,7 +209,7 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
{C, Ch} = connection_and_channel(Config),
- Q = queue_declare(Ch),
+ Q = queue_declare(Ch, Config),
?assertExit(
{{shutdown, {server_initiated_close, 403, _}}, _},
amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true})
@@ -181,9 +222,8 @@ connection_and_channel(Config) ->
{ok, Ch} = amqp_connection:open_channel(C),
{C, Ch}.
-queue_declare(Channel) ->
- Declare = #'queue.declare'{arguments = [{"x-single-active-consumer", bool, true}],
- auto_delete = true},
+queue_declare(Channel, Config) ->
+ Declare = ?config(single_active_consumer_queue_declare, Config),
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
Q.
@@ -198,11 +238,12 @@ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->
{maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
MessageCount + 1}};
{#'basic.deliver'{consumer_tag = CTag}, _Content} ->
- consume({Parent,
- {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
- MessageCount + 1},
- CountDown - 1});
- #'basic.cancel_ok'{} ->
+ NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1},
+ Parent ! {message, NewState},
+ consume({Parent, NewState, CountDown - 1});
+ #'basic.cancel_ok'{consumer_tag = CTag} ->
+ Parent ! {cancel_ok, CTag},
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown});
_ ->
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown})
@@ -216,7 +257,30 @@ consume_results() ->
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
{MessagesPerConsumer, MessageCount};
{consumer_timeout, {MessagesPerConsumer, MessageCount}} ->
- {MessagesPerConsumer, MessageCount}
+ {MessagesPerConsumer, MessageCount};
+ _ ->
+ consume_results()
after 1000 ->
throw(failed)
+ end.
+
+wait_for_messages(ExpectedCount) ->
+ wait_for_messages(ExpectedCount, {}).
+
+wait_for_messages(0, State) ->
+ State;
+wait_for_messages(ExpectedCount, _) ->
+ receive
+ {message, {MessagesPerConsumer, MessageCount}} ->
+ wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
+ after 5000 ->
+ throw(message_waiting_timeout)
+ end.
+
+wait_for_cancel_ok() ->
+ receive
+ {cancel_ok, CTag} ->
+ {cancel_ok, CTag}
+ after 5000 ->
+ throw(consumer_cancel_ok_timeout)
end. \ No newline at end of file