diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-21 01:13:38 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-21 01:13:38 +0300 |
| commit | 9e4095fd906da893ca08b02836ce0716bbfac39f (patch) | |
| tree | c98002416ba4d63a08a8957b946c8547fadd602b | |
| parent | d32660c5fc2abc6250ff9df2393eef2d407e7351 (diff) | |
| parent | aee6ef638e9e25d5e687aa937fe8c93a2410ed3e (diff) | |
| download | rabbitmq-server-git-9e4095fd906da893ca08b02836ce0716bbfac39f.tar.gz | |
Merge pull request #1894 from rabbitmq/quorum-queue-grow-cmd
Quorum queue grow and shrink commands
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 105 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 39 | ||||
| -rw-r--r-- | test/rabbitmq_queues_cli_integration_SUITE.erl | 137 |
3 files changed, 262 insertions, 19 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 083acbb2d2..0ef6d88086 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -37,6 +37,8 @@ -export([requeue/3]). -export([policy_changed/2]). -export([cleanup_data_dir/0]). +-export([shrink_all/1, + grow/4]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -663,7 +665,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, case ra:start_server(RaName, ServerId, ra_machine(Q), - [{RaName, N} || N <- QNodes]) of + [{RaName, N} || N <- QNodes]) of ok -> case ra:add_member(ServerRef, ServerId) of {ok, _, Leader} -> @@ -676,11 +678,15 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; + timeout -> + {error, timeout}; E -> %% TODO should we stop the ra process here? E end; - {error, _} = E -> + timeout -> + {error, timeout}; + E -> E end. @@ -710,20 +716,91 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), ServerId = {RaName, Node}, - case ra:leave_and_delete_server(ServerId) of - ok -> - Fun = fun(Q1) -> - amqqueue:set_quorum_nodes( - Q1, - lists:delete(Node, amqqueue:get_quorum_nodes(Q1))) - end, - rabbit_misc:execute_mnesia_transaction( - fun() -> rabbit_amqqueue:update(QName, Fun) end), - ok; - E -> - E + case amqqueue:get_quorum_nodes(Q) of + [Node] -> + %% deleting the last member is not allowed + {error, last_node}; + _ -> + case ra:leave_and_delete_server(ServerId) of + ok -> + Fun = fun(Q1) -> + amqqueue:set_quorum_nodes( + Q1, + lists:delete(Node, + amqqueue:get_quorum_nodes(Q1))) + end, + rabbit_misc:execute_mnesia_transaction( + fun() -> rabbit_amqqueue:update(QName, Fun) end), + ok; + timeout -> + {error, timeout}; + E -> + E + end end. +-spec shrink_all(node()) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +shrink_all(Node) -> + [begin + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: removing member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + Size = length(amqqueue:get_quorum_nodes(Q)), + case delete_member(Q, Node) of + ok -> + {QName, {ok, Size-1}}; + {error, Err} -> + rabbit_log:warning("~s: failed to remove member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + lists:member(Node, amqqueue:get_quorum_nodes(Q))]. + +-spec grow(node(), binary(), binary(), all | even) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +grow(Node, VhostSpec, QueueSpec, Strategy) -> + Running = rabbit_mnesia:cluster_nodes(running), + [begin + Size = length(amqqueue:get_quorum_nodes(Q)), + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + rabbit_log:warning( + "~s: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end + || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + %% don't add a member if there is already one on the node + not lists:member(Node, amqqueue:get_quorum_nodes(Q)), + %% node needs to be running + lists:member(Node, Running), + matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + +get_resource_name(#resource{name = Name}) -> + Name. + +matches_strategy(all, _) -> true; +matches_strategy(even, Members) -> + length(Members) rem 2 == 0. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + + %%---------------------------------------------------------------------------- dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 43a353d0ea..e5d631dfcf 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -68,7 +68,8 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority + consume_in_minority, + shrink_all ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, @@ -189,7 +190,8 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ [{rmq_nodes_count, 3}, {rmq_nodename_suffix, Testcase}, {tcp_ports_base}, - {queue_name, Q} + {queue_name, Q}, + {alt_queue_name, <<Q/binary, "_alt">>} ]), Config3 = rabbit_ct_helpers:run_steps( Config2, @@ -209,7 +211,8 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), Q = rabbit_data_coercion:to_binary(Testcase), Config2 = rabbit_ct_helpers:set_config(Config1, - [{queue_name, Q} + [{queue_name, Q}, + {alt_queue_name, <<Q/binary, "_alt">>} ]), rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). @@ -621,7 +624,33 @@ consume_in_minority(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, - no_ack = false})). + no_ack = false})), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + ok. + +shrink_all(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(500), + Result = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server2]), + ?assertMatch([{_, {ok, 2}}, {_, {ok, 2}}], Result), + Result1 = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server1]), + ?assertMatch([{_, {ok, 1}}, {_, {ok, 1}}], Result1), + Result2 = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server0]), + ?assertMatch([{_, {error, 1, last_node}}, + {_, {error, 1, last_node}}], Result2), + ok. + + subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -640,7 +669,7 @@ subscribe_should_fail_when_global_qos_true(Config) -> _ -> exit(subscribe_should_not_pass) catch _:_ = Err -> - ct:pal("Err ~p", [Err]) + ct:pal("subscribe_should_fail_when_global_qos_true caught an error: ~p", [Err]) end, ok. diff --git a/test/rabbitmq_queues_cli_integration_SUITE.erl b/test/rabbitmq_queues_cli_integration_SUITE.erl new file mode 100644 index 0000000000..1601ba82fe --- /dev/null +++ b/test/rabbitmq_queues_cli_integration_SUITE.erl @@ -0,0 +1,137 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017-2019 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbitmq_queues_cli_integration_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [], [ + shrink, + grow, + grow_invalid_node_filtered + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(tests, Config0) -> + NumNodes = 3, + Config1 = rabbit_ct_helpers:set_config( + Config0, [{rmq_nodes_count, NumNodes}, + {rmq_nodes_clustered, true}]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() + ). + +end_per_group(tests, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config0) -> + rabbit_ct_helpers:ensure_rabbitmq_queues_cmd( + rabbit_ct_helpers:testcase_started(Config0, Testcase)). + +end_per_testcase(Testcase, Config0) -> + rabbit_ct_helpers:testcase_finished(Config0, Testcase). + +shrink(Config) -> + NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 2), + Nodename2 = ?config(nodename, NodeConfig), + Ch = rabbit_ct_client_helpers:open_channel(Config, Nodename2), + %% declare a quorum queue + QName = "shrink1", + #'queue.declare_ok'{} = declare_qq(Ch, QName), + {ok, Out1} = rabbitmq_queues(Config, 0, ["shrink", Nodename2]), + ?assertMatch(#{{"/", "shrink1"} := {2, ok}}, parse_result(Out1)), + Nodename1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + {ok, Out2} = rabbitmq_queues(Config, 0, ["shrink", Nodename1]), + ?assertMatch(#{{"/", "shrink1"} := {1, ok}}, parse_result(Out2)), + Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, Out3} = rabbitmq_queues(Config, 0, ["shrink", Nodename0]), + ?assertMatch(#{{"/", "shrink1"} := {1, error}}, parse_result(Out3)), + ok. + +grow(Config) -> + NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 2), + Nodename2 = ?config(nodename, NodeConfig), + Ch = rabbit_ct_client_helpers:open_channel(Config, Nodename2), + %% declare a quorum queue + QName = "grow1", + Args = [{<<"x-quorum-initial-group-size">>, long, 1}], + #'queue.declare_ok'{} = declare_qq(Ch, QName, Args), + Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, Out1} = rabbitmq_queues(Config, 0, ["grow", Nodename0, "all"]), + ?assertMatch(#{{"/", "grow1"} := {2, ok}}, parse_result(Out1)), + Nodename1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + {ok, Out2} = rabbitmq_queues(Config, 0, ["grow", Nodename1, "all"]), + ?assertMatch(#{{"/", "grow1"} := {3, ok}}, parse_result(Out2)), + + {ok, Out3} = rabbitmq_queues(Config, 0, ["grow", Nodename0, "all"]), + ?assertNotMatch(#{{"/", "grow1"} := _}, parse_result(Out3)), + ok. + +grow_invalid_node_filtered(Config) -> + NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 2), + Nodename2 = ?config(nodename, NodeConfig), + Ch = rabbit_ct_client_helpers:open_channel(Config, Nodename2), + %% declare a quorum queue + QName = "grow-err", + Args = [{<<"x-quorum-initial-group-size">>, long, 1}], + #'queue.declare_ok'{} = declare_qq(Ch, QName, Args), + DummyNode = not_really_a_node@nothing, + {ok, Out1} = rabbitmq_queues(Config, 0, ["grow", DummyNode, "all"]), + ?assertNotMatch(#{{"/", "grow-err"} := _}, parse_result(Out1)), + ok. + +parse_result(S) -> + Lines = string:split(S, "\n", all), + maps:from_list( + [{{Vhost, QName}, + {erlang:list_to_integer(Size), case Result of + "ok" -> ok; + _ -> error + end}} + || [Vhost, QName, Size, Result] <- + [string:split(L, "\t", all) || L <- Lines]]). + +declare_qq(Ch, Q, Args0) -> + Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}] ++ Args0, + amqp_channel:call(Ch, #'queue.declare'{queue = list_to_binary(Q), + durable = true, + auto_delete = false, + arguments = Args}). +declare_qq(Ch, Q) -> + declare_qq(Ch, Q, []). + +rabbitmq_queues(Config, N, Args) -> + rabbit_ct_broker_helpers:rabbitmq_queues(Config, N, ["--silent" | Args]). |
