summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/single_active_consumer_SUITE.erl48
1 files changed, 24 insertions, 24 deletions
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 53bc6cde7e..07da4a8d08 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -85,21 +85,21 @@ end_per_testcase(Testcase, Config) ->
all_messages_go_to_one_consumer(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
- NbMessages = 5,
- ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
+ MessageCount = 5,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
#'basic.consume_ok'{consumer_tag = CTag2} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
- [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages)],
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)],
receive
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
- ?assertEqual(NbMessages, MessageCount),
+ ?assertEqual(MessageCount, MessageCount),
?assertEqual(2, maps:size(MessagesPerConsumer)),
- ?assertEqual(NbMessages, maps:get(CTag1, MessagesPerConsumer)),
+ ?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)),
?assertEqual(0, maps:get(CTag2, MessagesPerConsumer))
after 1000 ->
throw(failed)
@@ -111,8 +111,8 @@ all_messages_go_to_one_consumer(Config) ->
fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
- NbMessages = 10,
- ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
+ MessageCount = 10,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
#'basic.consume_ok'{consumer_tag = CTag2} =
@@ -121,9 +121,9 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
- [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)],
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
- {MessagesPerConsumer1, _} = wait_for_messages(NbMessages div 2),
+ {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, MessageCount) -> MessageCount > 0 end, MessagesPerConsumer1)),
?assertEqual(1, length(FirstActiveConsumerInList)),
@@ -132,9 +132,9 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
- [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)],
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
- {MessagesPerConsumer2, _} = wait_for_messages(NbMessages div 2 - 1),
+ {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
SecondActiveConsumerInList = maps:keys(maps:filter(
fun(CTag, MessageCount) -> MessageCount > 0 andalso CTag /= FirstActiveConsumer end,
MessagesPerConsumer2)
@@ -151,10 +151,10 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
receive
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
- ?assertEqual(NbMessages, MessageCount),
+ ?assertEqual(MessageCount, MessageCount),
?assertEqual(3, maps:size(MessagesPerConsumer)),
- ?assertEqual(NbMessages div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
- ?assertEqual(NbMessages div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer))
after 1000 ->
throw(failed)
@@ -169,10 +169,10 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
{C2, Ch2} = connection_and_channel(Config),
{C3, Ch3} = connection_and_channel(Config),
Q = queue_declare(Ch, Config),
- NbMessages = 10,
- Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2}]),
- Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]),
- Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]),
+ MessageCount = 10,
+ Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]),
+ Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
+ Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
#'basic.consume_ok'{consumer_tag = CTag1} =
amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid),
#'basic.consume_ok'{} =
@@ -181,19 +181,19 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid),
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
- [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)],
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
{MessagesPerConsumer1, MessageCount1} = consume_results(),
- ?assertEqual(NbMessages div 2, MessageCount1),
+ ?assertEqual(MessageCount div 2, MessageCount1),
?assertEqual(1, maps:size(MessagesPerConsumer1)),
- ?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer1)),
+ ?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)),
ok = amqp_channel:close(Ch1),
- [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)],
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
{MessagesPerConsumer2, MessageCount2} = consume_results(),
- ?assertEqual(NbMessages div 2 - 1, MessageCount2),
+ ?assertEqual(MessageCount div 2 - 1, MessageCount2),
?assertEqual(1, maps:size(MessagesPerConsumer2)),
ok = amqp_channel:close(Ch2),
@@ -283,4 +283,4 @@ wait_for_cancel_ok() ->
{cancel_ok, CTag}
after 5000 ->
throw(consumer_cancel_ok_timeout)
- end. \ No newline at end of file
+ end.