summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-14 14:46:41 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-14 14:48:02 +0000
commitb062641019a849f2c80118bf33c0882335cda431 (patch)
treea7f2c1e3868f5c79336bac48c86980b4544489d7
parent375f743238ba75376f038544cb6199bbd637bf5b (diff)
downloadrabbitmq-server-git-b062641019a849f2c80118bf33c0882335cda431.tar.gz
Add shrink_all/1 function
That takes a node and removes all quorum queue members for this node and returns an list of results for each queue. [#162782789]
-rw-r--r--src/rabbit_quorum_queue.erl19
-rw-r--r--test/quorum_queue_SUITE.erl33
2 files changed, 48 insertions, 4 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 79c07535a4..b9ca15467c 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -37,6 +37,7 @@
-export([requeue/3]).
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
+-export([shrink_all/1]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -725,6 +726,24 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
E
end.
+shrink_all(Node) ->
+ [begin
+ QName = amqqueue:get_name(Q),
+ rabbit_log:info("~s: Removing member ~w",
+ [rabbit_misc:rs(QName), Node]),
+ case delete_member(Q, Node) of
+ ok ->
+ {ok, QName};
+ Err ->
+ rabbit_log:warning("~s: Failed to remove member ~w",
+ [rabbit_misc:rs(QName), Node]),
+ {error, QName, Err}
+ end
+ end || Q <- rabbit_amqqueue:list(),
+ amqqueue:get_type(Q) == quorum,
+ lists:member(Node, amqqueue:get_quorum_nodes(Q))].
+
+
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 43a353d0ea..509d56ece4 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -68,7 +68,8 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority
+ consume_in_minority,
+ shrink_all
]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
@@ -189,7 +190,8 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
[{rmq_nodes_count, 3},
{rmq_nodename_suffix, Testcase},
{tcp_ports_base},
- {queue_name, Q}
+ {queue_name, Q},
+ {alt_queue_name, <<Q/binary, "_alt">>}
]),
Config3 = rabbit_ct_helpers:run_steps(
Config2,
@@ -209,7 +211,8 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Q = rabbit_data_coercion:to_binary(Testcase),
Config2 = rabbit_ct_helpers:set_config(Config1,
- [{queue_name, Q}
+ [{queue_name, Q},
+ {alt_queue_name, <<Q/binary, "_alt">>}
]),
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
@@ -621,7 +624,29 @@ consume_in_minority(Config) ->
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
- no_ack = false})).
+ no_ack = false})),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ ok.
+
+shrink_all(Config) ->
+ [Server0, _Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ QQ = ?config(queue_name, Config),
+ AQ = ?config(alt_queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', AQ, 0, 0},
+ declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(500),
+ Result = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server2]),
+ ct:pal("shring all result ~p", [Result]),
+ ?assertMatch([{ok, _}, {ok, _}], Result),
+ ok.
+
+
subscribe_should_fail_when_global_qos_true(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),