diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-07-10 10:20:25 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
| commit | c7fcee5a1f317be3f589fa739b9dbf1b6f7253ce (patch) | |
| tree | 61e6ddfc2504bbc0b9f5caeb7a656c68d593d076 /test | |
| parent | 6c7970e674772457f4f3cd8c66ca21b9ac5102a3 (diff) | |
| download | rabbitmq-server-git-c7fcee5a1f317be3f589fa739b9dbf1b6f7253ce.tar.gz | |
Implement reject_publish for QQs
The reject publish overflow strategy for quorum queues is an inexact
implementation that relies on the cooperation of publishing channels.
When a channel first wants to publish to a quorum queue it first issues
a synchonous register_enqueuer command which will return the current
queue overflow state as reject_publish if the queue is full.
The queue will also notify any active enqueuers when it reaches the
limit but will continue to accept any enqueues it receives after that.
Once the queue size goes below 80% of the limit(s) the queue will again
notify enqueuers that they can resume publishin inte the queue.
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 54 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 117 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 21 |
3 files changed, 166 insertions, 26 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index be3b46fbfb..ccb1e1f4d8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -113,6 +113,7 @@ all_tests() -> subscribe_redelivery_count, message_bytes_metrics, queue_length_limit_drop_head, + queue_length_limit_reject_publish, subscribe_redelivery_limit, subscribe_redelivery_policy, subscribe_redelivery_limit_with_dead_letter, @@ -612,14 +613,16 @@ publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), ct:pal("waiting for confirms from ~s", [QName]), - ok = receive - #'basic.ack'{} -> ok; - #'basic.nack'{} -> fail - after 2500 -> - exit(confirm_timeout) - end, - ct:pal("CONFIRMED! ~s", [QName]), - ok. + receive + #'basic.ack'{} -> + ct:pal("CONFIRMED! ~s", [QName]), + ok; + #'basic.nack'{} -> + ct:pal("NOT CONFIRMED! ~s", [QName]), + fail + after 2500 -> + exit(confirm_timeout) + end. publish_and_restart(Config) -> %% Test the node restart with both types of queues (quorum and classic) to @@ -1244,13 +1247,13 @@ simple_confirm_availability_on_leader_change(Config) -> %% open a channel to another node Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), %% stop the node hosting the leader ok = rabbit_ct_broker_helpers:stop_node(Config, Node2), %% this should not fail as the channel should detect the new leader and %% resend to that - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), ok = rabbit_ct_broker_helpers:start_node(Config, Node2), ok. @@ -1270,7 +1273,7 @@ confirm_availability_on_leader_change(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), ConfirmLoop = fun Loop() -> - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), receive {done, P} -> P ! done, ok @@ -2020,6 +2023,35 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). +queue_length_limit_reject_publish(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ok = publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), + %% give the channel some time to process the async reject_publish notification + %% now that we are over the limit it should start failing + wait_for_messages_total(Servers, RaName, 2), + fail = publish_confirm(Ch, QQ), + %% remove all messages + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + %% publish should be allowed again now + ok = publish_confirm(Ch, QQ), + ok. + queue_length_in_memory_limit_basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index e3dfb29e7d..ceab563865 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) -> ?ASSERT_EFF(EfxPat, true, Effects)). -define(ASSERT_EFF(EfxPat, Guard, Effects), - ?assert(lists:any(fun (EfxPat) when Guard -> true; - (_) -> false - end, Effects))). + ?assert(lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(ASSERT_NO_EFF(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +-define(ASSERT_NO_EFF(EfxPat, Guard, Effects), + ?assert(not lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(assertNoEffect(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). test_init(Name) -> init(#{name => Name, @@ -1258,6 +1263,99 @@ single_active_with_credited_test(_) -> State3#rabbit_fifo.waiting_consumers), ok. + +register_enqueuer_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + %% register another enqueuer shoudl be ok + Pid2 = test_util:fake_pid(node()), + {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2), + + {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3), + {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4), + % ct:pal("Efx ~p", [Efx]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx), + + %% this time, registry should return reject_publish + {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer( + test_util:fake_pid(node())), State5), + ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)), + + + %% remove two messages this should make the queue fall below the 0.8 limit + {State7, {dequeue, _, _}, _Efx7} = + apply(meta(7), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6), + ct:pal("Efx7 ~p", [_Efx7]), + {State8, {dequeue, _, _}, Efx8} = + apply(meta(8), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7), + ct:pal("Efx8 ~p", [Efx8]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8), + {_State9, {dequeue, _, _}, Efx9} = + apply(meta(9), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9), + ok. + +reject_publish_purge_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1), + ok. + +reject_publish_applied_after_limit_test(_) -> + InitConf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)) + }, + State0 = init(InitConf), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + %% apply new config + Conf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish + }, + {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1), + Pid2 = test_util:fake_pid(node()), + {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5), + ok. + purge_nodes_test(_) -> Node = purged@node, ThisNode = node(), @@ -1412,6 +1510,7 @@ machine_version_test(_) -> %% Utility init(Conf) -> rabbit_fifo:init(Conf). +make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid). apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State). init_aux(Conf) -> rabbit_fifo:init_aux(Conf). handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M). diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 23522e71f9..dd2c7154d0 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -479,15 +479,17 @@ test_run_log(_Config) -> snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes}, - frequency([{10, {0, 0, false, 0, 0, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, + DeliveryLimit, InMemoryLength, InMemoryBytes, + Overflow}, + frequency([{10, {0, 0, false, 0, 0, 0, drop_head}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), oneof([range(1, 3), undefined]), oneof([range(1, 10), undefined]), - oneof([range(1, 1000), undefined]) + oneof([range(1, 1000), undefined]), + oneof([drop_head, reject_publish]) }}]), begin Config = config(?FUNCTION_NAME, @@ -496,7 +498,8 @@ snapshots(_Config) -> SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes), + InMemoryBytes, + Overflow), ?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)), collect({log_size, length(O)}, snapshots_prop(Config, O))) @@ -681,6 +684,11 @@ max_length(_Config) -> config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, drop_head). + +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, Overflow) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), @@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit, single_active_consumer_on => SingleActive, delivery_limit => map_max(DeliveryLimit), max_in_memory_length => map_max(InMemoryLength), - max_in_memory_bytes => map_max(InMemoryBytes)}. + max_in_memory_bytes => map_max(InMemoryBytes), + overflow_strategy => Overflow}. map_max(0) -> undefined; map_max(N) -> N. |
