diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-11-06 13:54:47 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2018-11-06 13:54:47 +0000 |
| commit | ee7cbf53e97e59596bd1c00fd75a9cd440a7b053 (patch) | |
| tree | 603fe7b35a6b9f03d89f1d806d1523a0039cc611 /src | |
| parent | d098858a2152776ec316799d636a3ad24861fc64 (diff) | |
| download | rabbitmq-server-git-ee7cbf53e97e59596bd1c00fd75a9cd440a7b053.tar.gz | |
Cleanup data for missing quorum queues
[#161313536]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 21 |
2 files changed, 32 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7c4386ba57..40b74c04ac 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,6 +30,7 @@ emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). -export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). @@ -124,6 +125,7 @@ -spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. -spec list_names() -> [rabbit_amqqueue:name()]. -spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. +-spec list_by_type(atom()) -> [rabbit_types:amqqueue()]. -spec info_keys() -> rabbit_types:info_keys(). -spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). -spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> @@ -741,6 +743,15 @@ list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. +list_by_type(Type) -> + {atomic, Qs} = + mnesia:sync_transaction( + fun () -> + mnesia:match_object(rabbit_durable_queue, + #amqqueue{_ = '_', type = Type}, read) + end), + Qs. + list_local_followers() -> [ Q#amqqueue.name || #amqqueue{state = State, type = quorum, pid = {_, Leader}, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 4823d4ef32..a438c4826e 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([cleanup_data_dir/0]). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -389,6 +390,26 @@ purge(Node) -> requeue(ConsumerTag, MsgIds, FState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState). +cleanup_data_dir() -> + Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} + <- rabbit_amqqueue:list_by_type(quorum), + lists:member(node(), Nodes)], + Registered = ra_directory:list_registered(), + [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, + not lists:member(Name, Names)], + ok. + +maybe_delete_data_dir(UId) -> + Dir = ra_env:server_data_dir(UId), + {ok, Config} = ra_log:read_config(Dir), + case maps:get(machine, Config) of + {module, rabbit_fifo, _} -> + ra_lib:recursive_delete(Dir), + ra_directory:unregister_name(UId); + _ -> + ok + end. + cluster_state(Name) -> case whereis(Name) of undefined -> down; |
