summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-01-15 16:38:36 +0000
committerkjnilsson <knilsson@pivotal.io>2019-01-24 14:44:27 +0000
commit08bded2fa243a15a9c7ced2b040066ad586104f1 (patch)
tree5ad118b2cd3238605038a52ffcbd6a6711e2f026 /test
parent267755365ab4fa2fd2568d86399806c442c7fd61 (diff)
downloadrabbitmq-server-git-08bded2fa243a15a9c7ced2b040066ad586104f1.tar.gz
Quorum queue queue length limit by byte size and number of messages
Only drop-head strategy. This necessitated changing rabbit_fifo prefix messages from a tuple of integers — representing the number of returned vs enqueued messages that have already been processed and thus don't need to include message bodies in the snapshot — to a tuple of lists of the sizes of each message. This change will have some performance impact, as the snapshots will now be larger than before, but since they still won't contain message bodies at least the sizing is fixed. Decreased the frequency at which snapshot points are prepared to somewhat make up for this. [#161247380]
Diffstat (limited to 'test')
-rw-r--r--test/dynamic_qq_SUITE.erl5
-rw-r--r--test/quorum_queue_SUITE.erl69
-rw-r--r--test/quorum_queue_utils.erl9
-rw-r--r--test/rabbit_fifo_SUITE.erl11
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl217
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl207
6 files changed, 405 insertions, 113 deletions
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index d1158ef07a..3357e6f74b 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -135,9 +135,8 @@ force_delete_if_no_consensus(Config) ->
passive = true})),
%% TODO implement a force delete
BCh2 = rabbit_ct_client_helpers:open_channel(Config, B),
- ?assertExit({{shutdown,
- {connection_closing, {server_initiated_close, 541, _}}}, _},
- amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
ok.
takeover_on_failure(Config) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index dcba910a6a..48dac3ca57 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -22,6 +22,7 @@
-import(quorum_queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1]).
@@ -37,6 +38,7 @@ all() ->
groups() ->
[
{single_node, [], all_tests()},
+ {single_node, [], memory_tests()},
{unclustered, [], [
{cluster_size_2, [], [add_member]}
]},
@@ -51,6 +53,7 @@ groups() ->
delete_member_not_found,
delete_member]
++ all_tests()},
+ {cluster_size_2, [], memory_tests()},
{cluster_size_3, [], [
declare_during_node_down,
simple_confirm_availability_on_leader_change,
@@ -61,7 +64,8 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority]},
+ consume_in_minority
+ ]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
@@ -126,6 +130,11 @@ all_tests() ->
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
+ queue_length_limit_drop_head
+ ].
+
+memory_tests() ->
+ [
memory_alarm_rolls_wal
].
@@ -240,7 +249,9 @@ declare_args(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
- declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 2000},
+ {<<"x-max-length-bytes">>, long, 2000}]),
assert_queue_type(Server, LQ, quorum),
DQ = <<"classic-declare-args-q">>,
@@ -293,16 +304,6 @@ declare_invalid_args(Config) ->
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-length-bytes">>, long, 2000}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -314,7 +315,7 @@ declare_invalid_args(Config) ->
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-overflow">>, longstr, <<"drop-head">>}])),
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) ->
_ -> false
end
end),
- force_leader_change(Leader, Servers, QQ),
+ force_leader_change(Servers, QQ),
wait_until(fun () ->
[] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso
[] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes])
@@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) ->
timer:sleep(1000),
ok.
+queue_length_limit_drop_head(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg1">>}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+ wait_for_consensus(QQ, Config),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_total(Servers, RaName, 1),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) ->
lists:member(K, Keys)
end, Got).
+publish_many(Ch, Queue, Count) ->
+ [publish(Ch, Queue) || _ <- lists:seq(1, Count)].
+
publish(Ch, Queue) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
@@ -2268,14 +2298,16 @@ wait_until(Condition, N) ->
wait_until(Condition, N - 1)
end.
-force_leader_change(Leader, Servers, Q) ->
+
+force_leader_change([Server | _] = Servers, Q) ->
RaName = ra_name(Q),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
[F1, _] = Servers -- [Leader],
ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]),
case ra:members({RaName, Leader}) of
{ok, _, {_, Leader}} ->
%% Leader has been re-elected
- force_leader_change(Leader, Servers, Q);
+ force_leader_change(Servers, Q);
{ok, _, _} ->
%% Leader has changed
ok
@@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) ->
_ ->
[]
end.
+
+wait_for_consensus(Name, Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = ra_name(Name),
+ {ok, _, _} = ra:members({RaName, Server}).
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index a216c220e6..6b820c7b5c 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -5,6 +5,7 @@
-export([
wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
+ wait_for_messages_total/3,
dirty_query/3,
ra_name/1
]).
@@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) ->
wait_for_messages(Servers, QName, Ready,
fun rabbit_fifo:query_messages_checked_out/1, 60).
+wait_for_messages_total(Servers, QName, Total) ->
+ wait_for_messages(Servers, QName, Total,
+ fun rabbit_fifo:query_messages_total/1, 60).
+
wait_for_messages(Servers, QName, Number, Fun, 0) ->
Msgs = dirty_query(Servers, QName, Fun),
Totals = lists:map(fun(M) when is_map(M) ->
@@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
ct:pal("Got messages ~p", [Msgs]),
- case lists:all(fun(M) when is_map(M) ->
- maps:size(M) == Number;
+ case lists:all(fun(C) when is_integer(C) ->
+ C == Number;
(_) ->
false
end, Msgs) of
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0512e8161a..9f1f3a4797 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -300,6 +300,7 @@ returns_after_down(Config) ->
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
+ timer:sleep(1000),
% message should be available for dequeue
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
ra:stop_server(ServerId),
@@ -481,15 +482,15 @@ test_queries(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_ready/1),
- ?assertEqual(1, maps:size(Ready)),
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, Ready),
ct:pal("Ready ~w~n", [Ready]),
{ok, {_, Checked}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_checked_out/1),
- ?assertEqual(1, maps:size(Checked)),
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, Checked),
ct:pal("Checked ~w~n", [Checked]),
{ok, {_, Processes}, _} = ra:local_query(ServerId,
- fun rabbit_fifo:query_processes/1),
+ fun rabbit_fifo:query_processes/1),
ct:pal("Processes ~w~n", [Processes]),
?assertEqual(2, length(Processes)),
P ! stop,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index a8604b46af..5643da1991 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -25,7 +25,17 @@ all_tests() ->
scenario1,
scenario2,
scenario3,
- scenario4
+ scenario4,
+ scenario5,
+ scenario6,
+ scenario7,
+ scenario8,
+ scenario9,
+ scenario10,
+ scenario11,
+ scenario12,
+ scenario13,
+ scenario14
].
groups() ->
@@ -73,7 +83,7 @@ scenario1(_Config) ->
make_return(C2, [1]), %% E2 in returns E1 with C2
make_settle(C2, [2]) %% E2 with C2
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario2(_Config) ->
@@ -88,7 +98,7 @@ scenario2(_Config) ->
make_settle(C1, [0]),
make_settle(C2, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario3(_Config) ->
@@ -102,7 +112,7 @@ scenario3(_Config) ->
make_settle(C1, [1]),
make_settle(C1, [2])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
ok.
scenario4(_Config) ->
@@ -112,19 +122,147 @@ scenario4(_Config) ->
make_enqueue(E,1,msg),
make_settle(C1, [0])
],
- run_snapshot_test(?FUNCTION_NAME, Commands),
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario5(_Config) ->
+ C1 = {<<>>, c:pid(0,505,0)},
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands),
+ ok.
+
+scenario6(_Config) ->
+ E = c:pid(0,465,9),
+ Commands = [make_enqueue(E,1,<<>>), %% 1 msg on queue - snap: prefix 1
+ make_enqueue(E,2,<<>>) %% 1. msg on queue - snap: prefix 1
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario7(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_settle(C1,[0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario8(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ % make_checkout(C1, cancel),
+ {down, E, noconnection},
+ make_settle(C1, [0])],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario9(_Config) ->
+ E = c:pid(0,188,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario10(_Config) ->
+ C1 = {<<>>, c:pid(0,208,0)},
+ E = c:pid(0,188,0),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<>>),
+ make_settle(C1, [0])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 1}, Commands),
+ ok.
+
+scenario11(_Config) ->
+ C1 = {<<>>, c:pid(0,215,0)},
+ E = c:pid(0,217,0),
+ Commands = [
+ make_enqueue(E,1,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_checkout(C1, cancel),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_settle(C1, [0]),
+ make_checkout(C1, cancel)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario12(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<0>>),
+ make_enqueue(E,3,<<0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 2}, Commands),
+ ok.
+
+scenario13(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0>>),
+ make_enqueue(E,2,<<>>),
+ make_enqueue(E,3,<<>>),
+ make_enqueue(E,4,<<>>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_length => 2}, Commands),
+ ok.
+
+scenario14(_Config) ->
+ E = c:pid(0,217,0),
+ Commands = [make_enqueue(E,1,<<0,0>>)],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 1}, Commands),
ok.
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)),
- test1_prop(O))
- end, [], 1000).
-
-test1_prop(Commands) ->
- ct:pal("Commands: ~p~n", [Commands]),
- try run_snapshot_test(?FUNCTION_NAME, Commands) of
+ ?FORALL({Length, Bytes, SingleActiveConsumer},
+ frequency([{10, {0, 0, false}},
+ {5, {non_neg_integer(), non_neg_integer(),
+ boolean()}}]),
+ ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)),
+ collect({Length, Bytes},
+ snapshots_prop(
+ config(?FUNCTION_NAME,
+ Length, Bytes,
+ SingleActiveConsumer), O))))
+ end, [], 2000).
+
+config(Name, Length, Bytes, SingleActive) ->
+ #{name => Name,
+ max_length => map_max(Length),
+ max_bytes => map_max(Bytes),
+ single_active_consumer_on => SingleActive}.
+
+map_max(0) -> undefined;
+map_max(N) -> N.
+
+snapshots_prop(Conf, Commands) ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ try run_snapshot_test(Conf, Commands) of
_ -> true
catch
Err ->
@@ -132,10 +270,10 @@ test1_prop(Commands) ->
false
end.
-log_gen() ->
+log_gen(Size) ->
?LET(EPids, vector(2, pid_gen()),
?LET(CPids, vector(2, pid_gen()),
- resize(200,
+ resize(Size,
list(
frequency(
[{20, enqueue_gen(oneof(EPids))},
@@ -157,15 +295,17 @@ down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
enqueue_gen(Pid) ->
- ?LET(E, {enqueue, Pid, frequency([{10, enqueue},
- {1, delay}])}, E).
+ ?LET(E, {enqueue, Pid,
+ frequency([{10, enqueue},
+ {1, delay}]),
+ binary()}, E).
checkout_cancel_gen(Pid) ->
{checkout, Pid, cancel}.
checkout_gen(Pid) ->
%% pid, tag, prefetch
- ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C).
+ ?LET(C, {checkout, {binary(), Pid}, choose(1, 100)}, C).
-record(t, {state = rabbit_fifo:init(#{name => proper,
@@ -193,9 +333,10 @@ expand(Ops) ->
lists:reverse(Log).
-handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
- down = Down,
- effects = Effs} = T) ->
+handle_op({enqueue, Pid, When, Data},
+ #t{enqueuers = Enqs0,
+ down = Down,
+ effects = Effs} = T) ->
case Down of
#{Pid := noproc} ->
%% if it's a noproc then it cannot exist - can it?
@@ -204,13 +345,12 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg),
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data),
case When of
enqueue ->
do_apply(Cmd, T#t{enqueuers = Enqs});
delay ->
%% just put the command on the effects queue
- ct:pal("delaying ~w", [Cmd]),
T#t{effects = queue:in(Cmd, Effs)}
end
end;
@@ -308,7 +448,6 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds),
enq_effs(Rem, queue:in(Cmd, Q));
enq_effs([_ | Rem], Q) ->
- % ct:pal("enq_effs dropping ~w~n", [E]),
enq_effs(Rem, Q).
@@ -323,29 +462,40 @@ run_proper(Fun, Args, NumTests) ->
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
end}])).
-run_snapshot_test(Name, Commands) ->
+run_snapshot_test(Conf, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
% ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
- run_snapshot_test0(Name, C)
+ run_snapshot_test0(Conf, C)
end || C <- prefixes(Commands, 1, [])].
-run_snapshot_test0(Name, Commands) ->
+run_snapshot_test0(Conf, Commands) ->
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, Effects} = run_log(test_init(Name), Entries),
+ {State, Effects} = run_log(test_init(Conf), Entries),
+ % ct:pal("beginning snapshot test run for ~w numn commands ~b",
+ % [maps:get(name, Conf), length(Commands)]),
[begin
+ %% drop all entries below and including the snapshot
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?assertEqual(State, S)
+ case S of
+ State -> ok;
+ _ ->
+ ct:pal("Snapshot tests failed run log:~n"
+ "~p~n from ~n~p~n Entries~n~p~n",
+ [Filtered, SnapState, Entries]),
+ ?assertEqual(State, S)
+ end
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
+%% transforms [1,2,3] into [[1,2,3], [1,2], [1]]
prefixes(Source, N, Acc) when N > length(Source) ->
lists:reverse(Acc);
prefixes(Source, N, Acc) ->
@@ -364,11 +514,12 @@ run_log(InitState, Entries) ->
end
end, {InitState, []}, Entries).
-test_init(Name) ->
- rabbit_fifo:init(#{name => Name,
- queue_resource => blah,
- shadow_copy_interval => 0,
- metrics_handler => {?MODULE, metrics_handler, []}}).
+test_init(Conf) ->
+ Default = #{queue_resource => blah,
+ shadow_copy_interval => 0,
+ metrics_handler => {?MODULE, metrics_handler, []}},
+ rabbit_fifo:init(maps:merge(Default, Conf)).
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 0343e7d136..581440d179 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -35,9 +35,15 @@ all() ->
].
groups() ->
- MaxLengthTests = [max_length_drop_head,
+ MaxLengthTests = [max_length_default,
+ max_length_bytes_default,
+ max_length_drop_head,
+ max_length_bytes_drop_head,
max_length_reject_confirm,
- max_length_drop_publish],
+ max_length_bytes_reject_confirm,
+ max_length_drop_publish,
+ max_length_drop_publish_requeue,
+ max_length_bytes_drop_publish],
[
{parallel_tests, [parallel], [
amqp_connection_refusal,
@@ -59,11 +65,16 @@ groups() ->
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
topic_matching,
+ max_message_size,
+
{queue_max_length, [], [
- {max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]},
- max_message_size
- ]}
+ {max_length_classic, [], MaxLengthTests},
+ {max_length_quorum, [], [max_length_default,
+ max_length_bytes_default]
+ },
+ {max_length_mirrored, [], MaxLengthTests}
+ ]}
+ ]}
].
suite() ->
@@ -82,10 +93,23 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(max_length_classic, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]);
+init_per_group(max_length_quorum, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
- Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]),
rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
@@ -132,29 +156,22 @@ end_per_group(Group, Config) ->
end.
init_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_started(Config, Testcase).
-
-end_per_testcase(max_length_drop_head = Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config)
+ when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish;
+ Testcase == max_length_drop_publish_requeue;
+ Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm;
+ Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head;
+ Testcase == max_length_default; Testcase == max_length_bytes_default ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_reject_confirm = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
-end_per_testcase(max_length_drop_publish = Testcase, Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}),
- amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -1159,43 +1176,66 @@ set_vm_memory_high_watermark_command1(_Config) ->
)
end.
-max_length_drop_head(Config) ->
+max_length_bytes_drop_head(Config) ->
+ max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_bytes_default(Config) ->
+ max_length_bytes_drop_head(Config, []).
+
+max_length_bytes_drop_head(Config, ExtraArgs) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_head_queue">>,
- QNameDefault = <<"max_length_default_drop_head_queue">>,
- QNameBytes = <<"max_length_bytes_drop_head_queue">>,
- QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}),
-
- check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
- check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3),
- check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3).
+ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_head(Config) ->
+ max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_default(Config) ->
+ %% Defaults to drop_head
+ max_length_drop_head(Config, []).
+
+max_length_drop_head(Config, ExtraArgs) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
max_length_reject_confirm(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_reject_queue">>,
- QNameBytes = <<"max_length_bytes_reject_queue">>,
+ Args = ?config(queue_args, Config),
+ QName = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_bytes_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ QNameBytes = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1207,15 +1247,55 @@ max_length_reject_confirm(Config) ->
max_length_drop_publish(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = <<"max_length_drop_publish_queue">>,
- QNameBytes = <<"max_length_bytes_drop_publish_queue">>,
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% If confirms are not enable, publishes will still be dropped in reject-publish mode.
- check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_drop_publish_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ %% If confirms are not enable, publishes will still be dropped in reject-publish mode.
+ check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>).
+
+check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Another message is published
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_consensus(QName, Config),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+max_length_bytes_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QNameBytes = ?config(queue_name, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
%% 80 bytes payload
Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
@@ -1229,22 +1309,38 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3)
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 2 is dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Messages 2 and 3 are dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+wait_for_consensus(QName, Config) ->
+ case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of
+ {_, _, <<"quorum">>} ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8),
+ {ok, _, _} = ra:members({RaName, Server});
+ _ ->
+ ok
+ end.
+
check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
sync_mirrors(QName, Config),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -1283,12 +1379,14 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 1 is replaced by message 2
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -1296,6 +1394,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).