diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-25 17:41:12 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-25 17:41:12 +0100 |
| commit | dcf663fcf99eeefb1c0de07e7389b47988cfbd92 (patch) | |
| tree | eff2f2f31d572615bc2453378f3b760d1323bb9f /test | |
| parent | 9eaa79d5f80ec3025ce0dbbac5e81a60437dec7c (diff) | |
| parent | a4b602567081b28c4bc53ac5995b5c054a305da9 (diff) | |
| download | rabbitmq-server-git-dcf663fcf99eeefb1c0de07e7389b47988cfbd92.tar.gz | |
Merge branch 'master' into rabbitmq-server-1838-active-field-for-consumers
Conflicts:
src/rabbit_fifo.erl
Diffstat (limited to 'test')
| -rw-r--r-- | test/backing_queue_SUITE.erl | 18 | ||||
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 6 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 69 | ||||
| -rw-r--r-- | test/quorum_queue_utils.erl | 9 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 21 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 217 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 207 |
7 files changed, 422 insertions, 125 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 5864c387c5..433bc66bff 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -723,7 +723,7 @@ bq_queue_recover1(Config) -> true, false, [], none, <<"acting-user">>), publish_and_confirm(Q, <<>>, Count), - SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q), + SupPid = get_queue_sup_pid(Q), true = is_pid(SupPid), exit(SupPid, kill), exit(QPid, kill), @@ -751,6 +751,22 @@ bq_queue_recover1(Config) -> end), passed. +%% Return the PID of the given queue's supervisor. +get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) -> + VHost = QName#resource.virtual_host, + {ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)), + Sups = supervisor:which_children(AmqSup), + get_queue_sup_pid(Sups, QPid). + +get_queue_sup_pid([{_, SupPid, _, _} | Rest], QueuePid) -> + WorkerPids = [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)], + case lists:member(QueuePid, WorkerPids) of + true -> SupPid; + false -> get_queue_sup_pid(Rest, QueuePid) + end; +get_queue_sup_pid([], _QueuePid) -> + undefined. + variable_queue_dynamic_duration_change(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, variable_queue_dynamic_duration_change1, [Config]). diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index d1158ef07a..fbc1e81827 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -133,11 +133,9 @@ force_delete_if_no_consensus(Config) -> arguments = Args, durable = true, passive = true})), - %% TODO implement a force delete BCh2 = rabbit_ct_client_helpers:open_channel(Config, B), - ?assertExit({{shutdown, - {connection_closing, {server_initiated_close, 541, _}}}, _}, - amqp_channel:call(BCh2, #'queue.delete'{queue = QName})), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(BCh2, #'queue.delete'{queue = QName})), ok. takeover_on_failure(Config) -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index dcba910a6a..48dac3ca57 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -22,6 +22,7 @@ -import(quorum_queue_utils, [wait_for_messages_ready/3, wait_for_messages_pending_ack/3, + wait_for_messages_total/3, dirty_query/3, ra_name/1]). @@ -37,6 +38,7 @@ all() -> groups() -> [ {single_node, [], all_tests()}, + {single_node, [], memory_tests()}, {unclustered, [], [ {cluster_size_2, [], [add_member]} ]}, @@ -51,6 +53,7 @@ groups() -> delete_member_not_found, delete_member] ++ all_tests()}, + {cluster_size_2, [], memory_tests()}, {cluster_size_3, [], [ declare_during_node_down, simple_confirm_availability_on_leader_change, @@ -61,7 +64,8 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority]}, + consume_in_minority + ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, @@ -126,6 +130,11 @@ all_tests() -> consume_redelivery_count, subscribe_redelivery_count, message_bytes_metrics, + queue_length_limit_drop_head + ]. + +memory_tests() -> + [ memory_alarm_rolls_wal ]. @@ -240,7 +249,9 @@ declare_args(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), - declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 2000}, + {<<"x-max-length-bytes">>, long, 2000}]), assert_queue_type(Server, LQ, quorum), DQ = <<"classic-declare-args-q">>, @@ -293,16 +304,6 @@ declare_invalid_args(Config) -> declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-message-ttl">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-length">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-length-bytes">>, long, 2000}])), ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -314,7 +315,7 @@ declare_invalid_args(Config) -> {{shutdown, {server_initiated_close, 406, _}}, _}, declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, <<"drop-head">>}])), + {<<"x-overflow">>, longstr, <<"reject-publish">>}])), ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) -> _ -> false end end), - force_leader_change(Leader, Servers, QQ), + force_leader_change(Servers, QQ), wait_until(fun () -> [] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso [] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) @@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) -> timer:sleep(1000), ok. +queue_length_limit_drop_head(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 1}])), + + RaName = ra_name(QQ), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg2">>}), + wait_for_consensus(QQ, Config), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_total(Servers, RaName, 1), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> @@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) -> lists:member(K, Keys) end, Got). +publish_many(Ch, Queue, Count) -> + [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. + publish(Ch, Queue) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, @@ -2268,14 +2298,16 @@ wait_until(Condition, N) -> wait_until(Condition, N - 1) end. -force_leader_change(Leader, Servers, Q) -> + +force_leader_change([Server | _] = Servers, Q) -> RaName = ra_name(Q), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), [F1, _] = Servers -- [Leader], ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]), case ra:members({RaName, Leader}) of {ok, _, {_, Leader}} -> %% Leader has been re-elected - force_leader_change(Leader, Servers, Q); + force_leader_change(Servers, Q); {ok, _, _} -> %% Leader has changed ok @@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) -> _ -> [] end. + +wait_for_consensus(Name, Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + RaName = ra_name(Name), + {ok, _, _} = ra:members({RaName, Server}). diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index a216c220e6..6b820c7b5c 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -5,6 +5,7 @@ -export([ wait_for_messages_ready/3, wait_for_messages_pending_ack/3, + wait_for_messages_total/3, dirty_query/3, ra_name/1 ]). @@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) -> wait_for_messages(Servers, QName, Ready, fun rabbit_fifo:query_messages_checked_out/1, 60). +wait_for_messages_total(Servers, QName, Total) -> + wait_for_messages(Servers, QName, Total, + fun rabbit_fifo:query_messages_total/1, 60). + wait_for_messages(Servers, QName, Number, Fun, 0) -> Msgs = dirty_query(Servers, QName, Fun), Totals = lists:map(fun(M) when is_map(M) -> @@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) -> wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), ct:pal("Got messages ~p", [Msgs]), - case lists:all(fun(M) when is_map(M) -> - maps:size(M) == Number; + case lists:all(fun(C) when is_integer(C) -> + C == Number; (_) -> false end, Msgs) of diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 0512e8161a..6df61d4288 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -124,7 +124,6 @@ basics(Config) -> {ra_event, Frm, E} -> case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of {internal, _, _, _FState7} -> - ct:pal("unexpected event ~p~n", [E]), exit({unexpected_internal_event, E}); {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} -> {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7), @@ -218,9 +217,7 @@ usage(Config) -> % force tick and usage stats emission ServerId ! tick_timeout, timer:sleep(50), - % ct:pal("ets ~w ~w ~w", [ets:tab2list(rabbit_fifo_usage), ServerId, UId]), Use = rabbit_fifo:usage(element(1, ServerId)), - ct:pal("Use ~w~n", [Use]), ra:stop_server(ServerId), ?assert(Use > 0.0), ok. @@ -300,6 +297,7 @@ returns_after_down(Config) -> Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, + timer:sleep(1000), % message should be available for dequeue {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), ra:stop_server(ServerId), @@ -380,7 +378,6 @@ discard(Config) -> {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), receive {dead_letter, Letters} -> - ct:pal("dead letters ~p~n", [Letters]), [{_, msg1}] = Letters, ok after 500 -> @@ -481,16 +478,13 @@ test_queries(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), {ok, {_, Ready}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_ready/1), - ?assertEqual(1, maps:size(Ready)), - ct:pal("Ready ~w~n", [Ready]), + fun rabbit_fifo:query_messages_ready/1), + ?assertEqual(1, Ready), {ok, {_, Checked}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_messages_checked_out/1), - ?assertEqual(1, maps:size(Checked)), - ct:pal("Checked ~w~n", [Checked]), + fun rabbit_fifo:query_messages_checked_out/1), + ?assertEqual(1, Checked), {ok, {_, Processes}, _} = ra:local_query(ServerId, - fun rabbit_fifo:query_processes/1), - ct:pal("Processes ~w~n", [Processes]), + fun rabbit_fifo:query_processes/1), ?assertEqual(2, length(Processes)), P ! stop, ra:stop_server(ServerId), @@ -565,7 +559,6 @@ process_ra_events(State, Acc, Wait) -> process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) -> receive {ra_event, From, Evt} -> - % ct:pal("ra event ~w~n", [Evt]), case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of {internal, _, Actions, State} -> process_ra_events0(State, Acc, Actions0 ++ Actions, @@ -588,7 +581,6 @@ discard_next_delivery(State0, Wait) -> discard_next_delivery(State, Wait); {{delivery, Tag, Msgs}, State1} -> MsgIds = [element(1, M) || M <- Msgs], - ct:pal("discarding ~p", [Msgs]), {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds, State1), State @@ -605,7 +597,6 @@ return_next_delivery(State0, Wait) -> return_next_delivery(State, Wait); {{delivery, Tag, Msgs}, State1} -> MsgIds = [element(1, M) || M <- Msgs], - ct:pal("returning ~p", [Msgs]), {ok, State} = rabbit_fifo_client:return(Tag, MsgIds, State1), State diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index a8604b46af..5643da1991 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -25,7 +25,17 @@ all_tests() -> scenario1, scenario2, scenario3, - scenario4 + scenario4, + scenario5, + scenario6, + scenario7, + scenario8, + scenario9, + scenario10, + scenario11, + scenario12, + scenario13, + scenario14 ]. groups() -> @@ -73,7 +83,7 @@ scenario1(_Config) -> make_return(C2, [1]), %% E2 in returns E1 with C2 make_settle(C2, [2]) %% E2 with C2 ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario2(_Config) -> @@ -88,7 +98,7 @@ scenario2(_Config) -> make_settle(C1, [0]), make_settle(C2, [0]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario3(_Config) -> @@ -102,7 +112,7 @@ scenario3(_Config) -> make_settle(C1, [1]), make_settle(C1, [2]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), ok. scenario4(_Config) -> @@ -112,19 +122,147 @@ scenario4(_Config) -> make_enqueue(E,1,msg), make_settle(C1, [0]) ], - run_snapshot_test(?FUNCTION_NAME, Commands), + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), + ok. + +scenario5(_Config) -> + C1 = {<<>>, c:pid(0,505,0)}, + E = c:pid(0,465,9), + Commands = [make_enqueue(E,1,<<0>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,2,<<>>), + make_settle(C1,[0])], + run_snapshot_test(#{name => ?FUNCTION_NAME}, Commands), + ok. + +scenario6(_Config) -> + E = c:pid(0,465,9), + Commands = [make_enqueue(E,1,<<>>), %% 1 msg on queue - snap: prefix 1 + make_enqueue(E,2,<<>>) %% 1. msg on queue - snap: prefix 1 + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario7(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>), + make_settle(C1,[0])], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario8(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + % make_checkout(C1, cancel), + {down, E, noconnection}, + make_settle(C1, [0])], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario9(_Config) -> + E = c:pid(0,188,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario10(_Config) -> + C1 = {<<>>, c:pid(0,208,0)}, + E = c:pid(0,188,0), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,<<>>), + make_settle(C1, [0]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 1}, Commands), + ok. + +scenario11(_Config) -> + C1 = {<<>>, c:pid(0,215,0)}, + E = c:pid(0,217,0), + Commands = [ + make_enqueue(E,1,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_checkout(C1, cancel), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,1,simple_prefetch}), + make_settle(C1, [0]), + make_checkout(C1, cancel) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 2}, Commands), + ok. + +scenario12(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0>>), + make_enqueue(E,2,<<0>>), + make_enqueue(E,3,<<0>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 2}, Commands), + ok. + +scenario13(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0>>), + make_enqueue(E,2,<<>>), + make_enqueue(E,3,<<>>), + make_enqueue(E,4,<<>>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_length => 2}, Commands), + ok. + +scenario14(_Config) -> + E = c:pid(0,217,0), + Commands = [make_enqueue(E,1,<<0,0>>)], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 1}, Commands), ok. snapshots(_Config) -> run_proper( fun () -> - ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)), - test1_prop(O)) - end, [], 1000). - -test1_prop(Commands) -> - ct:pal("Commands: ~p~n", [Commands]), - try run_snapshot_test(?FUNCTION_NAME, Commands) of + ?FORALL({Length, Bytes, SingleActiveConsumer}, + frequency([{10, {0, 0, false}}, + {5, {non_neg_integer(), non_neg_integer(), + boolean()}}]), + ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)), + collect({Length, Bytes}, + snapshots_prop( + config(?FUNCTION_NAME, + Length, Bytes, + SingleActiveConsumer), O)))) + end, [], 2000). + +config(Name, Length, Bytes, SingleActive) -> + #{name => Name, + max_length => map_max(Length), + max_bytes => map_max(Bytes), + single_active_consumer_on => SingleActive}. + +map_max(0) -> undefined; +map_max(N) -> N. + +snapshots_prop(Conf, Commands) -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + try run_snapshot_test(Conf, Commands) of _ -> true catch Err -> @@ -132,10 +270,10 @@ test1_prop(Commands) -> false end. -log_gen() -> +log_gen(Size) -> ?LET(EPids, vector(2, pid_gen()), ?LET(CPids, vector(2, pid_gen()), - resize(200, + resize(Size, list( frequency( [{20, enqueue_gen(oneof(EPids))}, @@ -157,15 +295,17 @@ down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). enqueue_gen(Pid) -> - ?LET(E, {enqueue, Pid, frequency([{10, enqueue}, - {1, delay}])}, E). + ?LET(E, {enqueue, Pid, + frequency([{10, enqueue}, + {1, delay}]), + binary()}, E). checkout_cancel_gen(Pid) -> {checkout, Pid, cancel}. checkout_gen(Pid) -> %% pid, tag, prefetch - ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C). + ?LET(C, {checkout, {binary(), Pid}, choose(1, 100)}, C). -record(t, {state = rabbit_fifo:init(#{name => proper, @@ -193,9 +333,10 @@ expand(Ops) -> lists:reverse(Log). -handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, - down = Down, - effects = Effs} = T) -> +handle_op({enqueue, Pid, When, Data}, + #t{enqueuers = Enqs0, + down = Down, + effects = Effs} = T) -> case Down of #{Pid := noproc} -> %% if it's a noproc then it cannot exist - can it? @@ -204,13 +345,12 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, _ -> Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), MsgSeq = maps:get(Pid, Enqs), - Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg), + Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data), case When of enqueue -> do_apply(Cmd, T#t{enqueuers = Enqs}); delay -> %% just put the command on the effects queue - ct:pal("delaying ~w", [Cmd]), T#t{effects = queue:in(Cmd, Effs)} end end; @@ -308,7 +448,6 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds), enq_effs(Rem, queue:in(Cmd, Q)); enq_effs([_ | Rem], Q) -> - % ct:pal("enq_effs dropping ~w~n", [E]), enq_effs(Rem, Q). @@ -323,29 +462,40 @@ run_proper(Fun, Args, NumTests) -> (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). -run_snapshot_test(Name, Commands) -> +run_snapshot_test(Conf, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that [begin % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), - run_snapshot_test0(Name, C) + run_snapshot_test0(Conf, C) end || C <- prefixes(Commands, 1, [])]. -run_snapshot_test0(Name, Commands) -> +run_snapshot_test0(Conf, Commands) -> Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), - {State, Effects} = run_log(test_init(Name), Entries), + {State, Effects} = run_log(test_init(Conf), Entries), + % ct:pal("beginning snapshot test run for ~w numn commands ~b", + % [maps:get(name, Conf), length(Commands)]), [begin + %% drop all entries below and including the snapshot Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - ?assertEqual(State, S) + case S of + State -> ok; + _ -> + ct:pal("Snapshot tests failed run log:~n" + "~p~n from ~n~p~n Entries~n~p~n", + [Filtered, SnapState, Entries]), + ?assertEqual(State, S) + end end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. +%% transforms [1,2,3] into [[1,2,3], [1,2], [1]] prefixes(Source, N, Acc) when N > length(Source) -> lists:reverse(Acc); prefixes(Source, N, Acc) -> @@ -364,11 +514,12 @@ run_log(InitState, Entries) -> end end, {InitState, []}, Entries). -test_init(Name) -> - rabbit_fifo:init(#{name => Name, - queue_resource => blah, - shadow_copy_interval => 0, - metrics_handler => {?MODULE, metrics_handler, []}}). +test_init(Conf) -> + Default = #{queue_resource => blah, + shadow_copy_interval => 0, + metrics_handler => {?MODULE, metrics_handler, []}}, + rabbit_fifo:init(maps:merge(Default, Conf)). + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 0343e7d136..581440d179 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -35,9 +35,15 @@ all() -> ]. groups() -> - MaxLengthTests = [max_length_drop_head, + MaxLengthTests = [max_length_default, + max_length_bytes_default, + max_length_drop_head, + max_length_bytes_drop_head, max_length_reject_confirm, - max_length_drop_publish], + max_length_bytes_reject_confirm, + max_length_drop_publish, + max_length_drop_publish_requeue, + max_length_bytes_drop_publish], [ {parallel_tests, [parallel], [ amqp_connection_refusal, @@ -59,11 +65,16 @@ groups() -> set_disk_free_limit_command, set_vm_memory_high_watermark_command, topic_matching, + max_message_size, + {queue_max_length, [], [ - {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]}, - max_message_size - ]} + {max_length_classic, [], MaxLengthTests}, + {max_length_quorum, [], [max_length_default, + max_length_bytes_default] + }, + {max_length_mirrored, [], MaxLengthTests} + ]} + ]} ]. suite() -> @@ -82,10 +93,23 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(max_length_classic, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]); +init_per_group(max_length_quorum, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); init_per_group(max_length_mirrored, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), - Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]), rabbit_ct_helpers:run_steps(Config1, []); init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of @@ -132,29 +156,22 @@ end_per_group(Group, Config) -> end. init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase). - -end_per_testcase(max_length_drop_head = Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) + when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish; + Testcase == max_length_drop_publish_requeue; + Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm; + Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head; + Testcase == max_length_default; Testcase == max_length_bytes_default -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), rabbit_ct_helpers:testcase_finished(Config, Testcase); -end_per_testcase(max_length_reject_confirm = Testcase, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}), - rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), - rabbit_ct_helpers:testcase_finished(Config, Testcase); -end_per_testcase(max_length_drop_publish = Testcase, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}), - rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), - rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -1159,43 +1176,66 @@ set_vm_memory_high_watermark_command1(_Config) -> ) end. -max_length_drop_head(Config) -> +max_length_bytes_drop_head(Config) -> + max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]). + +max_length_bytes_default(Config) -> + max_length_bytes_drop_head(Config, []). + +max_length_bytes_drop_head(Config, ExtraArgs) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_drop_head_queue">>, - QNameDefault = <<"max_length_default_drop_head_queue">>, - QNameBytes = <<"max_length_bytes_drop_head_queue">>, - QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), - MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], - OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}), - - check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), - check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, - check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3), - check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3). + check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3). + +max_length_drop_head(Config) -> + max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]). + +max_length_default(Config) -> + %% Defaults to drop_head + max_length_drop_head(Config, []). + +max_length_drop_head(Config, ExtraArgs) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}), + + check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). max_length_reject_confirm(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_reject_queue">>, - QNameBytes = <<"max_length_bytes_reject_queue">>, + Args = ?config(queue_args, Config), + QName = ?config(queue_name, Config), + Durable = ?config(queue_durable, Config), MaxLengthArgs = [{<<"x-max-length">>, long, 1}], - MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), - check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). + +max_length_bytes_reject_confirm(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + QNameBytes = ?config(queue_name, Config), + Durable = ?config(queue_durable, Config), + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, @@ -1207,15 +1247,55 @@ max_length_reject_confirm(Config) -> max_length_drop_publish(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - QName = <<"max_length_drop_publish_queue">>, - QNameBytes = <<"max_length_bytes_drop_publish_queue">>, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), MaxLengthArgs = [{<<"x-max-length">>, long, 1}], - MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), %% If confirms are not enable, publishes will still be dropped in reject-publish mode. - check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>). + +max_length_drop_publish_requeue(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}), + %% If confirms are not enable, publishes will still be dropped in reject-publish mode. + check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>). + +check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) -> + sync_mirrors(QName, Config), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% A single message is published and consumed + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), + {#'basic.get_ok'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Another message is published + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_consensus(QName, Config), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +max_length_bytes_drop_publish(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QNameBytes = ?config(queue_name, Config), + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}), %% 80 bytes payload Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, @@ -1229,22 +1309,38 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 2 is dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Messages 2 and 3 are dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). +wait_for_consensus(QName, Config) -> + case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of + {_, _, <<"quorum">>} -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8), + {ok, _, _} = ra:members({RaName, Server}); + _ -> + ok + end. + check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) -> sync_mirrors(QName, Config), amqp_channel:register_confirm_handler(Ch, self()), @@ -1283,12 +1379,14 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% Message 1 is replaced by message 2 amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -1296,6 +1394,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + wait_for_consensus(QName, Config), {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). |
