diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-14 14:46:41 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-14 14:48:02 +0000 |
| commit | b062641019a849f2c80118bf33c0882335cda431 (patch) | |
| tree | a7f2c1e3868f5c79336bac48c86980b4544489d7 | |
| parent | 375f743238ba75376f038544cb6199bbd637bf5b (diff) | |
| download | rabbitmq-server-git-b062641019a849f2c80118bf33c0882335cda431.tar.gz | |
Add shrink_all/1 function
That takes a node and removes all quorum queue members for this node and
returns an list of results for each queue.
[#162782789]
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 19 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 33 |
2 files changed, 48 insertions, 4 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 79c07535a4..b9ca15467c 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -37,6 +37,7 @@ -export([requeue/3]). -export([policy_changed/2]). -export([cleanup_data_dir/0]). +-export([shrink_all/1]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -725,6 +726,24 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> E end. +shrink_all(Node) -> + [begin + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: Removing member ~w", + [rabbit_misc:rs(QName), Node]), + case delete_member(Q, Node) of + ok -> + {ok, QName}; + Err -> + rabbit_log:warning("~s: Failed to remove member ~w", + [rabbit_misc:rs(QName), Node]), + {error, QName, Err} + end + end || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + lists:member(Node, amqqueue:get_quorum_nodes(Q))]. + + %%---------------------------------------------------------------------------- dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 43a353d0ea..509d56ece4 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -68,7 +68,8 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority + consume_in_minority, + shrink_all ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, @@ -189,7 +190,8 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ [{rmq_nodes_count, 3}, {rmq_nodename_suffix, Testcase}, {tcp_ports_base}, - {queue_name, Q} + {queue_name, Q}, + {alt_queue_name, <<Q/binary, "_alt">>} ]), Config3 = rabbit_ct_helpers:run_steps( Config2, @@ -209,7 +211,8 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), Q = rabbit_data_coercion:to_binary(Testcase), Config2 = rabbit_ct_helpers:set_config(Config1, - [{queue_name, Q} + [{queue_name, Q}, + {alt_queue_name, <<Q/binary, "_alt">>} ]), rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). @@ -621,7 +624,29 @@ consume_in_minority(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, - no_ack = false})). + no_ack = false})), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + ok. + +shrink_all(Config) -> + [Server0, _Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(500), + Result = rpc:call(Server0, rabbit_quorum_queue, shrink_all, [Server2]), + ct:pal("shring all result ~p", [Result]), + ?assertMatch([{ok, _}, {ok, _}], Result), + ok. + + subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
