summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_quorum_queue.erl19
-rw-r--r--src/rabbit_vhost.erl11
3 files changed, 38 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d5bfbbb5e3..093d118a6b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,6 +44,7 @@
-export([list_local_followers/0]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
+-export([delete_immediately_by_resource/1]).
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
@@ -266,6 +267,13 @@ filter_per_type(Queues) ->
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
+filter_resource_per_type(Resources) ->
+ Queues = [begin
+ {ok, #amqqueue{pid = QPid}} = lookup(Resource),
+ {Resource, QPid}
+ end || Resource <- Resources],
+ lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -957,7 +965,16 @@ delete_exclusive(QPids, ConnId) ->
delete_immediately(QPids) ->
{Classic, Quorum} = filter_pid_per_type(QPids),
[gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
- [rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum],
+ case Quorum of
+ [] -> ok;
+ _ -> {error, cannot_delete_quorum_queues, Quorum}
+ end.
+
+delete_immediately_by_resource(Resources) ->
+ {Classic, Quorum} = filter_resource_per_type(Resources),
+ [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
+ [rabbit_quorum_queue:delete_immediately(Resource, QPid)
+ || {Resource, QPid} <- Quorum],
ok.
delete(#amqqueue{ type = quorum} = Q,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 57053081f1..bf4b432253 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_quorum_queue).
-export([init_state/2, handle_event/2]).
--export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]).
+-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
-export([info/1, info/2, stat/1, infos/1]).
-export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
-export([credit/4]).
@@ -75,7 +75,7 @@
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
-spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
--spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}.
+-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
-define(STATISTICS_KEYS,
[policy,
@@ -94,15 +94,17 @@
%%----------------------------------------------------------------------------
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
- rabbit_fifo_client:state().
+ rabbit_fifo_client:state().
init_state({Name, _}, QName) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
+ %% This lookup could potentially return an {error, not_found}, but we do not
+ %% know what to do if the queue has `disappeared`. Let it crash.
{ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
- rabbit_fifo_client:init(QName, Servers, SoftLimit,
+ rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
@@ -305,11 +307,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
end
end.
-delete_immediately({Name, _} = QPid) ->
- QName = queue_name(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER),
- ok = ra:delete_cluster([QPid]),
- rabbit_core_metrics:queue_deleted(QName),
+delete_immediately(Resource, {_Name, _} = QPid) ->
+ _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
+ {ok, _} = ra:delete_cluster([QPid]),
+ rabbit_core_metrics:queue_deleted(Resource),
ok.
ack(CTag, MsgIds, QState) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index c460b02e5b..e462fc6bc0 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -208,7 +208,16 @@ delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
%% Message store should be closed when vhost supervisor is closed.
- ok = rabbit_file:recursive_delete([VhostDir]).
+ case rabbit_file:recursive_delete([VhostDir]) of
+ ok -> ok;
+ {error, {_, enoent}} ->
+ %% a concurrent delete did the job for us
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]),
+ ok;
+ Other ->
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]),
+ Other
+ end.
assert_benign(ok, _) -> ok;
assert_benign({ok, _}, _) -> ok;