diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-09-12 13:34:16 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-10-02 14:34:47 +0100 |
| commit | bf531fd017cbec756ee979299723adce76828c96 (patch) | |
| tree | e81ffa4d3656f15826f95983ab2484ed494797b5 /test | |
| parent | 7e64d485e196c1791df6eff07940a6c5f368a7a0 (diff) | |
| download | rabbitmq-server-git-bf531fd017cbec756ee979299723adce76828c96.tar.gz | |
Add configurable queue overflow strategy
If a queue is to be overflowed by a delivery it can reject
the delivery or drop messages from the head.
To reject delivery overflow can be configured to `reject_publish`,
to drop head it's `drop_head` (default setting).
Messages which will be rejected should still confirm being routed,
so mandatory expectations are not accumulated on the channel side.
Slave nodes will only confirm if a message was published or discarded.
To drop confirms from slaves, all confirms for a message are cleared
when the message is rejected.
When promoting a new master, left-behind deliveries should
be rejected if the queue is full, just like normal deliveries.
Fixes #995
[#151294447]
Diffstat (limited to 'test')
| -rw-r--r-- | test/clustering_management_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 15 | ||||
| -rw-r--r-- | test/rabbit_ha_test_producer.erl | 53 | ||||
| -rw-r--r-- | test/simple_ha_SUITE.erl | 39 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 185 |
5 files changed, 275 insertions, 19 deletions
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 2a23c4997e..8bf8a9a8b8 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -704,7 +704,7 @@ assert_failure(Fun) -> {error_string, Reason} -> Reason; {badrpc, {'EXIT', Reason}} -> Reason; {badrpc_multi, Reason, _Nodes} -> Reason; - Other -> exit({expected_failure, Other}) + Other -> error({expected_failure, Other}) end. stop_app(Node) -> diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index eecd59b879..eb781ffedf 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -32,6 +32,7 @@ groups() -> {cluster_size_2, [], [ ackfold, drop, + reject, dropwhile_fetchwhile, info_head_message_timestamp, matching, @@ -306,6 +307,20 @@ drop(Config) -> rabbit_ct_client_helpers:close_connection(Conn), passed. +reject(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Q = <<"reject-queue">>, + declare(Ch, Q, [{<<"x-max-length">>, long, 4}, + {<<"x-overflow">>, longstr, <<"reject_publish">>} + | arguments(3)]), + publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), + %% First 4 messages are published, all others are discarded. + get_all(Ch, Q, do_ack, [3, 2, 1, 1]), + delete(Ch, Q), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + purge(Config) -> {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"purge-queue">>, diff --git a/test/rabbit_ha_test_producer.erl b/test/rabbit_ha_test_producer.erl index fe2d15ed9a..b6c301cd0c 100644 --- a/test/rabbit_ha_test_producer.erl +++ b/test/rabbit_ha_test_producer.erl @@ -15,7 +15,7 @@ %% -module(rabbit_ha_test_producer). --export([await_response/1, start/5, create/5]). +-export([await_response/1, start/6, create/5, create/6]). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -28,13 +28,20 @@ await_response(ProducerPid) -> end. create(Channel, Queue, TestPid, Confirm, MsgsToSend) -> + create(Channel, Queue, TestPid, Confirm, MsgsToSend, acks). + +create(Channel, Queue, TestPid, Confirm, MsgsToSend, Mode) -> + AckNackMsgs = case Mode of + acks -> {ok, {error, received_nacks}}; + nacks -> {{error, received_acks}, ok} + end, ProducerPid = spawn_link(?MODULE, start, [Channel, Queue, TestPid, - Confirm, MsgsToSend]), + Confirm, MsgsToSend, AckNackMsgs]), receive {ProducerPid, started} -> ProducerPid end. -start(Channel, Queue, TestPid, Confirm, MsgsToSend) -> +start(Channel, Queue, TestPid, Confirm, MsgsToSend, AckNackMsgs) -> ConfirmState = case Confirm of true -> amqp_channel:register_confirm_handler(Channel, self()), @@ -45,25 +52,27 @@ start(Channel, Queue, TestPid, Confirm, MsgsToSend) -> end, TestPid ! {self(), started}, error_logger:info_msg("publishing ~w msgs on ~p~n", [MsgsToSend, Channel]), - producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend). + producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs). %% %% Private API %% -producer(_Channel, _Queue, TestPid, none, 0) -> +producer(_Channel, _Queue, TestPid, none, 0, _AckNackMsgs) -> TestPid ! {self(), ok}; -producer(Channel, _Queue, TestPid, ConfirmState, 0) -> +producer(Channel, _Queue, TestPid, ConfirmState, 0, {AckMsg, NackMsg}) -> error_logger:info_msg("awaiting confirms on channel ~p~n", [Channel]), - Msg = case drain_confirms(no_nacks, ConfirmState) of - no_nacks -> ok; - nacks -> {error, received_nacks}; + Msg = case drain_confirms(none, ConfirmState) of + %% No acks or nacks + acks -> AckMsg; + nacks -> NackMsg; + mix -> {error, received_both_acks_and_nacks}; {Nacks, CS} -> {error, {missing_confirms, Nacks, lists:sort(gb_trees:keys(CS))}} end, TestPid ! {self(), Msg}; -producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) -> +producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs) -> Method = #'basic.publish'{exchange = <<"">>, routing_key = Queue, mandatory = false, @@ -76,7 +85,7 @@ producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) -> payload = list_to_binary( integer_to_list(MsgsToSend))}), - producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1). + producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1, AckNackMsgs). maybe_record_confirm(none, _, _) -> none; @@ -84,22 +93,34 @@ maybe_record_confirm(ConfirmState, Channel, MsgsToSend) -> SeqNo = amqp_channel:next_publish_seqno(Channel), gb_trees:insert(SeqNo, MsgsToSend, ConfirmState). -drain_confirms(Nacks, ConfirmState) -> +drain_confirms(Collected, ConfirmState) -> case gb_trees:is_empty(ConfirmState) of - true -> Nacks; + true -> Collected; false -> receive #'basic.ack'{delivery_tag = DeliveryTag, multiple = IsMulti} -> - drain_confirms(Nacks, + Collected1 = case Collected of + none -> acks; + acks -> acks; + nacks -> mix; + mix -> mix + end, + drain_confirms(Collected1, delete_confirms(DeliveryTag, IsMulti, ConfirmState)); #'basic.nack'{delivery_tag = DeliveryTag, multiple = IsMulti} -> - drain_confirms(nacks, + Collected1 = case Collected of + none -> nacks; + nacks -> nacks; + acks -> mix; + mix -> mix + end, + drain_confirms(Collected1, delete_confirms(DeliveryTag, IsMulti, ConfirmState)) after - 60000 -> {Nacks, ConfirmState} + 60000 -> {Collected, ConfirmState} end end. diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index a0499b9d59..297b53b713 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -44,7 +44,8 @@ groups() -> auto_resume_no_ccn_client, confirms_survive_stop, confirms_survive_sigkill, - confirms_survive_policy + confirms_survive_policy, + rejects_survive_stop ]} ]. @@ -156,6 +157,10 @@ confirms_survive_stop(Cf) -> confirms_survive(Cf, fun stop/2). confirms_survive_sigkill(Cf) -> confirms_survive(Cf, fun sigkill/2). confirms_survive_policy(Cf) -> confirms_survive(Cf, fun policy/2). +rejects_survive_stop(Cf) -> rejecets_survive(Cf, fun stop/2). +rejects_survive_sigkill(Cf) -> rejecets_survive(Cf, fun sigkill/2). +rejects_survive_policy(Cf) -> rejecets_survive(Cf, fun policy/2). + %%---------------------------------------------------------------------------- consume_survives(Config, DeathFun, CancelOnFailover) -> @@ -213,6 +218,38 @@ confirms_survive(Config, DeathFun) -> rabbit_ha_test_producer:await_response(ProducerPid), ok. +rejecets_survive(Config, DeathFun) -> + [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000), + Node1Channel = rabbit_ct_client_helpers:open_channel(Config, A), + Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B), + + %% declare the queue on the master, mirrored to the two slaves + Queue = <<"test_rejects">>, + amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue, + auto_delete = false, + durable = true, + arguments = [{<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject_publish">>}]}), + Payload = <<"there can be only one">>, + amqp_channel:call(Node1Channel, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{payload = Payload}), + + %% send a bunch of messages from the producer. Tolerating nacks. + ProducerPid = rabbit_ha_test_producer:create(Node2Channel, Queue, + self(), true, Msgs, nacks), + DeathFun(Config, A), + rabbit_ha_test_producer:await_response(ProducerPid), + + {#'basic.get_ok'{}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}), + %% There is only one message. + #'basic.get_empty'{} = amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}), + ok. + + + stop(Config, Node) -> rabbit_ct_broker_helpers:stop_node_after(Config, Node, 50). diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index dd8cd48b5a..951cf6f213 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -33,6 +33,9 @@ all() -> ]. groups() -> + MaxLengthTests = [max_length_drop_head, + max_length_reject_confirm, + max_length_drop_publish], [ {parallel_tests, [parallel], [ amqp_connection_refusal, @@ -48,7 +51,10 @@ groups() -> ]}, set_disk_free_limit_command, set_vm_memory_high_watermark_command, - topic_matching + topic_matching, + {queue_max_length, [], [ + {max_length_simple, [], MaxLengthTests}, + {max_length_mirrored, [], MaxLengthTests}]} ]} ]. @@ -63,6 +69,11 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(max_length_mirrored, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]), + rabbit_ct_helpers:run_steps(Config1, []); init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> @@ -91,6 +102,18 @@ setup_file_handle_cache1() -> ok = file_handle_cache:set_limit(10), ok. +end_per_group(max_length_mirrored, Config) -> + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"^max_length.*queue">>), + Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, false}]), + Config1; +end_per_group(queue_max_length, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}), + rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), + Config; end_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> @@ -1018,6 +1041,161 @@ set_vm_memory_high_watermark_command1(_Config) -> ) end. +max_length_drop_head(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = <<"max_length_drop_head_queue">>, + QNameDefault = <<"max_length_default_drop_head_queue">>, + QNameBytes = <<"max_length_bytes_drop_head_queue">>, + QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>, + + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop_head">>}], + amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefault}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefaultBytes}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}), + + check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>), + + %% 80 bytes payload + Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, + Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, + Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, + check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3), + check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3). + +max_length_reject_confirm(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = <<"max_length_reject_queue">>, + QNameBytes = <<"max_length_bytes_reject_queue">>, + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}], + amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + + %% 80 bytes payload + Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, + Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, + Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, + + check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3), + check_max_length_rejects(Config, QNameBytes, Ch, Payload1, Payload2, Payload3). + +max_length_drop_publish(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = <<"max_length_drop_publish_queue">>, + QNameBytes = <<"max_length_bytes_drop_publish_queue">>, + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}], + amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + %% If confirms are not enable, publishes will still be dropped in reject_publish mode. + check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + + %% 80 bytes payload + Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, + Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, + Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, + + check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3). + +check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) -> + sync_mirrors(QName, Config), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% A single message is published and consumed + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Message 2 is dropped, message 1 stays + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Messages 2 and 3 are dropped, message 1 stays + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) -> + sync_mirrors(QName, Config), + amqp_channel:register_confirm_handler(Ch, self()), + flush(), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% First message can be enqueued and acks + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + receive #'basic.ack'{} -> ok + after 1000 -> error(expected_ack) + end, + + %% The message cannot be enqueued and nacks + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + receive #'basic.nack'{} -> ok + after 1000 -> error(expected_nack) + end, + + %% The message cannot be enqueued and nacks + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + receive #'basic.nack'{} -> ok + after 1000 -> error(expected_nack) + end, + + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Now we can publish message 2. + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + receive #'basic.ack'{} -> ok + after 1000 -> error(expected_ack) + end, + + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> + sync_mirrors(QName, Config), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% A single message is published and consumed + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Message 1 is replaced by message 2 + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Messages 1 and 2 are replaced + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +sync_mirrors(QName, Config) -> + case ?config(is_mirrored, Config) of + true -> + rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]); + _ -> ok + end. + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- @@ -1031,3 +1209,8 @@ expand_options(As, Bs) -> false -> [A | R] end end, Bs, As). + +flush() -> + receive _ -> flush() + after 10 -> ok + end.
\ No newline at end of file |
