summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl54
-rw-r--r--test/rabbit_fifo_SUITE.erl117
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl21
3 files changed, 166 insertions, 26 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index be3b46fbfb..ccb1e1f4d8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -113,6 +113,7 @@ all_tests() ->
subscribe_redelivery_count,
message_bytes_metrics,
queue_length_limit_drop_head,
+ queue_length_limit_reject_publish,
subscribe_redelivery_limit,
subscribe_redelivery_policy,
subscribe_redelivery_limit_with_dead_letter,
@@ -612,14 +613,16 @@ publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
ct:pal("waiting for confirms from ~s", [QName]),
- ok = receive
- #'basic.ack'{} -> ok;
- #'basic.nack'{} -> fail
- after 2500 ->
- exit(confirm_timeout)
- end,
- ct:pal("CONFIRMED! ~s", [QName]),
- ok.
+ receive
+ #'basic.ack'{} ->
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok;
+ #'basic.nack'{} ->
+ ct:pal("NOT CONFIRMED! ~s", [QName]),
+ fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end.
publish_and_restart(Config) ->
%% Test the node restart with both types of queues (quorum and classic) to
@@ -1244,13 +1247,13 @@ simple_confirm_availability_on_leader_change(Config) ->
%% open a channel to another node
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
%% stop the node hosting the leader
ok = rabbit_ct_broker_helpers:stop_node(Config, Node2),
%% this should not fail as the channel should detect the new leader and
%% resend to that
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
ok.
@@ -1270,7 +1273,7 @@ confirm_availability_on_leader_change(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ConfirmLoop = fun Loop() ->
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
receive {done, P} ->
P ! done,
ok
@@ -2020,6 +2023,35 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).
+queue_length_limit_reject_publish(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ok = publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
+ %% give the channel some time to process the async reject_publish notification
+ %% now that we are over the limit it should start failing
+ wait_for_messages_total(Servers, RaName, 2),
+ fail = publish_confirm(Ch, QQ),
+ %% remove all messages
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ %% publish should be allowed again now
+ ok = publish_confirm(Ch, QQ),
+ ok.
+
queue_length_in_memory_limit_basic_get(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index e3dfb29e7d..ceab563865 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) ->
?ASSERT_EFF(EfxPat, true, Effects)).
-define(ASSERT_EFF(EfxPat, Guard, Effects),
- ?assert(lists:any(fun (EfxPat) when Guard -> true;
- (_) -> false
- end, Effects))).
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(ASSERT_NO_EFF(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Guard, Effects),
+ ?assert(not lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(assertNoEffect(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
test_init(Name) ->
init(#{name => Name,
@@ -1258,6 +1263,99 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+
+register_enqueuer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ %% register another enqueuer shoudl be ok
+ Pid2 = test_util:fake_pid(node()),
+ {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2),
+
+ {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3),
+ {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4),
+ % ct:pal("Efx ~p", [Efx]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx),
+
+ %% this time, registry should return reject_publish
+ {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer(
+ test_util:fake_pid(node())), State5),
+ ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)),
+
+
+ %% remove two messages this should make the queue fall below the 0.8 limit
+ {State7, {dequeue, _, _}, _Efx7} =
+ apply(meta(7),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6),
+ ct:pal("Efx7 ~p", [_Efx7]),
+ {State8, {dequeue, _, _}, Efx8} =
+ apply(meta(8),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7),
+ ct:pal("Efx8 ~p", [Efx8]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8),
+ {_State9, {dequeue, _, _}, Efx9} =
+ apply(meta(9),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9),
+ ok.
+
+reject_publish_purge_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1),
+ ok.
+
+reject_publish_applied_after_limit_test(_) ->
+ InitConf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8))
+ },
+ State0 = init(InitConf),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ %% apply new config
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish
+ },
+ {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1),
+ Pid2 = test_util:fake_pid(node()),
+ {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5),
+ ok.
+
purge_nodes_test(_) ->
Node = purged@node,
ThisNode = node(),
@@ -1412,6 +1510,7 @@ machine_version_test(_) ->
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).
+make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 23522e71f9..dd2c7154d0 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -479,15 +479,17 @@ test_run_log(_Config) ->
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
- InMemoryBytes},
- frequency([{10, {0, 0, false, 0, 0, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer,
+ DeliveryLimit, InMemoryLength, InMemoryBytes,
+ Overflow},
+ frequency([{10, {0, 0, false, 0, 0, 0, drop_head}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
oneof([range(1, 3), undefined]),
oneof([range(1, 10), undefined]),
- oneof([range(1, 1000), undefined])
+ oneof([range(1, 1000), undefined]),
+ oneof([drop_head, reject_publish])
}}]),
begin
Config = config(?FUNCTION_NAME,
@@ -496,7 +498,8 @@ snapshots(_Config) ->
SingleActiveConsumer,
DeliveryLimit,
InMemoryLength,
- InMemoryBytes),
+ InMemoryBytes,
+ Overflow),
?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)),
collect({log_size, length(O)},
snapshots_prop(Config, O)))
@@ -681,6 +684,11 @@ max_length(_Config) ->
config(Name, Length, Bytes, SingleActive, DeliveryLimit,
InMemoryLength, InMemoryBytes) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, drop_head).
+
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, Overflow) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
@@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit,
single_active_consumer_on => SingleActive,
delivery_limit => map_max(DeliveryLimit),
max_in_memory_length => map_max(InMemoryLength),
- max_in_memory_bytes => map_max(InMemoryBytes)}.
+ max_in_memory_bytes => map_max(InMemoryBytes),
+ overflow_strategy => Overflow}.
map_max(0) -> undefined;
map_max(N) -> N.