summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-09-12 13:34:16 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-10-02 14:34:47 +0100
commitbf531fd017cbec756ee979299723adce76828c96 (patch)
treee81ffa4d3656f15826f95983ab2484ed494797b5 /test
parent7e64d485e196c1791df6eff07940a6c5f368a7a0 (diff)
downloadrabbitmq-server-git-bf531fd017cbec756ee979299723adce76828c96.tar.gz
Add configurable queue overflow strategy
If a queue is to be overflowed by a delivery it can reject the delivery or drop messages from the head. To reject delivery overflow can be configured to `reject_publish`, to drop head it's `drop_head` (default setting). Messages which will be rejected should still confirm being routed, so mandatory expectations are not accumulated on the channel side. Slave nodes will only confirm if a message was published or discarded. To drop confirms from slaves, all confirms for a message are cleared when the message is rejected. When promoting a new master, left-behind deliveries should be rejected if the queue is full, just like normal deliveries. Fixes #995 [#151294447]
Diffstat (limited to 'test')
-rw-r--r--test/clustering_management_SUITE.erl2
-rw-r--r--test/priority_queue_SUITE.erl15
-rw-r--r--test/rabbit_ha_test_producer.erl53
-rw-r--r--test/simple_ha_SUITE.erl39
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl185
5 files changed, 275 insertions, 19 deletions
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 2a23c4997e..8bf8a9a8b8 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -704,7 +704,7 @@ assert_failure(Fun) ->
{error_string, Reason} -> Reason;
{badrpc, {'EXIT', Reason}} -> Reason;
{badrpc_multi, Reason, _Nodes} -> Reason;
- Other -> exit({expected_failure, Other})
+ Other -> error({expected_failure, Other})
end.
stop_app(Node) ->
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index eecd59b879..eb781ffedf 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -32,6 +32,7 @@ groups() ->
{cluster_size_2, [], [
ackfold,
drop,
+ reject,
dropwhile_fetchwhile,
info_head_message_timestamp,
matching,
@@ -306,6 +307,20 @@ drop(Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.
+reject(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Q = <<"reject-queue">>,
+ declare(Ch, Q, [{<<"x-max-length">>, long, 4},
+ {<<"x-overflow">>, longstr, <<"reject_publish">>}
+ | arguments(3)]),
+ publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
+ %% First 4 messages are published, all others are discarded.
+ get_all(Ch, Q, do_ack, [3, 2, 1, 1]),
+ delete(Ch, Q),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
purge(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"purge-queue">>,
diff --git a/test/rabbit_ha_test_producer.erl b/test/rabbit_ha_test_producer.erl
index fe2d15ed9a..b6c301cd0c 100644
--- a/test/rabbit_ha_test_producer.erl
+++ b/test/rabbit_ha_test_producer.erl
@@ -15,7 +15,7 @@
%%
-module(rabbit_ha_test_producer).
--export([await_response/1, start/5, create/5]).
+-export([await_response/1, start/6, create/5, create/6]).
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -28,13 +28,20 @@ await_response(ProducerPid) ->
end.
create(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
+ create(Channel, Queue, TestPid, Confirm, MsgsToSend, acks).
+
+create(Channel, Queue, TestPid, Confirm, MsgsToSend, Mode) ->
+ AckNackMsgs = case Mode of
+ acks -> {ok, {error, received_nacks}};
+ nacks -> {{error, received_acks}, ok}
+ end,
ProducerPid = spawn_link(?MODULE, start, [Channel, Queue, TestPid,
- Confirm, MsgsToSend]),
+ Confirm, MsgsToSend, AckNackMsgs]),
receive
{ProducerPid, started} -> ProducerPid
end.
-start(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
+start(Channel, Queue, TestPid, Confirm, MsgsToSend, AckNackMsgs) ->
ConfirmState =
case Confirm of
true -> amqp_channel:register_confirm_handler(Channel, self()),
@@ -45,25 +52,27 @@ start(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
end,
TestPid ! {self(), started},
error_logger:info_msg("publishing ~w msgs on ~p~n", [MsgsToSend, Channel]),
- producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend).
+ producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs).
%%
%% Private API
%%
-producer(_Channel, _Queue, TestPid, none, 0) ->
+producer(_Channel, _Queue, TestPid, none, 0, _AckNackMsgs) ->
TestPid ! {self(), ok};
-producer(Channel, _Queue, TestPid, ConfirmState, 0) ->
+producer(Channel, _Queue, TestPid, ConfirmState, 0, {AckMsg, NackMsg}) ->
error_logger:info_msg("awaiting confirms on channel ~p~n", [Channel]),
- Msg = case drain_confirms(no_nacks, ConfirmState) of
- no_nacks -> ok;
- nacks -> {error, received_nacks};
+ Msg = case drain_confirms(none, ConfirmState) of
+ %% No acks or nacks
+ acks -> AckMsg;
+ nacks -> NackMsg;
+ mix -> {error, received_both_acks_and_nacks};
{Nacks, CS} -> {error, {missing_confirms, Nacks,
lists:sort(gb_trees:keys(CS))}}
end,
TestPid ! {self(), Msg};
-producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) ->
+producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs) ->
Method = #'basic.publish'{exchange = <<"">>,
routing_key = Queue,
mandatory = false,
@@ -76,7 +85,7 @@ producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) ->
payload = list_to_binary(
integer_to_list(MsgsToSend))}),
- producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1).
+ producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1, AckNackMsgs).
maybe_record_confirm(none, _, _) ->
none;
@@ -84,22 +93,34 @@ maybe_record_confirm(ConfirmState, Channel, MsgsToSend) ->
SeqNo = amqp_channel:next_publish_seqno(Channel),
gb_trees:insert(SeqNo, MsgsToSend, ConfirmState).
-drain_confirms(Nacks, ConfirmState) ->
+drain_confirms(Collected, ConfirmState) ->
case gb_trees:is_empty(ConfirmState) of
- true -> Nacks;
+ true -> Collected;
false -> receive
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = IsMulti} ->
- drain_confirms(Nacks,
+ Collected1 = case Collected of
+ none -> acks;
+ acks -> acks;
+ nacks -> mix;
+ mix -> mix
+ end,
+ drain_confirms(Collected1,
delete_confirms(DeliveryTag, IsMulti,
ConfirmState));
#'basic.nack'{delivery_tag = DeliveryTag,
multiple = IsMulti} ->
- drain_confirms(nacks,
+ Collected1 = case Collected of
+ none -> nacks;
+ nacks -> nacks;
+ acks -> mix;
+ mix -> mix
+ end,
+ drain_confirms(Collected1,
delete_confirms(DeliveryTag, IsMulti,
ConfirmState))
after
- 60000 -> {Nacks, ConfirmState}
+ 60000 -> {Collected, ConfirmState}
end
end.
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index a0499b9d59..297b53b713 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -44,7 +44,8 @@ groups() ->
auto_resume_no_ccn_client,
confirms_survive_stop,
confirms_survive_sigkill,
- confirms_survive_policy
+ confirms_survive_policy,
+ rejects_survive_stop
]}
].
@@ -156,6 +157,10 @@ confirms_survive_stop(Cf) -> confirms_survive(Cf, fun stop/2).
confirms_survive_sigkill(Cf) -> confirms_survive(Cf, fun sigkill/2).
confirms_survive_policy(Cf) -> confirms_survive(Cf, fun policy/2).
+rejects_survive_stop(Cf) -> rejects_survive(Cf, fun stop/2).
+rejects_survive_sigkill(Cf) -> rejects_survive(Cf, fun sigkill/2).
+rejects_survive_policy(Cf) -> rejects_survive(Cf, fun policy/2).
+
%%----------------------------------------------------------------------------
consume_survives(Config, DeathFun, CancelOnFailover) ->
@@ -213,6 +218,38 @@ confirms_survive(Config, DeathFun) ->
rabbit_ha_test_producer:await_response(ProducerPid),
ok.
+rejects_survive(Config, DeathFun) ->
+ [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000),
+ Node1Channel = rabbit_ct_client_helpers:open_channel(Config, A),
+ Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B),
+
+ %% declare the queue on the master, mirrored to the two slaves
+ Queue = <<"test_rejects">>,
+ amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue,
+ auto_delete = false,
+ durable = true,
+ arguments = [{<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject_publish">>}]}),
+ Payload = <<"there can be only one">>,
+ amqp_channel:call(Node1Channel,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{payload = Payload}),
+
+ %% send a bunch of messages from the producer. Tolerating nacks.
+ ProducerPid = rabbit_ha_test_producer:create(Node2Channel, Queue,
+ self(), true, Msgs, nacks),
+ DeathFun(Config, A),
+ rabbit_ha_test_producer:await_response(ProducerPid),
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}),
+ %% There is only one message.
+ #'basic.get_empty'{} = amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}),
+ ok.
+
+
+
stop(Config, Node) ->
rabbit_ct_broker_helpers:stop_node_after(Config, Node, 50).
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index dd8cd48b5a..951cf6f213 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -33,6 +33,9 @@ all() ->
].
groups() ->
+ MaxLengthTests = [max_length_drop_head,
+ max_length_reject_confirm,
+ max_length_drop_publish],
[
{parallel_tests, [parallel], [
amqp_connection_refusal,
@@ -48,7 +51,10 @@ groups() ->
]},
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
- topic_matching
+ topic_matching,
+ {queue_max_length, [], [
+ {max_length_simple, [], MaxLengthTests},
+ {max_length_mirrored, [], MaxLengthTests}]}
]}
].
@@ -63,6 +69,11 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(max_length_mirrored, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
@@ -91,6 +102,18 @@ setup_file_handle_cache1() ->
ok = file_handle_cache:set_limit(10),
ok.
+end_per_group(max_length_mirrored, Config) ->
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"^max_length.*queue">>),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, false}]),
+ Config1;
+end_per_group(queue_max_length, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}),
+ rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
+ Config;
end_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
@@ -1018,6 +1041,161 @@ set_vm_memory_high_watermark_command1(_Config) ->
)
end.
+max_length_drop_head(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = <<"max_length_drop_head_queue">>,
+ QNameDefault = <<"max_length_default_drop_head_queue">>,
+ QNameBytes = <<"max_length_bytes_drop_head_queue">>,
+ QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>,
+
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop_head">>}],
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefault}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefaultBytes}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}),
+
+ check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3),
+ check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3).
+
+max_length_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = <<"max_length_reject_queue">>,
+ QNameBytes = <<"max_length_bytes_reject_queue">>,
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}],
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+
+ check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3),
+ check_max_length_rejects(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = <<"max_length_drop_publish_queue">>,
+ QNameBytes = <<"max_length_bytes_drop_publish_queue">>,
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}],
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ %% If confirms are not enabled, publishes will still be dropped in reject_publish mode.
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+
+ check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
+
+check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Message 2 is dropped, message 1 stays
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Messages 2 and 3 are dropped, message 1 stays
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ flush(),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% First message can be enqueued and acks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ receive #'basic.ack'{} -> ok
+ after 1000 -> error(expected_ack)
+ end,
+
+ %% The message cannot be enqueued and nacks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ receive #'basic.nack'{} -> ok
+ after 1000 -> error(expected_nack)
+ end,
+
+ %% The message cannot be enqueued and nacks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ receive #'basic.nack'{} -> ok
+ after 1000 -> error(expected_nack)
+ end,
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Now we can publish message 2.
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ receive #'basic.ack'{} -> ok
+ after 1000 -> error(expected_ack)
+ end,
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Message 1 is replaced by message 2
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Messages 1 and 2 are replaced
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+sync_mirrors(QName, Config) ->
+ case ?config(is_mirrored, Config) of
+ true ->
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]);
+ _ -> ok
+ end.
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------
@@ -1031,3 +1209,8 @@ expand_options(As, Bs) ->
false -> [A | R]
end
end, Bs, As).
+
+flush() ->
+ receive _ -> flush()
+ after 10 -> ok
+ end. \ No newline at end of file