summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-14 12:29:56 +0100
committerDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-14 12:29:56 +0100
commit55acdb57727cc2cda19972269693d4c9133d8810 (patch)
tree94c110ed4e6777e42ff9562029381ebaad8be220 /test
parentb5ad8e6725a810b0065b3135c5f6b6e0b769d211 (diff)
downloadrabbitmq-server-git-55acdb57727cc2cda19972269693d4c9133d8810.tar.gz
Add tests for order of messages within each priority queue
Diffstat (limited to 'test')
-rw-r--r--test/priority_queue_SUITE.erl46
1 files changed, 43 insertions, 3 deletions
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 5df5686090..3e94e5bf02 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -39,6 +39,7 @@ groups() ->
mirror_queue_sync,
mirror_queue_sync_priority_above_max,
mirror_queue_sync_priority_above_max_pending_ack,
+ mirror_queue_sync_order,
purge,
requeue,
resume,
@@ -432,6 +433,33 @@ mirror_queue_auto_ack(Config) ->
delete(Ch, Q),
passed.
+mirror_queue_sync_order(Config) ->
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, B),
+ Q = <<"mirror_queue_sync_order-queue">>,
+ declare(Ch, Q, 3),
+ publish_payload(Ch, Q, [{1, <<"msg1">>}, {2, <<"msg2">>},
+ {2, <<"msg3">>}, {2, <<"msg4">>},
+ {3, <<"msg5">>}]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+
+ %% Add and sync slave
+ ok = rabbit_ct_broker_helpers:set_ha_policy(
+ Config, A, <<"^mirror_queue_sync_order-queue$">>, <<"all">>),
+ rabbit_ct_broker_helpers:control_action(sync_queue, A,
+ [binary_to_list(Q)], [{"-p", "/"}]),
+ wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+
+ %% Stop the master
+ rabbit_ct_broker_helpers:stop_node(Config, A),
+
+ get_payload(Ch2, Q, do_ack, [<<"msg5">>, <<"msg2">>, <<"msg3">>,
+ <<"msg4">>, <<"msg1">>]),
+
+ delete(Ch2, Q),
+ passed.
%%----------------------------------------------------------------------------
open(Config) ->
@@ -454,6 +482,11 @@ publish(Ch, Q, Ps) ->
[publish1(Ch, Q, P) || P <- Ps],
amqp_channel:wait_for_confirms(Ch).
+publish_payload(Ch, Q, PPds) ->
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ [publish1(Ch, Q, P, Pd) || {P, Pd} <- PPds],
+ amqp_channel:wait_for_confirms(Ch).
+
publish_many(_Ch, _Q, 0) -> ok;
publish_many( Ch, Q, N) -> publish1(Ch, Q, random:uniform(5)),
publish_many(Ch, Q, N - 1).
@@ -463,6 +496,11 @@ publish1(Ch, Q, P) ->
#amqp_msg{props = props(P),
payload = priority2bin(P)}).
+publish1(Ch, Q, P, Pd) ->
+ amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
+ #amqp_msg{props = props(P),
+ payload = Pd}).
+
props(undefined) -> #'P_basic'{delivery_mode = 2};
props(P) -> #'P_basic'{priority = P,
delivery_mode = 2}.
@@ -494,19 +532,21 @@ get_all(Ch, Q, Ack, Ps) ->
DTags.
get_partial(Ch, Q, Ack, Ps) ->
- [get_ok(Ch, Q, Ack, P) || P <- Ps].
+ [get_ok(Ch, Q, Ack, priority2bin(P)) || P <- Ps].
get_empty(Ch, Q) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}).
-get_ok(Ch, Q, Ack, P) ->
- PBin = priority2bin(P),
+get_ok(Ch, Q, Ack, PBin) ->
{#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} =
amqp_channel:call(Ch, #'basic.get'{queue = Q,
no_ack = Ack =:= no_ack}),
PBin = PBin2,
maybe_ack(Ch, Ack, DTag).
+get_payload(Ch, Q, Ack, Ps) ->
+ [get_ok(Ch, Q, Ack, P) || P <- Ps].
+
get_without_ack(Ch, Q) ->
{#'basic.get_ok'{}, _} =
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false}).