diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-09-21 16:25:13 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-09-25 10:05:24 +0100 |
| commit | 4a916b0057e8daf319fea4e020171aaadd4333d1 (patch) | |
| tree | 547ee0a9781ed42e3c475918aa03ced0ef7cc445 /test | |
| parent | 8468954a87b9287a64d2936afed3a36b521462c4 (diff) | |
| download | rabbitmq-server-git-4a916b0057e8daf319fea4e020171aaadd4333d1.tar.gz | |
Quorum queue consumer prioritiesqq-consumer-priorities
This switches the service queue inside rabbit_fifo from a normal queue
to a priority queue such that consumers with a higher priority are
favoured for service.
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 81 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 26 |
2 files changed, 106 insertions, 1 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 942b42f5e4..ecb4fdac63 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -132,7 +132,8 @@ all_tests() -> delete_if_empty, delete_if_unused, queue_ttl, - peek + peek, + consumer_priorities ]. memory_tests() -> @@ -2540,6 +2541,84 @@ queue_ttl(Config) -> {<<"x-expires">>, long, 1000}]})), ok. +consumer_priorities(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch, 2, false), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% consumer with default priority + Tag1 = <<"ctag1">>, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ, + no_ack = false, + consumer_tag = Tag1}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Tag1} -> + ok + end, + %% consumer with higher priority + Tag2 = <<"ctag2">>, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ, + arguments = [{"x-priority", long, 10}], + no_ack = false, + consumer_tag = Tag2}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Tag2} -> + ok + end, + + publish(Ch, QQ), + %% Tag2 should receive the message + DT1 = receive + {#'basic.deliver'{delivery_tag = D1, + consumer_tag = Tag2}, _} -> + D1 + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + publish(Ch, QQ), + %% Tag2 should receive the message + receive + {#'basic.deliver'{delivery_tag = _, + consumer_tag = Tag2}, _} -> + ok + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + publish(Ch, QQ), + %% Tag1 should receive the message as Tag2 has maxed qos + receive + {#'basic.deliver'{delivery_tag = _, + consumer_tag = Tag1}, _} -> + ok + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT1, + multiple = false}), + publish(Ch, QQ), + %% Tag2 should receive the message + receive + {#'basic.deliver'{delivery_tag = _, + consumer_tag = Tag2}, _} -> + ok + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok. + %%---------------------------------------------------------------------------- declare(Ch, Q) -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 8431dd8db7..59a33eab4e 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -1509,10 +1509,13 @@ machine_version_test(_) -> {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries), Self = self(), {#rabbit_fifo{enqueuers = #{Self := #enqueuer{}}, + consumers = #{Cid := #consumer{priority = 0}}, + service_queue = S, messages = Msgs}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S1), %% validate message conversion to lqueue ?assertEqual(1, lqueue:len(Msgs)), + ?assert(priority_queue:is_queue(S)), ok. queue_ttl_test(_) -> @@ -1631,6 +1634,29 @@ query_peek_test(_) -> ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(3, State2)), ok. +checkout_priority_test(_) -> + Cid = {<<"checkout_priority_test">>, self()}, + Pid = spawn(fun () -> ok end), + Cid2 = {<<"checkout_priority_test2">>, Pid}, + Args = [{<<"x-priority">>, long, 1}], + {S1, _, _} = + apply(meta(3), + rabbit_fifo:make_checkout(Cid, {once, 2, simple_prefetch}, + #{args => Args}), + test_init(test)), + {S2, _, _} = + apply(meta(3), + rabbit_fifo:make_checkout(Cid2, {once, 2, simple_prefetch}, + #{args => []}), + S1), + {S3, E3} = enq(1, 1, first, S2), + ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E3), + {S4, E4} = enq(2, 2, second, S3), + ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E4), + {_S5, E5} = enq(3, 3, third, S4), + ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == Pid, E5), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf). |
