diff options
| author | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-14 12:29:56 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-14 12:29:56 +0100 |
| commit | 55acdb57727cc2cda19972269693d4c9133d8810 (patch) | |
| tree | 94c110ed4e6777e42ff9562029381ebaad8be220 /test | |
| parent | b5ad8e6725a810b0065b3135c5f6b6e0b769d211 (diff) | |
| download | rabbitmq-server-git-55acdb57727cc2cda19972269693d4c9133d8810.tar.gz | |
Add tests for order of messages within each priority queue
Diffstat (limited to 'test')
| -rw-r--r-- | test/priority_queue_SUITE.erl | 46 |
1 files changed, 43 insertions, 3 deletions
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 5df5686090..3e94e5bf02 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -39,6 +39,7 @@ groups() -> mirror_queue_sync, mirror_queue_sync_priority_above_max, mirror_queue_sync_priority_above_max_pending_ack, + mirror_queue_sync_order, purge, requeue, resume, @@ -432,6 +433,33 @@ mirror_queue_auto_ack(Config) -> delete(Ch, Q), passed. +mirror_queue_sync_order(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, B), + Q = <<"mirror_queue_sync_order-queue">>, + declare(Ch, Q, 3), + publish_payload(Ch, Q, [{1, <<"msg1">>}, {2, <<"msg2">>}, + {2, <<"msg3">>}, {2, <<"msg4">>}, + {3, <<"msg5">>}]), + rabbit_ct_client_helpers:close_channel(Ch), + + %% Add and sync slave + ok = rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"^mirror_queue_sync_order-queue$">>, <<"all">>), + rabbit_ct_broker_helpers:control_action(sync_queue, A, + [binary_to_list(Q)], [{"-p", "/"}]), + wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + + %% Stop the master + rabbit_ct_broker_helpers:stop_node(Config, A), + + get_payload(Ch2, Q, do_ack, [<<"msg5">>, <<"msg2">>, <<"msg3">>, + <<"msg4">>, <<"msg1">>]), + + delete(Ch2, Q), + passed. %%---------------------------------------------------------------------------- open(Config) -> @@ -454,6 +482,11 @@ publish(Ch, Q, Ps) -> [publish1(Ch, Q, P) || P <- Ps], amqp_channel:wait_for_confirms(Ch). +publish_payload(Ch, Q, PPds) -> + amqp_channel:call(Ch, #'confirm.select'{}), + [publish1(Ch, Q, P, Pd) || {P, Pd} <- PPds], + amqp_channel:wait_for_confirms(Ch). + publish_many(_Ch, _Q, 0) -> ok; publish_many( Ch, Q, N) -> publish1(Ch, Q, random:uniform(5)), publish_many(Ch, Q, N - 1). @@ -463,6 +496,11 @@ publish1(Ch, Q, P) -> #amqp_msg{props = props(P), payload = priority2bin(P)}). +publish1(Ch, Q, P, Pd) -> + amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q}, + #amqp_msg{props = props(P), + payload = Pd}). + props(undefined) -> #'P_basic'{delivery_mode = 2}; props(P) -> #'P_basic'{priority = P, delivery_mode = 2}. @@ -494,19 +532,21 @@ get_all(Ch, Q, Ack, Ps) -> DTags. get_partial(Ch, Q, Ack, Ps) -> - [get_ok(Ch, Q, Ack, P) || P <- Ps]. + [get_ok(Ch, Q, Ack, priority2bin(P)) || P <- Ps]. get_empty(Ch, Q) -> #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}). -get_ok(Ch, Q, Ack, P) -> - PBin = priority2bin(P), +get_ok(Ch, Q, Ack, PBin) -> {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} = amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = Ack =:= no_ack}), PBin = PBin2, maybe_ack(Ch, Ack, DTag). +get_payload(Ch, Q, Ack, Ps) -> + [get_ok(Ch, Q, Ack, P) || P <- Ps]. + get_without_ack(Ch, Q) -> {#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false}). |
