summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_quorum_queue.erl32
2 files changed, 39 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7c4386ba57..40b74c04ac 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,6 +30,7 @@
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
-export([list_down/1, count/1, list_names/0, list_local_names/0]).
+-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
@@ -124,6 +125,7 @@
-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
-spec list_names() -> [rabbit_amqqueue:name()].
-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
+-spec list_by_type(atom()) -> [rabbit_types:amqqueue()].
-spec info_keys() -> rabbit_types:info_keys().
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) ->
@@ -741,6 +743,15 @@ list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed, is_local_to_node(QPid, node())].
+list_by_type(Type) ->
+ {atomic, Qs} =
+ mnesia:sync_transaction(
+ fun () ->
+ mnesia:match_object(rabbit_durable_queue,
+ #amqqueue{_ = '_', type = Type}, read)
+ end),
+ Qs.
+
list_local_followers() ->
[ Q#amqqueue.name
|| #amqqueue{state = State, type = quorum, pid = {_, Leader},
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 795465855b..311894dee6 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
+-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -240,7 +241,7 @@ recover(Queues) ->
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
ok -> ok;
Err ->
- rabbit_log:warning("recover: Quorum queue ~w could not"
+ rabbit_log:warning("recover: quorum queue ~w could not"
" be started ~w", [Name, Err]),
ok
end;
@@ -251,7 +252,7 @@ recover(Queues) ->
ok;
Err ->
%% catch all clause to avoid causing the vhost not to start
- rabbit_log:warning("recover: Quorum queue ~w could not be "
+ rabbit_log:warning("recover: quorum queue ~w could not be "
"restarted ~w", [Name, Err]),
ok
end,
@@ -281,7 +282,7 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
end,
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]),
{ok, Msgs};
- {error, {no_more_nodes_to_try, Errs}} = Err ->
+ {error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
@@ -291,7 +292,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
rabbit_core_metrics:queue_deleted(QName),
{ok, Msgs};
false ->
- Err
+ rabbit_misc:protocol_error(
+ internal_error,
+ "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p",
+ [rabbit_misc:rs(QName), Errs])
end
end.
@@ -386,6 +390,26 @@ purge(Node) ->
requeue(ConsumerTag, MsgIds, FState) ->
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState).
+cleanup_data_dir() ->
+ Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
+ <- rabbit_amqqueue:list_by_type(quorum),
+ lists:member(node(), Nodes)],
+ Registered = ra_directory:list_registered(),
+ [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
+ not lists:member(Name, Names)],
+ ok.
+
+maybe_delete_data_dir(UId) ->
+ Dir = ra_env:server_data_dir(UId),
+ {ok, Config} = ra_log:read_config(Dir),
+ case maps:get(machine, Config) of
+ {module, rabbit_fifo, _} ->
+ ra_lib:recursive_delete(Dir),
+ ra_directory:unregister_name(UId);
+ _ ->
+ ok
+ end.
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;