diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 48 |
1 files changed, 24 insertions, 24 deletions
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 53bc6cde7e..07da4a8d08 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -85,21 +85,21 @@ end_per_testcase(Testcase, Config) -> all_messages_go_to_one_consumer(Config) -> {C, Ch} = connection_and_channel(Config), Q = queue_declare(Ch, Config), - NbMessages = 5, - ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), + MessageCount = 5, + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]), #'basic.consume_ok'{consumer_tag = CTag1} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), #'basic.consume_ok'{consumer_tag = CTag2} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, - [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages)], + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)], receive {consumer_done, {MessagesPerConsumer, MessageCount}} -> - ?assertEqual(NbMessages, MessageCount), + ?assertEqual(MessageCount, MessageCount), ?assertEqual(2, maps:size(MessagesPerConsumer)), - ?assertEqual(NbMessages, maps:get(CTag1, MessagesPerConsumer)), + ?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)), ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer)) after 1000 -> throw(failed) @@ -111,8 +111,8 @@ all_messages_go_to_one_consumer(Config) -> fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {C, Ch} = connection_and_channel(Config), Q = queue_declare(Ch, Config), - NbMessages = 10, - ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), + MessageCount = 10, + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]), #'basic.consume_ok'{consumer_tag = CTag1} = amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), #'basic.consume_ok'{consumer_tag = CTag2} = @@ -121,9 +121,9 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, - [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)], + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], - {MessagesPerConsumer1, _} = wait_for_messages(NbMessages div 2), + {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2), FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, MessageCount) -> MessageCount > 0 end, MessagesPerConsumer1)), ?assertEqual(1, length(FirstActiveConsumerInList)), @@ -132,9 +132,9 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), - [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)], + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], - {MessagesPerConsumer2, _} = wait_for_messages(NbMessages div 2 - 1), + {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1), SecondActiveConsumerInList = maps:keys(maps:filter( fun(CTag, MessageCount) -> MessageCount > 0 andalso CTag /= FirstActiveConsumer end, MessagesPerConsumer2) @@ -151,10 +151,10 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> receive {consumer_done, {MessagesPerConsumer, MessageCount}} -> - ?assertEqual(NbMessages, MessageCount), + ?assertEqual(MessageCount, MessageCount), ?assertEqual(3, maps:size(MessagesPerConsumer)), - ?assertEqual(NbMessages div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)), - ?assertEqual(NbMessages div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)), + ?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)), + ?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)), ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer)) after 1000 -> throw(failed) @@ -169,10 +169,10 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config {C2, Ch2} = connection_and_channel(Config), {C3, Ch3} = connection_and_channel(Config), Q = queue_declare(Ch, Config), - NbMessages = 10, - Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2}]), - Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]), - Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]), + MessageCount = 10, + Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]), + Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]), + Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]), #'basic.consume_ok'{consumer_tag = CTag1} = amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid), #'basic.consume_ok'{} = @@ -181,19 +181,19 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid), Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, - [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)], + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], {MessagesPerConsumer1, MessageCount1} = consume_results(), - ?assertEqual(NbMessages div 2, MessageCount1), + ?assertEqual(MessageCount div 2, MessageCount1), ?assertEqual(1, maps:size(MessagesPerConsumer1)), - ?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer1)), + ?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)), ok = amqp_channel:close(Ch1), - [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)], + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], {MessagesPerConsumer2, MessageCount2} = consume_results(), - ?assertEqual(NbMessages div 2 - 1, MessageCount2), + ?assertEqual(MessageCount div 2 - 1, MessageCount2), ?assertEqual(1, maps:size(MessagesPerConsumer2)), ok = amqp_channel:close(Ch2), @@ -283,4 +283,4 @@ wait_for_cancel_ok() -> {cancel_ok, CTag} after 5000 -> throw(consumer_cancel_ok_timeout) - end.
\ No newline at end of file + end. |
