diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-11-06 13:54:47 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2018-11-06 13:54:47 +0000 |
| commit | ee7cbf53e97e59596bd1c00fd75a9cd440a7b053 (patch) | |
| tree | 603fe7b35a6b9f03d89f1d806d1523a0039cc611 | |
| parent | d098858a2152776ec316799d636a3ad24861fc64 (diff) | |
| download | rabbitmq-server-git-ee7cbf53e97e59596bd1c00fd75a9cd440a7b053.tar.gz | |
Cleanup data for missing quorum queues
[#161313536]
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 21 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 31 |
3 files changed, 62 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7c4386ba57..40b74c04ac 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,6 +30,7 @@ emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). -export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). @@ -124,6 +125,7 @@ -spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. -spec list_names() -> [rabbit_amqqueue:name()]. -spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. +-spec list_by_type(atom()) -> [rabbit_types:amqqueue()]. -spec info_keys() -> rabbit_types:info_keys(). -spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). -spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> @@ -741,6 +743,15 @@ list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. +list_by_type(Type) -> + {atomic, Qs} = + mnesia:sync_transaction( + fun () -> + mnesia:match_object(rabbit_durable_queue, + #amqqueue{_ = '_', type = Type}, read) + end), + Qs. + list_local_followers() -> [ Q#amqqueue.name || #amqqueue{state = State, type = quorum, pid = {_, Leader}, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 4823d4ef32..a438c4826e 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([cleanup_data_dir/0]). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -389,6 +390,26 @@ purge(Node) -> requeue(ConsumerTag, MsgIds, FState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState). +cleanup_data_dir() -> + Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} + <- rabbit_amqqueue:list_by_type(quorum), + lists:member(node(), Nodes)], + Registered = ra_directory:list_registered(), + [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, + not lists:member(Name, Names)], + ok. + +maybe_delete_data_dir(UId) -> + Dir = ra_env:server_data_dir(UId), + {ok, Config} = ra_log:read_config(Dir), + case maps:get(machine, Config) of + {module, rabbit_fifo, _} -> + ra_lib:recursive_delete(Dir), + ra_directory:unregister_name(UId); + _ -> + ok + end. + cluster_state(Name) -> case whereis(Name) of undefined -> down; diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index d983a9d396..6147215fbc 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -43,7 +43,8 @@ groups() -> delete_member_not_running, delete_member_classic, delete_member_not_found, - delete_member] + delete_member, + cleanup_data_dir] ++ all_tests()}, {cluster_size_3, [], [ declare_during_node_down, @@ -1586,6 +1587,34 @@ delete_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). +cleanup_data_dir(Config) -> + %% This test is slow, but also checks that we handle properly errors when + %% trying to delete a queue in minority. A case clause there had gone + %% previously unnoticed. + + [Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(100), + + [{_, UId}] = rpc:call(Server1, ra_directory, list_registered, []), + DataDir = rpc:call(Server1, ra_env, server_data_dir, [UId]), + ?assert(filelib:is_dir(DataDir)), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + + ?assertExit({{shutdown, + {connection_closing, {server_initiated_close, 541, _}}}, _}, + amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + ?assert(filelib:is_dir(DataDir)), + + ?assertEqual(ok, + rpc:call(Server1, rabbit_quorum_queue, cleanup_data_dir, + [])), + ?assert(not filelib:is_dir(DataDir)). + basic_recover(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
