diff options
-rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 71 |
1 files changed, 25 insertions, 46 deletions
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 2a5e04c870..45a889cb20 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -248,30 +248,6 @@ variable_queue_init(Q, Recover) -> false -> new end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). -publish_and_confirm(Q, Payload, Count) -> - Seqs = lists:seq(1, Count), - [begin - Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, - Payload), - Delivery = #delivery{mandatory = false, sender = self(), - confirm = true, message = Msg, msg_seq_no = Seq, - flow = noflow}, - _QPids = rabbit_amqqueue:deliver([Q], Delivery) - end || Seq <- Seqs], - wait_for_confirms(gb_sets:from_list(Seqs)). - -wait_for_confirms(Unconfirmed) -> - case gb_sets:is_empty(Unconfirmed) of - true -> ok; - false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> - wait_for_confirms( - rabbit_misc:gb_sets_difference( - Unconfirmed, gb_sets:from_list(Confirmed))) - after ?TIMEOUT -> exit(timeout_waiting_for_confirm) - end - end. - test_amqqueue(Durable) -> rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable). @@ -1207,21 +1183,26 @@ max_length_drop_publish_requeue(Config) -> check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) -> sync_mirrors(QName, Config), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), + {#'basic.get_ok'{delivery_tag = DeliveryTag}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Another message is published amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). @@ -1244,41 +1225,33 @@ max_length_bytes_drop_publish(Config) -> check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) -> sync_mirrors(QName, Config), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 2 is dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Messages 2 and 3 are dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), - wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). -wait_for_consensus(QName, Config) -> - case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of - {_, _, <<"quorum">>} -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8), - {ok, _, _} = ra:members({RaName, Server}); - _ -> - ok - end. - check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) -> sync_mirrors(QName, Config), amqp_channel:register_confirm_handler(Ch, self()), @@ -1314,17 +1287,23 @@ check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) -> check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> sync_mirrors(QName, Config), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 1 is replaced by message 2 amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -1332,7 +1311,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), - wait_for_consensus(QName, Config), + amqp_channel:wait_for_confirms(Ch, 5000), {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). |