diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-01-15 00:45:17 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-01-15 00:45:17 +0300 |
| commit | 45632289309e80a38348ed936961330a4ae0763b (patch) | |
| tree | 4680ccc9d6631accb723d0cab4306bd454be2bb0 | |
| parent | 3c42423dd4a1f445d0be28338b53c5379442195f (diff) | |
| parent | 2ca9c2927949199d099126c6cebdccb4d06bb615 (diff) | |
| download | rabbitmq-server-git-45632289309e80a38348ed936961330a4ae0763b.tar.gz | |
Merge pull request #1819 from rabbitmq/qq-testing
Testing of quorum queues
| -rw-r--r-- | src/rabbit_amqqueue.erl | 14 |
| -rw-r--r-- | src/rabbit_channel.erl | 1 |
| -rw-r--r-- | src/rabbit_fifo.erl | 6 |
| -rw-r--r-- | src/rabbit_fifo_client.erl | 10 |
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 11 |
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 251 |
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 52 |
| -rw-r--r-- | test/quorum_queue_utils.erl | 56 |
8 files changed, 344 insertions, 57 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d4301647d3..fe73e760b2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1278,6 +1278,9 @@ forget_all_durable(Node) -> %% Try to promote a slave while down - it should recover as a %% master. We try to take the oldest slave here for best chance of %% recovery. +forget_node_for_queue(DeadNode, Q = #amqqueue{type = quorum, + quorum_nodes = QN}) -> + forget_node_for_queue(DeadNode, QN, Q); forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) -> forget_node_for_queue(DeadNode, RS, Q). @@ -1291,11 +1294,12 @@ forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) -> forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> forget_node_for_queue(DeadNode, T, Q); -forget_node_for_queue(DeadNode, [H|T], Q) -> - case node_permits_offline_promotion(H) of - false -> forget_node_for_queue(DeadNode, T, Q); - true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, - ok = mnesia:write(rabbit_durable_queue, Q1, write) +forget_node_for_queue(DeadNode, [H|T], #amqqueue{type = Type} = Q) -> + case {node_permits_offline_promotion(H), Type} of + {false, _} -> forget_node_for_queue(DeadNode, T, Q); + {true, classic} -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, + ok = mnesia:write(rabbit_durable_queue, Q1, write); + {true, quorum} -> ok end. 
node_permits_offline_promotion(Node) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4e4638bb2..805d9f538f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -562,7 +562,6 @@ handle_cast({method, Method, Content, Flow}, flow -> credit_flow:ack(Reader); noflow -> ok end, - try handle_method(rabbit_channel_interceptor:intercept_in( expand_shortcuts(Method, State), Content, IState), State) of diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index fc699111d1..4fe4d954b9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -42,6 +42,7 @@ query_ra_indexes/1, query_consumer_count/1, query_consumers/1, + query_stat/1, usage/1, zero/1, @@ -721,7 +722,10 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume Acc) end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). -%% other + +query_stat(#state{messages = M, + consumers = Consumers}) -> + {maps:size(M), maps:size(Consumers)}. -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 5a7eb37b9d..dfbf2f477a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -38,7 +38,8 @@ untracked_enqueue/2, purge/1, cluster_name/1, - update_machine_state/2 + update_machine_state/2, + stat/1 ]). -include_lib("ra/include/ra.hrl"). @@ -398,6 +399,13 @@ purge(Node) -> Err end. +-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}} + | {error | timeout, term()}. +stat(Leader) -> + Query = fun (State) -> rabbit_fifo:query_stat(State) end, + {ok, {_, Stat}, _} = ra:local_query(Leader, Query), + Stat. + %% @doc returns the cluster name -spec cluster_name(state()) -> cluster_name(). 
cluster_name(#state{cluster_name = ClusterName}) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5c8524bdbc..429be39067 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -432,8 +432,15 @@ infos(QName) -> info(Q, Items) -> [{Item, i(Item, Q)} || Item <- Items]. -stat(_Q) -> - {ok, 0, 0}. %% TODO length, consumers count +stat(#amqqueue{pid = Leader}) -> + try + {Ready, Consumers} = rabbit_fifo_client:stat(Leader), + {ok, Ready, Consumers} + catch + _:_ -> + %% Leader is not available, cluster might be in minority + {ok, 0, 0} + end. purge(Node) -> rabbit_fifo_client:purge(Node). diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl new file mode 100644 index 0000000000..d1158ef07a --- /dev/null +++ b/test/dynamic_qq_SUITE.erl @@ -0,0 +1,251 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% +%% This test suite is an adaptation from dynamic_ha_SUITE for quorum queues. +%% Some test cases didn't make sense, but others could be adapted for quorum queue. +%% +%% +-module(dynamic_qq_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-import(quorum_queue_utils, [wait_for_messages_ready/3, + ra_name/1]). + +-compile(export_all). 
+ +all() -> + [ + {group, clustered} + ]. + +groups() -> + [ + {clustered, [], [ + {cluster_size_2, [], [ + vhost_deletion, + force_delete_if_no_consensus, + takeover_on_failure, + takeover_on_shutdown, + quorum_unaffected_after_vhost_failure + ]}, + {cluster_size_3, [], [ + recover_follower_after_standalone_restart + ]} + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(clustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); +init_per_group(cluster_size_2, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]); +init_per_group(cluster_size_3, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, + {queue_name, Q}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). 
+ +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- +%% Vhost deletion needs to successfully tear down queues. +vhost_deletion(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Node), + QName = ?config(queue_name, Config), + Args = ?config(queue_args, Config), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = true + }), + ok = rpc:call(Node, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]), + ?assertMatch([], + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name"])), + ok. + +force_delete_if_no_consensus(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = ?config(queue_name, Config), + Args = ?config(queue_args, Config), + amqp_channel:call(ACh, #'queue.declare'{queue = QName, + arguments = Args, + durable = true + }), + rabbit_ct_client_helpers:publish(ACh, QName, 10), + ok = rabbit_ct_broker_helpers:restart_node(Config, B), + ok = rabbit_ct_broker_helpers:stop_node(Config, A), + + BCh = rabbit_ct_client_helpers:open_channel(Config, B), + ?assertMatch( + #'queue.declare_ok'{}, + amqp_channel:call( + BCh, #'queue.declare'{queue = QName, + arguments = Args, + durable = true, + passive = true})), + %% TODO implement a force delete + BCh2 = rabbit_ct_client_helpers:open_channel(Config, B), + ?assertExit({{shutdown, + {connection_closing, {server_initiated_close, 541, _}}}, _}, + amqp_channel:call(BCh2, #'queue.delete'{queue = QName})), + ok. 
+ +takeover_on_failure(Config) -> + takeover_on(Config, kill_node). + +takeover_on_shutdown(Config) -> + takeover_on(Config, stop_node). + +takeover_on(Config, Fun) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = ?config(queue_name, Config), + Args = ?config(queue_args, Config), + amqp_channel:call(ACh, #'queue.declare'{queue = QName, + arguments = Args, + durable = true + }), + rabbit_ct_client_helpers:publish(ACh, QName, 10), + ok = rabbit_ct_broker_helpers:restart_node(Config, B), + + ok = rabbit_ct_broker_helpers:Fun(Config, A), + + BCh = rabbit_ct_client_helpers:open_channel(Config, B), + #'queue.declare_ok'{message_count = 0} = + amqp_channel:call( + BCh, #'queue.declare'{queue = QName, + arguments = Args, + durable = true}), + ok = rabbit_ct_broker_helpers:start_node(Config, A), + ACh2 = rabbit_ct_client_helpers:open_channel(Config, A), + #'queue.declare_ok'{message_count = 10} = + amqp_channel:call( + ACh2, #'queue.declare'{queue = QName, + arguments = Args, + durable = true}), + ok. 
+ +quorum_unaffected_after_vhost_failure(Config) -> + [A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Servers = lists:sort(Servers0), + + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = ?config(queue_name, Config), + Args = ?config(queue_args, Config), + amqp_channel:call(ACh, #'queue.declare'{queue = QName, + arguments = Args, + durable = true + }), + timer:sleep(300), + + Info0 = rpc:call(A, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QName)]), + ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info0, []))), + + %% Crash vhost on both nodes + {ok, SupA} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]), + exit(SupA, foo), + {ok, SupB} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]), + exit(SupB, foo), + + Info = rpc:call(A, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QName)]), + ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))). + +recover_follower_after_standalone_restart(Config) -> + %% Tests that followers can be brought up standalone after forgetting the rest + %% of the cluster. Consensus won't be reached as there is only one node in the + %% new cluster. 
+ Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + + QName = ?config(queue_name, Config), + Args = ?config(queue_args, Config), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = true + }), + + rabbit_ct_client_helpers:publish(Ch, QName, 15), + rabbit_ct_client_helpers:close_channel(Ch), + + Name = ra_name(QName), + wait_for_messages_ready(Servers, Name, 15), + + rabbit_ct_broker_helpers:stop_node(Config, C), + rabbit_ct_broker_helpers:stop_node(Config, B), + rabbit_ct_broker_helpers:stop_node(Config, A), + + %% Restart one follower + forget_cluster_node(Config, B, C), + forget_cluster_node(Config, B, A), + + ok = rabbit_ct_broker_helpers:start_node(Config, B), + wait_for_messages_ready([B], Name, 15), + ok = rabbit_ct_broker_helpers:stop_node(Config, B), + + %% Restart the other + forget_cluster_node(Config, C, B), + forget_cluster_node(Config, C, A), + + ok = rabbit_ct_broker_helpers:start_node(Config, C), + wait_for_messages_ready([C], Name, 15), + ok = rabbit_ct_broker_helpers:stop_node(Config, C), + + ok. + +%%---------------------------------------------------------------------------- +forget_cluster_node(Config, Node, NodeToRemove) -> + rabbit_ct_broker_helpers:rabbitmqctl( + Config, Node, ["forget_cluster_node", "--offline", NodeToRemove]). diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 0b27e58a7e..0beecf6c79 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -20,6 +20,11 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-import(quorum_queue_utils, [wait_for_messages_ready/3, + wait_for_messages_pending_ack/3, + dirty_query/3, + ra_name/1]). + -compile(export_all). all() -> @@ -597,9 +602,6 @@ publish_confirm(Ch, QName) -> ct:pal("CONFIRMED! ~s", [QName]), ok. -ra_name(Q) -> - binary_to_atom(<<"%2F_", Q/binary>>, utf8). 
- publish_and_restart(Config) -> %% Test the node restart with both types of queues (quorum and classic) to %% ensure there are no regressions @@ -2246,50 +2248,6 @@ wait_for_cleanup(Server, Channel, Number, N) -> wait_for_cleanup(Server, Channel, Number, N - 1) end. - -wait_for_messages_ready(Servers, QName, Ready) -> - wait_for_messages(Servers, QName, Ready, - fun rabbit_fifo:query_messages_ready/1, 60). - -wait_for_messages_pending_ack(Servers, QName, Ready) -> - wait_for_messages(Servers, QName, Ready, - fun rabbit_fifo:query_messages_checked_out/1, 60). - -wait_for_messages(Servers, QName, Number, Fun, 0) -> - Msgs = dirty_query(Servers, QName, Fun), - Totals = lists:map(fun(M) when is_map(M) -> - maps:size(M); - (_) -> - -1 - end, Msgs), - ?assertEqual([Number || _ <- lists:seq(1, length(Servers))], - Totals); -wait_for_messages(Servers, QName, Number, Fun, N) -> - Msgs = dirty_query(Servers, QName, Fun), - case lists:all(fun(M) when is_map(M) -> - maps:size(M) == Number; - (_) -> - false - end, Msgs) of - true -> - ok; - _ -> - timer:sleep(500), - wait_for_messages(Servers, QName, Number, Fun, N - 1) - end. - -dirty_query(Servers, QName, Fun) -> - lists:map( - fun(N) -> - case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of - {ok, {_, Msgs}, _} -> - ct:pal("Msgs ~w", [Msgs]), - Msgs; - _ -> - undefined - end - end, Servers). - wait_until(Condition) -> wait_until(Condition, 60). diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl new file mode 100644 index 0000000000..a216c220e6 --- /dev/null +++ b/test/quorum_queue_utils.erl @@ -0,0 +1,56 @@ +-module(quorum_queue_utils). + +-include_lib("eunit/include/eunit.hrl"). + +-export([ + wait_for_messages_ready/3, + wait_for_messages_pending_ack/3, + dirty_query/3, + ra_name/1 + ]). + +wait_for_messages_ready(Servers, QName, Ready) -> + wait_for_messages(Servers, QName, Ready, + fun rabbit_fifo:query_messages_ready/1, 60). 
+ +wait_for_messages_pending_ack(Servers, QName, Ready) -> + wait_for_messages(Servers, QName, Ready, + fun rabbit_fifo:query_messages_checked_out/1, 60). + +wait_for_messages(Servers, QName, Number, Fun, 0) -> + Msgs = dirty_query(Servers, QName, Fun), + Totals = lists:map(fun(M) when is_map(M) -> + maps:size(M); + (_) -> + -1 + end, Msgs), + ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); +wait_for_messages(Servers, QName, Number, Fun, N) -> + Msgs = dirty_query(Servers, QName, Fun), + ct:pal("Got messages ~p", [Msgs]), + case lists:all(fun(M) when is_map(M) -> + maps:size(M) == Number; + (_) -> + false + end, Msgs) of + true -> + ok; + _ -> + timer:sleep(500), + wait_for_messages(Servers, QName, Number, Fun, N - 1) + end. + +dirty_query(Servers, QName, Fun) -> + lists:map( + fun(N) -> + case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of + {ok, {_, Msgs}, _} -> + Msgs; + _E -> + undefined + end + end, Servers). + +ra_name(Q) -> + binary_to_atom(<<"%2F_", Q/binary>>, utf8). + |
