summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2018-12-05 17:50:00 +0300
committerMichael Klishin <michael@clojurewerkz.org>2018-12-05 17:50:00 +0300
commit688858521fd08b1430625a85be24a0f7f3a32620 (patch)
treef494848672d900ddb19b430820caad456b1a68bf
parent1111ceeb7a0c85b25dd2e433435feb1d107fc811 (diff)
parentcb0429eab46f787f39e1872fc983f7d73abeb86b (diff)
downloadrabbitmq-server-git-688858521fd08b1430625a85be24a0f7f3a32620.tar.gz
Merge branch 'master' into ranch_proxy_header
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_quorum_queue.erl19
-rw-r--r--src/rabbit_vhost.erl11
-rw-r--r--test/quorum_queue_SUITE.erl44
4 files changed, 79 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d5bfbbb5e3..093d118a6b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,6 +44,7 @@
-export([list_local_followers/0]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
+-export([delete_immediately_by_resource/1]).
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
@@ -266,6 +267,13 @@ filter_per_type(Queues) ->
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
+filter_resource_per_type(Resources) ->
+ Queues = [begin
+ {ok, #amqqueue{pid = QPid}} = lookup(Resource),
+ {Resource, QPid}
+ end || Resource <- Resources],
+ lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -957,7 +965,16 @@ delete_exclusive(QPids, ConnId) ->
delete_immediately(QPids) ->
{Classic, Quorum} = filter_pid_per_type(QPids),
[gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
- [rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum],
+ case Quorum of
+ [] -> ok;
+ _ -> {error, cannot_delete_quorum_queues, Quorum}
+ end.
+
+delete_immediately_by_resource(Resources) ->
+ {Classic, Quorum} = filter_resource_per_type(Resources),
+ [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
+ [rabbit_quorum_queue:delete_immediately(Resource, QPid)
+ || {Resource, QPid} <- Quorum],
ok.
delete(#amqqueue{ type = quorum} = Q,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 57053081f1..bf4b432253 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_quorum_queue).
-export([init_state/2, handle_event/2]).
--export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]).
+-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
-export([info/1, info/2, stat/1, infos/1]).
-export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
-export([credit/4]).
@@ -75,7 +75,7 @@
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
-spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
--spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}.
+-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
-define(STATISTICS_KEYS,
[policy,
@@ -94,15 +94,17 @@
%%----------------------------------------------------------------------------
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
- rabbit_fifo_client:state().
+ rabbit_fifo_client:state().
init_state({Name, _}, QName) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
+ %% This lookup could potentially return an {error, not_found}, but we do not
+ %% know what to do if the queue has `disappeared`. Let it crash.
{ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
- rabbit_fifo_client:init(QName, Servers, SoftLimit,
+ rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
@@ -305,11 +307,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
end
end.
-delete_immediately({Name, _} = QPid) ->
- QName = queue_name(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER),
- ok = ra:delete_cluster([QPid]),
- rabbit_core_metrics:queue_deleted(QName),
+delete_immediately(Resource, {_Name, _} = QPid) ->
+ _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
+ {ok, _} = ra:delete_cluster([QPid]),
+ rabbit_core_metrics:queue_deleted(Resource),
ok.
ack(CTag, MsgIds, QState) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index c460b02e5b..e462fc6bc0 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -208,7 +208,16 @@ delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
%% Message store should be closed when vhost supervisor is closed.
- ok = rabbit_file:recursive_delete([VhostDir]).
+ case rabbit_file:recursive_delete([VhostDir]) of
+ ok -> ok;
+ {error, {_, enoent}} ->
+ %% a concurrent delete did the job for us
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]),
+ ok;
+ Other ->
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]),
+ Other
+ end.
assert_benign(ok, _) -> ok;
assert_benign({ok, _}, _) -> ok;
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 61fd7a1b2c..4190903409 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -116,6 +116,8 @@ all_tests() ->
basic_recover,
idempotent_recover,
vhost_with_quorum_queue_is_deleted,
+ delete_immediately,
+ delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count
].
@@ -1914,6 +1916,42 @@ basic_recover(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
+delete_immediately(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, Args)),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
+
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
+ durable = true,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_resource(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."],
+ ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)),
+
+ %% Check that the application and process are down
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))).
+
subscribe_redelivery_count(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1967,7 +2005,6 @@ consume_redelivery_count(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
RaName = ra_name(QQ),
publish(Ch, QQ),
wait_for_messages_ready(Servers, RaName, 1),
@@ -1984,7 +2021,9 @@ consume_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
-
+ %% wait for requeueing
+ timer:sleep(500),
+
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H1}}} =
@@ -2005,7 +2044,6 @@ consume_redelivery_count(Config) ->
multiple = false,
requeue = true}).
-
%%----------------------------------------------------------------------------
declare(Ch, Q) ->