summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Michael Klishin <michael@novemberain.com> 2019-01-15 00:45:17 +0300
committer GitHub <noreply@github.com> 2019-01-15 00:45:17 +0300
commit 45632289309e80a38348ed936961330a4ae0763b (patch)
tree 4680ccc9d6631accb723d0cab4306bd454be2bb0
parent 3c42423dd4a1f445d0be28338b53c5379442195f (diff)
parent 2ca9c2927949199d099126c6cebdccb4d06bb615 (diff)
download rabbitmq-server-git-45632289309e80a38348ed936961330a4ae0763b.tar.gz
Merge pull request #1819 from rabbitmq/qq-testing
Testing of quorum queues
-rw-r--r-- src/rabbit_amqqueue.erl 14
-rw-r--r-- src/rabbit_channel.erl 1
-rw-r--r-- src/rabbit_fifo.erl 6
-rw-r--r-- src/rabbit_fifo_client.erl 10
-rw-r--r-- src/rabbit_quorum_queue.erl 11
-rw-r--r-- test/dynamic_qq_SUITE.erl 251
-rw-r--r-- test/quorum_queue_SUITE.erl 52
-rw-r--r-- test/quorum_queue_utils.erl 56
8 files changed, 344 insertions, 57 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d4301647d3..fe73e760b2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1278,6 +1278,9 @@ forget_all_durable(Node) ->
%% Try to promote a slave while down - it should recover as a
%% master. We try to take the oldest slave here for best chance of
%% recovery.
+%% For quorum queues the candidate node list handed to
+%% forget_node_for_queue/3 is taken from quorum_nodes; for every other queue
+%% it is taken from recoverable_slaves.
+forget_node_for_queue(DeadNode, Q = #amqqueue{type = quorum,
+ quorum_nodes = QN}) ->
+ forget_node_for_queue(DeadNode, QN, Q);
forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) ->
forget_node_for_queue(DeadNode, RS, Q).
@@ -1291,11 +1294,12 @@ forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) ->
forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
forget_node_for_queue(DeadNode, T, Q);
-forget_node_for_queue(DeadNode, [H|T], Q) ->
- case node_permits_offline_promotion(H) of
- false -> forget_node_for_queue(DeadNode, T, Q);
- true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
- ok = mnesia:write(rabbit_durable_queue, Q1, write)
+forget_node_for_queue(DeadNode, [H|T], #amqqueue{type = Type} = Q) ->
+ case {node_permits_offline_promotion(H), Type} of
+ {false, _} -> forget_node_for_queue(DeadNode, T, Q);
+ {true, classic} -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ {true, quorum} -> ok
end.
node_permits_offline_promotion(Node) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a4e4638bb2..805d9f538f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -562,7 +562,6 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
-
try handle_method(rabbit_channel_interceptor:intercept_in(
expand_shortcuts(Method, State), Content, IState),
State) of
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index fc699111d1..4fe4d954b9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -42,6 +42,7 @@
query_ra_indexes/1,
query_consumer_count/1,
query_consumers/1,
+ query_stat/1,
usage/1,
zero/1,
@@ -721,7 +722,10 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
Acc)
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
-%% other
+
+%% Returns {MessageCount, ConsumerCount}: the number of entries in the
+%% `messages' map and in the `consumers' map of the machine state.
+%% Entries held in `waiting_consumers' are not counted.
+-spec query_stat(#state{}) -> {non_neg_integer(), non_neg_integer()}.
+query_stat(#state{messages = Messages, consumers = Consumers}) ->
+    {map_size(Messages), map_size(Consumers)}.
-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 5a7eb37b9d..dfbf2f477a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -38,7 +38,8 @@
untracked_enqueue/2,
purge/1,
cluster_name/1,
- update_machine_state/2
+ update_machine_state/2,
+ stat/1
]).
-include_lib("ra/include/ra.hrl").
@@ -398,6 +399,13 @@ purge(Node) ->
Err
end.
+%% @doc Runs rabbit_fifo:query_stat/1 as a local query against the given ra
+%% server and returns its result, {MessageCount, ConsumerCount}.
+%% The result is returned bare (not wrapped in an `ok' tuple). If the local
+%% query does not return {ok, {_, Stat}, _} this function crashes with
+%% badmatch, so callers that need a fallback must catch the exception.
+-spec stat(ra_server_id()) -> {non_neg_integer(), non_neg_integer()}.
+stat(Leader) ->
+    %% An external fun is used so the query does not depend on a closure
+    %% created in this module.
+    {ok, {_, Stat}, _} = ra:local_query(Leader, fun rabbit_fifo:query_stat/1),
+    Stat.
+
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cluster_name = ClusterName}) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5c8524bdbc..429be39067 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -432,8 +432,15 @@ infos(QName) ->
info(Q, Items) ->
[{Item, i(Item, Q)} || Item <- Items].
-stat(_Q) ->
- {ok, 0, 0}. %% TODO length, consumers count
+%% Returns {ok, Ready, Consumers} using the two counts reported by
+%% rabbit_fifo_client:stat/1 for the pid stored in the queue record.
+%% Any exception raised while querying (including a badmatch on an unexpected
+%% reply) is deliberately swallowed and reported as {ok, 0, 0}, so callers
+%% cannot distinguish an empty queue from an unreachable one.
+stat(#amqqueue{pid = Leader}) ->
+ try
+ {Ready, Consumers} = rabbit_fifo_client:stat(Leader),
+ {ok, Ready, Consumers}
+ catch
+ _:_ ->
+ %% Leader is not available, cluster might be in minority
+ {ok, 0, 0}
+ end.
purge(Node) ->
rabbit_fifo_client:purge(Node).
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
new file mode 100644
index 0000000000..d1158ef07a
--- /dev/null
+++ b/test/dynamic_qq_SUITE.erl
@@ -0,0 +1,251 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+%% This test suite is an adaptation of dynamic_ha_SUITE for quorum queues.
+%% Test cases that do not make sense for quorum queues were dropped; the
+%% remaining ones were adapted.
+%%
+%%
+-module(dynamic_qq_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-import(quorum_queue_utils, [wait_for_messages_ready/3,
+ ra_name/1]).
+
+-compile(export_all).
+
+%% Common Test entry point: every test case lives under the `clustered' group.
+all() ->
+    [{group, clustered}].
+
+%% Group layout: `clustered' nests one sub-group per cluster size, each with
+%% its own list of test cases and no group properties.
+groups() ->
+    TwoNodeCases = [vhost_deletion,
+                    force_delete_if_no_consensus,
+                    takeover_on_failure,
+                    takeover_on_shutdown,
+                    quorum_unaffected_after_vhost_failure],
+    ThreeNodeCases = [recover_follower_after_standalone_restart],
+    [{clustered, [], [{cluster_size_2, [], TwoNodeCases},
+                      {cluster_size_3, [], ThreeNodeCases}]}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Suite setup: logs the test environment, then returns whatever
+%% rabbit_ct_helpers:run_setup_steps/1 produces for Config.
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    SuiteConfig = rabbit_ct_helpers:run_setup_steps(Config),
+    SuiteConfig.
+
+%% Suite teardown: returns the result of the suite-level teardown steps.
+end_per_suite(Config) ->
+    Result = rabbit_ct_helpers:run_teardown_steps(Config),
+    Result.
+
+%% Per-group configuration:
+%%   clustered      -> sets rmq_nodes_clustered to true
+%%   cluster_size_2 -> sets rmq_nodes_count to 2
+%%   cluster_size_3 -> sets rmq_nodes_count to 3
+init_per_group(clustered, Config) ->
+    Settings = [{rmq_nodes_clustered, true}],
+    rabbit_ct_helpers:set_config(Config, Settings);
+init_per_group(cluster_size_2, Config) ->
+    Settings = [{rmq_nodes_count, 2}],
+    rabbit_ct_helpers:set_config(Config, Settings);
+init_per_group(cluster_size_3, Config) ->
+    Settings = [{rmq_nodes_count, 3}],
+    rabbit_ct_helpers:set_config(Config, Settings).
+
+%% Groups need no teardown; the configuration is returned unchanged.
+end_per_group(_Group, Config) -> Config.
+
+%% Per-testcase setup. Adds to Config:
+%%   - rmq_nodename_suffix: the testcase name
+%%   - tcp_ports_base: {skip_n_nodes, TestNumber * ClusterSize}
+%%   - queue_name: "<group>_<testcase>" as a binary
+%%   - queue_args: the x-queue-type=quorum declare argument
+%% and then runs the broker setup steps followed by the client setup steps.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    NodeCount = ?config(rmq_nodes_count, Config),
+    TestIndex = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    GroupProps = ?config(tc_group_properties, Config),
+    GroupName = proplists:get_value(name, GroupProps),
+    QueueName = rabbit_data_coercion:to_binary(
+                  io_lib:format("~p_~p", [GroupName, Testcase])),
+    Settings = [{rmq_nodename_suffix, Testcase},
+                {tcp_ports_base, {skip_n_nodes, TestIndex * NodeCount}},
+                {queue_name, QueueName},
+                {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}],
+    TestConfig = rabbit_ct_helpers:set_config(Config, Settings),
+    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
+                 rabbit_ct_client_helpers:setup_steps(),
+    rabbit_ct_helpers:run_steps(TestConfig, SetupSteps).
+
+%% Per-testcase teardown: runs the client teardown steps followed by the
+%% broker teardown steps, then marks the testcase as finished.
+end_per_testcase(Testcase, Config) ->
+    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
+                    rabbit_ct_broker_helpers:teardown_steps(),
+    CleanConfig = rabbit_ct_helpers:run_steps(Config, TeardownSteps),
+    rabbit_ct_helpers:testcase_finished(CleanConfig, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+%% Vhost deletion needs to successfully tear down queues.
+%% Declares a durable quorum queue on node 0, deletes vhost "/" on that node
+%% and asserts that `rabbitmqctl list_queues name' then reports no queues.
+vhost_deletion(Config) ->
+    Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    Channel = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Declare = #'queue.declare'{queue = ?config(queue_name, Config),
+                               arguments = ?config(queue_args, Config),
+                               durable = true},
+    amqp_channel:call(Channel, Declare),
+    ok = rpc:call(Server, rabbit_vhost, delete, [<<"/">>, <<"acting-user">>]),
+    Listed = rabbit_ct_broker_helpers:rabbitmqctl_list(
+               Config, 0, ["list_queues", "name"]),
+    ?assertMatch([], Listed),
+    ok.
+
+%% Two-node scenario: declare a quorum queue via node A, publish 10 messages,
+%% restart B and then stop A, leaving B as the only running node.
+%% Asserts that a passive declare on B still succeeds, while queue.delete on B
+%% makes the channel call exit with a server-initiated connection close.
+force_delete_if_no_consensus(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = ?config(queue_name, Config),
+ Args = ?config(queue_args, Config),
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true
+ }),
+ rabbit_ct_client_helpers:publish(ACh, QName, 10),
+ ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, A),
+
+ %% The queue record must still be visible from B (passive declare).
+ BCh = rabbit_ct_client_helpers:open_channel(Config, B),
+ ?assertMatch(
+ #'queue.declare_ok'{},
+ amqp_channel:call(
+ BCh, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true,
+ passive = true})),
+ %% TODO implement a force delete
+ %% Until then the delete is expected to fail: the call exits because the
+ %% server closes the connection with code 541 (presumably the AMQP
+ %% internal-error reply code -- confirm against the protocol spec).
+ BCh2 = rabbit_ct_client_helpers:open_channel(Config, B),
+ ?assertExit({{shutdown,
+ {connection_closing, {server_initiated_close, 541, _}}}, _},
+ amqp_channel:call(BCh2, #'queue.delete'{queue = QName})),
+ ok.
+
+%% Takeover scenario in which node A is taken down with
+%% rabbit_ct_broker_helpers:kill_node/2.
+takeover_on_failure(Config) ->
+    HowToTakeDown = kill_node,
+    takeover_on(Config, HowToTakeDown).
+
+%% Takeover scenario in which node A is taken down with
+%% rabbit_ct_broker_helpers:stop_node/2.
+takeover_on_shutdown(Config) ->
+    HowToTakeDown = stop_node,
+    takeover_on(Config, HowToTakeDown).
+
+%% Shared body of the takeover test cases (two nodes, A and B).
+%% Fun is the name of a rabbit_ct_broker_helpers function of arity 2
+%% (called as Fun(Config, A)) used to take node A down.
+%% Steps: declare a quorum queue via A, publish 10 messages, restart B, take
+%% A down, re-declare on B, start A again and re-declare on A.
+takeover_on(Config, Fun) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = ?config(queue_name, Config),
+ Args = ?config(queue_args, Config),
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true
+ }),
+ rabbit_ct_client_helpers:publish(ACh, QName, 10),
+ ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+
+ %% Dynamic call: Fun is kill_node or stop_node in the callers of this helper.
+ ok = rabbit_ct_broker_helpers:Fun(Config, A),
+
+ %% With A down the declare on B is expected to report 0 messages
+ %% (presumably because the queue statistics cannot be obtained while only
+ %% one of the two nodes is running -- confirm).
+ BCh = rabbit_ct_client_helpers:open_channel(Config, B),
+ #'queue.declare_ok'{message_count = 0} =
+ amqp_channel:call(
+ BCh, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true}),
+ %% Once A is back, all 10 published messages must be reported again.
+ ok = rabbit_ct_broker_helpers:start_node(Config, A),
+ ACh2 = rabbit_ct_client_helpers:open_channel(Config, A),
+ #'queue.declare_ok'{message_count = 10} =
+ amqp_channel:call(
+ ACh2, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true}),
+ ok.
+
+%% Declares a quorum queue on a two-node cluster and asserts that the `online'
+%% entry returned by rabbit_quorum_queue:infos/1 lists both nodes, both before
+%% and after an exit signal is sent to the "/" vhost supervisor on each node.
+quorum_unaffected_after_vhost_failure(Config) ->
+ [A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ %% Sorted so it can be compared with the (also sorted) `online' list.
+ Servers = lists:sort(Servers0),
+
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = ?config(queue_name, Config),
+ Args = ?config(queue_args, Config),
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true
+ }),
+ %% NOTE(review): fixed 300 ms pause, presumably to let both members come
+ %% online before the first check -- confirm.
+ timer:sleep(300),
+
+ Info0 = rpc:call(A, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QName)]),
+ ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info0, []))),
+
+ %% Crash vhost on both nodes
+ {ok, SupA} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]),
+ exit(SupA, foo),
+ {ok, SupB} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]),
+ exit(SupB, foo),
+
+ %% The set of online members must be unchanged after the exit signals.
+ Info = rpc:call(A, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QName)]),
+ ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
+
+%% Three-node scenario: publish 15 messages to a quorum queue, wait until all
+%% three nodes report them as ready, stop every node, then for B and for C in
+%% turn: forget the other two nodes offline, start the node on its own and
+%% wait until it reports the 15 ready messages again.
+recover_follower_after_standalone_restart(Config) ->
+ %% Tests that followers can be brought up standalone after forgetting the rest
+ %% of the cluster. Consensus won't be reached as there is only one node in the
+ %% new cluster.
+ Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+
+ QName = ?config(queue_name, Config),
+ Args = ?config(queue_args, Config),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true
+ }),
+
+ rabbit_ct_client_helpers:publish(Ch, QName, 15),
+ rabbit_ct_client_helpers:close_channel(Ch),
+
+ %% ra_name/1 turns the queue name into the atom used to address the queue
+ %% in the wait_for_messages_ready/3 queries.
+ Name = ra_name(QName),
+ wait_for_messages_ready(Servers, Name, 15),
+
+ %% Stop the whole cluster; the results of these calls are not checked.
+ rabbit_ct_broker_helpers:stop_node(Config, C),
+ rabbit_ct_broker_helpers:stop_node(Config, B),
+ rabbit_ct_broker_helpers:stop_node(Config, A),
+
+ %% Restart one follower
+ forget_cluster_node(Config, B, C),
+ forget_cluster_node(Config, B, A),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, B),
+ wait_for_messages_ready([B], Name, 15),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, B),
+
+ %% Restart the other
+ forget_cluster_node(Config, C, B),
+ forget_cluster_node(Config, C, A),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, C),
+ wait_for_messages_ready([C], Name, 15),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
+
+ ok.
+
+%%----------------------------------------------------------------------------
+%% Runs `rabbitmqctl forget_cluster_node --offline NodeToRemove' against Node
+%% and returns the result of the rabbitmqctl helper.
+forget_cluster_node(Config, Node, NodeToRemove) ->
+    Command = ["forget_cluster_node", "--offline", NodeToRemove],
+    rabbit_ct_broker_helpers:rabbitmqctl(Config, Node, Command).
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 0b27e58a7e..0beecf6c79 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -20,6 +20,11 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-import(quorum_queue_utils, [wait_for_messages_ready/3,
+ wait_for_messages_pending_ack/3,
+ dirty_query/3,
+ ra_name/1]).
+
-compile(export_all).
all() ->
@@ -597,9 +602,6 @@ publish_confirm(Ch, QName) ->
ct:pal("CONFIRMED! ~s", [QName]),
ok.
-ra_name(Q) ->
- binary_to_atom(<<"%2F_", Q/binary>>, utf8).
-
publish_and_restart(Config) ->
%% Test the node restart with both types of queues (quorum and classic) to
%% ensure there are no regressions
@@ -2246,50 +2248,6 @@ wait_for_cleanup(Server, Channel, Number, N) ->
wait_for_cleanup(Server, Channel, Number, N - 1)
end.
-
-wait_for_messages_ready(Servers, QName, Ready) ->
- wait_for_messages(Servers, QName, Ready,
- fun rabbit_fifo:query_messages_ready/1, 60).
-
-wait_for_messages_pending_ack(Servers, QName, Ready) ->
- wait_for_messages(Servers, QName, Ready,
- fun rabbit_fifo:query_messages_checked_out/1, 60).
-
-wait_for_messages(Servers, QName, Number, Fun, 0) ->
- Msgs = dirty_query(Servers, QName, Fun),
- Totals = lists:map(fun(M) when is_map(M) ->
- maps:size(M);
- (_) ->
- -1
- end, Msgs),
- ?assertEqual([Number || _ <- lists:seq(1, length(Servers))],
- Totals);
-wait_for_messages(Servers, QName, Number, Fun, N) ->
- Msgs = dirty_query(Servers, QName, Fun),
- case lists:all(fun(M) when is_map(M) ->
- maps:size(M) == Number;
- (_) ->
- false
- end, Msgs) of
- true ->
- ok;
- _ ->
- timer:sleep(500),
- wait_for_messages(Servers, QName, Number, Fun, N - 1)
- end.
-
-dirty_query(Servers, QName, Fun) ->
- lists:map(
- fun(N) ->
- case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
- {ok, {_, Msgs}, _} ->
- ct:pal("Msgs ~w", [Msgs]),
- Msgs;
- _ ->
- undefined
- end
- end, Servers).
-
wait_until(Condition) ->
wait_until(Condition, 60).
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
new file mode 100644
index 0000000000..a216c220e6
--- /dev/null
+++ b/test/quorum_queue_utils.erl
@@ -0,0 +1,56 @@
+-module(quorum_queue_utils).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([
+ wait_for_messages_ready/3,
+ wait_for_messages_pending_ack/3,
+ dirty_query/3,
+ ra_name/1
+ ]).
+
+%% Waits until rabbit_fifo:query_messages_ready/1 yields a map with Ready
+%% entries on every server in Servers, allowing 60 retries.
+wait_for_messages_ready(Servers, QName, Ready) ->
+    ReadyQuery = fun rabbit_fifo:query_messages_ready/1,
+    wait_for_messages(Servers, QName, Ready, ReadyQuery, 60).
+
+%% Waits until rabbit_fifo:query_messages_checked_out/1 yields a map with
+%% Ready entries on every server in Servers, allowing 60 retries.
+wait_for_messages_pending_ack(Servers, QName, Ready) ->
+    CheckedOutQuery = fun rabbit_fifo:query_messages_checked_out/1,
+    wait_for_messages(Servers, QName, Ready, CheckedOutQuery, 60).
+
+%% Polls all Servers with the query Fun (see dirty_query/3) until every server
+%% returns a map containing exactly Number entries.
+%%
+%% The last argument is the number of retries left. While it is non-zero a
+%% failed check sleeps for 500 ms and retries. When it reaches 0 one final
+%% query is made and asserted, so a failure reports the per-server counts
+%% (-1 stands for a server whose query did not return a map).
+%% Returns ok on success.
+wait_for_messages(Servers, QName, Number, Fun, 0) ->
+    Msgs = dirty_query(Servers, QName, Fun),
+    Totals = lists:map(fun(M) when is_map(M) ->
+                               maps:size(M);
+                          (_) ->
+                               -1
+                       end, Msgs),
+    %% EUnit's ?assertEqual takes the expected value first; one expected
+    %% count per server.
+    ?assertEqual([Number || _ <- Servers], Totals);
+wait_for_messages(Servers, QName, Number, Fun, N) ->
+    Msgs = dirty_query(Servers, QName, Fun),
+    ct:pal("Got messages ~p", [Msgs]),
+    case lists:all(fun(M) when is_map(M) ->
+                           maps:size(M) == Number;
+                      (_) ->
+                           false
+                   end, Msgs) of
+        true ->
+            ok;
+        false ->
+            timer:sleep(500),
+            wait_for_messages(Servers, QName, Number, Fun, N - 1)
+    end.
+
+%% Runs ra:local_query/2 with Fun against {QName, Server} on each server via
+%% rpc and returns the results in the order of Servers. A server whose call
+%% does not return {ok, {_, Result}, _} contributes `undefined'.
+dirty_query(Servers, QName, Fun) ->
+    [case rpc:call(Server, ra, local_query, [{QName, Server}, Fun]) of
+         {ok, {_, Result}, _} -> Result;
+         _ -> undefined
+     end || Server <- Servers].
+
+%% Builds the atom obtained by prefixing the queue name binary Q with "%2F_".
+ra_name(Q) ->
+    Prefixed = <<"%2F_", Q/binary>>,
+    binary_to_atom(Prefixed, utf8).
+