summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-11-06 13:54:47 +0000
committerDiana Corbacho <diana@rabbitmq.com>2018-11-06 13:54:47 +0000
commitee7cbf53e97e59596bd1c00fd75a9cd440a7b053 (patch)
tree603fe7b35a6b9f03d89f1d806d1523a0039cc611 /src
parentd098858a2152776ec316799d636a3ad24861fc64 (diff)
downloadrabbitmq-server-git-ee7cbf53e97e59596bd1c00fd75a9cd440a7b053.tar.gz
Cleanup data for missing quorum queues
[#161313536]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_quorum_queue.erl21
2 files changed, 32 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7c4386ba57..40b74c04ac 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,6 +30,7 @@
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
-export([list_down/1, count/1, list_names/0, list_local_names/0]).
+-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
@@ -124,6 +125,7 @@
-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
-spec list_names() -> [rabbit_amqqueue:name()].
-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
+-spec list_by_type(atom()) -> [rabbit_types:amqqueue()].
-spec info_keys() -> rabbit_types:info_keys().
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) ->
@@ -741,6 +743,15 @@ list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed, is_local_to_node(QPid, node())].
+list_by_type(Type) ->
+ {atomic, Qs} =
+ mnesia:sync_transaction(
+ fun () ->
+ mnesia:match_object(rabbit_durable_queue,
+ #amqqueue{_ = '_', type = Type}, read)
+ end),
+ Qs.
+
list_local_followers() ->
[ Q#amqqueue.name
|| #amqqueue{state = State, type = quorum, pid = {_, Leader},
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 4823d4ef32..a438c4826e 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
+-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -389,6 +390,26 @@ purge(Node) ->
requeue(ConsumerTag, MsgIds, FState) ->
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState).
+cleanup_data_dir() ->
+ Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
+ <- rabbit_amqqueue:list_by_type(quorum),
+ lists:member(node(), Nodes)],
+ Registered = ra_directory:list_registered(),
+ [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
+ not lists:member(Name, Names)],
+ ok.
+
+maybe_delete_data_dir(UId) ->
+ Dir = ra_env:server_data_dir(UId),
+ {ok, Config} = ra_log:read_config(Dir),
+ case maps:get(machine, Config) of
+ {module, rabbit_fifo, _} ->
+ ra_lib:recursive_delete(Dir),
+ ra_directory:unregister_name(UId);
+ _ ->
+ ok
+ end.
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;