diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 11 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 44 |
4 files changed, 79 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d5bfbbb5e3..093d118a6b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -44,6 +44,7 @@ -export([list_local_followers/0]). -export([ensure_rabbit_queue_record_is_initialized/1]). -export([format/1]). +-export([delete_immediately_by_resource/1]). -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). @@ -266,6 +267,13 @@ filter_per_type(Queues) -> filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). +filter_resource_per_type(Resources) -> + Queues = [begin + {ok, #amqqueue{pid = QPid}} = lookup(Resource), + {Resource, QPid} + end || Resource <- Resources], + lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues). + stop(VHost) -> %% Classic queues ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), @@ -957,7 +965,16 @@ delete_exclusive(QPids, ConnId) -> delete_immediately(QPids) -> {Classic, Quorum} = filter_pid_per_type(QPids), [gen_server2:cast(QPid, delete_immediately) || QPid <- Classic], - [rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum], + case Quorum of + [] -> ok; + _ -> {error, cannot_delete_quorum_queues, Quorum} + end. + +delete_immediately_by_resource(Resources) -> + {Classic, Quorum} = filter_resource_per_type(Resources), + [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic], + [rabbit_quorum_queue:delete_immediately(Resource, QPid) + || {Resource, QPid} <- Quorum], ok. delete(#amqqueue{ type = quorum} = Q, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 57053081f1..bf4b432253 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -17,7 +17,7 @@ -module(rabbit_quorum_queue). -export([init_state/2, handle_event/2]). --export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]). +-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]). -export([info/1, info/2, stat/1, infos/1]). -export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]). -export([credit/4]). @@ -75,7 +75,7 @@ -spec infos(rabbit_types:r('queue')) -> rabbit_types:infos(). -spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. --spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}. +-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}. -define(STATISTICS_KEYS, [policy, @@ -94,15 +94,17 @@ %%---------------------------------------------------------------------------- -spec init_state(ra_server_id(), rabbit_types:r('queue')) -> - rabbit_fifo_client:state(). + rabbit_fifo_client:state(). init_state({Name, _}, QName) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), + %% This lookup could potentially return an {error, not_found}, but we do not + %% know what to do if the queue has `disappeared`. Let it crash. {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} = rabbit_amqqueue:lookup(QName), %% Ensure the leader is listed first Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], - rabbit_fifo_client:init(QName, Servers, SoftLimit, + rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit, fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). @@ -305,11 +307,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q end end. -delete_immediately({Name, _} = QPid) -> - QName = queue_name(Name), - _ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER), - ok = ra:delete_cluster([QPid]), - rabbit_core_metrics:queue_deleted(QName), +delete_immediately(Resource, {_Name, _} = QPid) -> + _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER), + {ok, _} = ra:delete_cluster([QPid]), + rabbit_core_metrics:queue_deleted(Resource), ok. ack(CTag, MsgIds, QState) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index c460b02e5b..e462fc6bc0 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -208,7 +208,16 @@ delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), %% Message store should be closed when vhost supervisor is closed. - ok = rabbit_file:recursive_delete([VhostDir]). + case rabbit_file:recursive_delete([VhostDir]) of + ok -> ok; + {error, {_, enoent}} -> + %% a concurrent delete did the job for us + rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]), + ok; + Other -> + rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]), + Other + end. assert_benign(ok, _) -> ok; assert_benign({ok, _}, _) -> ok; diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 61fd7a1b2c..4190903409 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -116,6 +116,8 @@ all_tests() -> basic_recover, idempotent_recover, vhost_with_quorum_queue_is_deleted, + delete_immediately, + delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count ]. @@ -1914,6 +1916,42 @@ basic_recover(Config) -> wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). +delete_immediately(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}], + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, Args)), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), + + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + amqp_channel:call(Ch, #'queue.declare'{queue = QQ, + durable = true, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_resource(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."], + ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)), + + %% Check that the application and process are down + wait_until(fun() -> + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + end), + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))). + subscribe_redelivery_count(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1967,7 +2005,6 @@ consume_redelivery_count(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), publish(Ch, QQ), wait_for_messages_ready(Servers, RaName, 1), @@ -1984,7 +2021,9 @@ consume_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - + %% wait for requeueing + timer:sleep(500), + {#'basic.get_ok'{delivery_tag = DeliveryTag1, redelivered = true}, #amqp_msg{props = #'P_basic'{headers = H1}}} = @@ -2005,7 +2044,6 @@ consume_redelivery_count(Config) -> multiple = false, requeue = true}). - %%---------------------------------------------------------------------------- declare(Ch, Q) -> |
