diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-06-14 22:50:36 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-06-14 22:50:36 +0300 |
| commit | d8d72f12867599a1687b32a99a50fd4ce8ddbe1b (patch) | |
| tree | 94c110ed4e6777e42ff9562029381ebaad8be220 | |
| parent | 03d015c3bd2c86dc6df8413b58e2d780c783af08 (diff) | |
| parent | 55acdb57727cc2cda19972269693d4c9133d8810 (diff) | |
| download | rabbitmq-server-git-d8d72f12867599a1687b32a99a50fd4ce8ddbe1b.tar.gz | |
Merge branch 'rabbitmq-server-687' into stable
| -rw-r--r-- | src/rabbit_priority_queue.erl | 13 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 46 |
2 files changed, 52 insertions, 7 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index a3bfb5cdfa..ae8a38daf0 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -591,10 +591,15 @@ partition_publish_delivered_batch(Publishes, MaxP) -> Publishes, fun ({Msg, _}) -> Msg end, MaxP). partition_publishes(Publishes, ExtractMsg, MaxP) -> - lists:foldl(fun (Pub, Dict) -> - Msg = ExtractMsg(Pub), - rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) - end, orddict:new(), Publishes). + Partitioned = + lists:foldl(fun (Pub, Dict) -> + Msg = ExtractMsg(Pub), + rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) + end, orddict:new(), Publishes), + orddict:map(fun (_P, RevPubs) -> + lists:reverse(RevPubs) + end, Partitioned). + priority_bq(Priority, [{MaxP, _} | _] = BQSs) -> bq_fetch(priority(Priority, MaxP), BQSs). diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 5df5686090..3e94e5bf02 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -39,6 +39,7 @@ groups() -> mirror_queue_sync, mirror_queue_sync_priority_above_max, mirror_queue_sync_priority_above_max_pending_ack, + mirror_queue_sync_order, purge, requeue, resume, @@ -432,6 +433,33 @@ mirror_queue_auto_ack(Config) -> delete(Ch, Q), passed. +mirror_queue_sync_order(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, B), + Q = <<"mirror_queue_sync_order-queue">>, + declare(Ch, Q, 3), + publish_payload(Ch, Q, [{1, <<"msg1">>}, {2, <<"msg2">>}, + {2, <<"msg3">>}, {2, <<"msg4">>}, + {3, <<"msg5">>}]), + rabbit_ct_client_helpers:close_channel(Ch), + + %% Add and sync slave + ok = rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"^mirror_queue_sync_order-queue$">>, <<"all">>), + rabbit_ct_broker_helpers:control_action(sync_queue, A, + [binary_to_list(Q)], [{"-p", "/"}]), + wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + + %% Stop the master + rabbit_ct_broker_helpers:stop_node(Config, A), + + get_payload(Ch2, Q, do_ack, [<<"msg5">>, <<"msg2">>, <<"msg3">>, + <<"msg4">>, <<"msg1">>]), + + delete(Ch2, Q), + passed. %%---------------------------------------------------------------------------- open(Config) -> @@ -454,6 +482,11 @@ publish(Ch, Q, Ps) -> [publish1(Ch, Q, P) || P <- Ps], amqp_channel:wait_for_confirms(Ch). +publish_payload(Ch, Q, PPds) -> + amqp_channel:call(Ch, #'confirm.select'{}), + [publish1(Ch, Q, P, Pd) || {P, Pd} <- PPds], + amqp_channel:wait_for_confirms(Ch). + publish_many(_Ch, _Q, 0) -> ok; publish_many( Ch, Q, N) -> publish1(Ch, Q, random:uniform(5)), publish_many(Ch, Q, N - 1). @@ -463,6 +496,11 @@ publish1(Ch, Q, P) -> #amqp_msg{props = props(P), payload = priority2bin(P)}). +publish1(Ch, Q, P, Pd) -> + amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q}, + #amqp_msg{props = props(P), + payload = Pd}). + props(undefined) -> #'P_basic'{delivery_mode = 2}; props(P) -> #'P_basic'{priority = P, delivery_mode = 2}. @@ -494,19 +532,21 @@ get_all(Ch, Q, Ack, Ps) -> DTags. get_partial(Ch, Q, Ack, Ps) -> - [get_ok(Ch, Q, Ack, P) || P <- Ps]. + [get_ok(Ch, Q, Ack, priority2bin(P)) || P <- Ps]. get_empty(Ch, Q) -> #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}). -get_ok(Ch, Q, Ack, P) -> - PBin = priority2bin(P), +get_ok(Ch, Q, Ack, PBin) -> {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} = amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = Ack =:= no_ack}), PBin = PBin2, maybe_ack(Ch, Ack, DTag). +get_payload(Ch, Q, Ack, Ps) -> + [get_ok(Ch, Q, Ack, P) || P <- Ps]. + get_without_ack(Ch, Q) -> {#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false}). |
