summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-09-21 16:25:13 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-25 10:05:24 +0100
commit4a916b0057e8daf319fea4e020171aaadd4333d1 (patch)
tree547ee0a9781ed42e3c475918aa03ced0ef7cc445 /test
parent8468954a87b9287a64d2936afed3a36b521462c4 (diff)
downloadrabbitmq-server-git-4a916b0057e8daf319fea4e020171aaadd4333d1.tar.gz
Quorum queue consumer prioritiesqq-consumer-priorities
This switches the service queue inside rabbit_fifo from a normal queue to a priority queue such that consumers with a higher priority are favoured for service.
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl81
-rw-r--r--test/rabbit_fifo_SUITE.erl26
2 files changed, 106 insertions, 1 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 942b42f5e4..ecb4fdac63 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -132,7 +132,8 @@ all_tests() ->
delete_if_empty,
delete_if_unused,
queue_ttl,
- peek
+ peek,
+ consumer_priorities
].
memory_tests() ->
@@ -2540,6 +2541,84 @@ queue_ttl(Config) ->
{<<"x-expires">>, long, 1000}]})),
ok.
+consumer_priorities(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch, 2, false),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% consumer with default priority
+ Tag1 = <<"ctag1">>,
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ,
+ no_ack = false,
+ consumer_tag = Tag1},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = Tag1} ->
+ ok
+ end,
+ %% consumer with higher priority
+ Tag2 = <<"ctag2">>,
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ,
+ arguments = [{"x-priority", long, 10}],
+ no_ack = false,
+ consumer_tag = Tag2},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = Tag2} ->
+ ok
+ end,
+
+ publish(Ch, QQ),
+ %% Tag2 should receive the message
+ DT1 = receive
+ {#'basic.deliver'{delivery_tag = D1,
+ consumer_tag = Tag2}, _} ->
+ D1
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+ publish(Ch, QQ),
+ %% Tag2 should receive the message
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ consumer_tag = Tag2}, _} ->
+ ok
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ publish(Ch, QQ),
+ %% Tag1 should receive the message as Tag2 has maxed qos
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ consumer_tag = Tag1}, _} ->
+ ok
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT1,
+ multiple = false}),
+ publish(Ch, QQ),
+ %% Tag2 should receive the message
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ consumer_tag = Tag2}, _} ->
+ ok
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ ok.
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 8431dd8db7..59a33eab4e 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -1509,10 +1509,13 @@ machine_version_test(_) ->
{S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries),
Self = self(),
{#rabbit_fifo{enqueuers = #{Self := #enqueuer{}},
+ consumers = #{Cid := #consumer{priority = 0}},
+ service_queue = S,
messages = Msgs}, ok, []} = apply(meta(Idx),
{machine_version, 0, 1}, S1),
%% validate message conversion to lqueue
?assertEqual(1, lqueue:len(Msgs)),
+ ?assert(priority_queue:is_queue(S)),
ok.
queue_ttl_test(_) ->
@@ -1631,6 +1634,29 @@ query_peek_test(_) ->
?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(3, State2)),
ok.
+checkout_priority_test(_) ->
+ Cid = {<<"checkout_priority_test">>, self()},
+ Pid = spawn(fun () -> ok end),
+ Cid2 = {<<"checkout_priority_test2">>, Pid},
+ Args = [{<<"x-priority">>, long, 1}],
+ {S1, _, _} =
+ apply(meta(3),
+ rabbit_fifo:make_checkout(Cid, {once, 2, simple_prefetch},
+ #{args => Args}),
+ test_init(test)),
+ {S2, _, _} =
+ apply(meta(3),
+ rabbit_fifo:make_checkout(Cid2, {once, 2, simple_prefetch},
+ #{args => []}),
+ S1),
+ {S3, E3} = enq(1, 1, first, S2),
+ ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E3),
+ {S4, E4} = enq(2, 2, second, S3),
+ ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E4),
+ {_S5, E5} = enq(3, 3, third, S4),
+ ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == Pid, E5),
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).