diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 54 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 117 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 21 |
3 files changed, 166 insertions, 26 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index be3b46fbfb..ccb1e1f4d8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -113,6 +113,7 @@ all_tests() -> subscribe_redelivery_count, message_bytes_metrics, queue_length_limit_drop_head, + queue_length_limit_reject_publish, subscribe_redelivery_limit, subscribe_redelivery_policy, subscribe_redelivery_limit_with_dead_letter, @@ -612,14 +613,16 @@ publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), ct:pal("waiting for confirms from ~s", [QName]), - ok = receive - #'basic.ack'{} -> ok; - #'basic.nack'{} -> fail - after 2500 -> - exit(confirm_timeout) - end, - ct:pal("CONFIRMED! ~s", [QName]), - ok. + receive + #'basic.ack'{} -> + ct:pal("CONFIRMED! ~s", [QName]), + ok; + #'basic.nack'{} -> + ct:pal("NOT CONFIRMED! ~s", [QName]), + fail + after 2500 -> + exit(confirm_timeout) + end. publish_and_restart(Config) -> %% Test the node restart with both types of queues (quorum and classic) to @@ -1244,13 +1247,13 @@ simple_confirm_availability_on_leader_change(Config) -> %% open a channel to another node Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), %% stop the node hosting the leader ok = rabbit_ct_broker_helpers:stop_node(Config, Node2), %% this should not fail as the channel should detect the new leader and %% resend to that - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), ok = rabbit_ct_broker_helpers:start_node(Config, Node2), ok. @@ -1270,7 +1273,7 @@ confirm_availability_on_leader_change(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), ConfirmLoop = fun Loop() -> - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), receive {done, P} -> P ! done, ok @@ -2020,6 +2023,35 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). +queue_length_limit_reject_publish(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ok = publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), + %% give the channel some time to process the async reject_publish notification + %% now that we are over the limit it should start failing + wait_for_messages_total(Servers, RaName, 2), + fail = publish_confirm(Ch, QQ), + %% remove all messages + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + %% publish should be allowed again now + ok = publish_confirm(Ch, QQ), + ok. + queue_length_in_memory_limit_basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index e3dfb29e7d..ceab563865 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) -> ?ASSERT_EFF(EfxPat, true, Effects)). -define(ASSERT_EFF(EfxPat, Guard, Effects), - ?assert(lists:any(fun (EfxPat) when Guard -> true; - (_) -> false - end, Effects))). + ?assert(lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(ASSERT_NO_EFF(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +-define(ASSERT_NO_EFF(EfxPat, Guard, Effects), + ?assert(not lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(assertNoEffect(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). test_init(Name) -> init(#{name => Name, @@ -1258,6 +1263,99 @@ single_active_with_credited_test(_) -> State3#rabbit_fifo.waiting_consumers), ok. + +register_enqueuer_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + %% register another enqueuer shoudl be ok + Pid2 = test_util:fake_pid(node()), + {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2), + + {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3), + {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4), + % ct:pal("Efx ~p", [Efx]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx), + + %% this time, registry should return reject_publish + {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer( + test_util:fake_pid(node())), State5), + ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)), + + + %% remove two messages this should make the queue fall below the 0.8 limit + {State7, {dequeue, _, _}, _Efx7} = + apply(meta(7), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6), + ct:pal("Efx7 ~p", [_Efx7]), + {State8, {dequeue, _, _}, Efx8} = + apply(meta(8), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7), + ct:pal("Efx8 ~p", [Efx8]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8), + {_State9, {dequeue, _, _}, Efx9} = + apply(meta(9), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9), + ok. + +reject_publish_purge_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1), + ok. + +reject_publish_applied_after_limit_test(_) -> + InitConf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)) + }, + State0 = init(InitConf), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + %% apply new config + Conf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish + }, + {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1), + Pid2 = test_util:fake_pid(node()), + {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5), + ok. + purge_nodes_test(_) -> Node = purged@node, ThisNode = node(), @@ -1412,6 +1510,7 @@ machine_version_test(_) -> %% Utility init(Conf) -> rabbit_fifo:init(Conf). +make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid). apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State). init_aux(Conf) -> rabbit_fifo:init_aux(Conf). handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M). diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 23522e71f9..dd2c7154d0 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -479,15 +479,17 @@ test_run_log(_Config) -> snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes}, - frequency([{10, {0, 0, false, 0, 0, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, + DeliveryLimit, InMemoryLength, InMemoryBytes, + Overflow}, + frequency([{10, {0, 0, false, 0, 0, 0, drop_head}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), oneof([range(1, 3), undefined]), oneof([range(1, 10), undefined]), - oneof([range(1, 1000), undefined]) + oneof([range(1, 1000), undefined]), + oneof([drop_head, reject_publish]) }}]), begin Config = config(?FUNCTION_NAME, @@ -496,7 +498,8 @@ snapshots(_Config) -> SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes), + InMemoryBytes, + Overflow), ?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)), collect({log_size, length(O)}, snapshots_prop(Config, O))) @@ -681,6 +684,11 @@ max_length(_Config) -> config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, drop_head). + +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, Overflow) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), @@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit, single_active_consumer_on => SingleActive, delivery_limit => map_max(DeliveryLimit), max_in_memory_length => map_max(InMemoryLength), - max_in_memory_bytes => map_max(InMemoryBytes)}. + max_in_memory_bytes => map_max(InMemoryBytes), + overflow_strategy => Overflow}. map_max(0) -> undefined; map_max(N) -> N. |
