summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-12-03 14:54:34 +0000
committerDiana Corbacho <diana@rabbitmq.com>2018-12-03 14:54:34 +0000
commitcddcdba48dacd8cc4e20a1b11d8df8b156a9878a (patch)
treeb6e88be82dd5529be857b8d6dbea12f5bbb0460e
parent08121376120e3ec936344bf39d80aa9448218d80 (diff)
downloadrabbitmq-server-git-cddcdba48dacd8cc4e20a1b11d8df8b156a9878a.tar.gz
Implement delete_immediately_by_resource
delete_immediately doesn't work for qq as we cannot easily retrieve resource name from qpid. Instead, delete_immediately_by_resource receives a resource record which should make it easier for both users and developers. Usage: `rabbitmqctl eval 'rabbit_amqqueue:delete_immediately([rabbit_misc:r(<<"VHOST_NAME">>, queue, <<"QUEUE_NAME">>)]).'`
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_quorum_queue.erl11
-rw-r--r--test/quorum_queue_SUITE.erl43
3 files changed, 65 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 89913ba408..5127280871 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,6 +44,7 @@
-export([list_local_followers/0]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
+-export([delete_immediately_by_resource/1]).
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).
@@ -266,6 +267,13 @@ filter_per_type(Queues) ->
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
+filter_resource_per_type(Resources) ->
+ Queues = [begin
+ {ok, #amqqueue{pid = QPid}} = lookup(Resource),
+ {Resource, QPid}
+ end || Resource <- Resources],
+ lists:partition(fun({Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
+
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -954,7 +962,16 @@ delete_exclusive(QPids, ConnId) ->
delete_immediately(QPids) ->
{Classic, Quorum} = filter_pid_per_type(QPids),
[gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
- [rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum],
+ case Quorum of
+ [] -> ok;
+ _ -> {error, cannot_delete_quorum_queues, Quorum}
+ end.
+
+delete_immediately_by_resource(Resources) ->
+ {Classic, Quorum} = filter_resource_per_type(Resources),
+ [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
+ [rabbit_quorum_queue:delete_immediately(Resource, QPid)
+ || {Resource, QPid} <- Quorum],
ok.
delete(#amqqueue{ type = quorum} = Q,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6de9ed7818..895288672a 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_quorum_queue).
-export([init_state/2, handle_event/2]).
--export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]).
+-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
-export([info/1, info/2, stat/1, infos/1]).
-export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
-export([credit/4]).
@@ -300,11 +300,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q
end
end.
-delete_immediately({Name, _} = QPid) ->
- QName = queue_name(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER),
- ok = ra:delete_cluster([QPid]),
- rabbit_core_metrics:queue_deleted(QName),
+delete_immediately(Resource, {Name, _} = QPid) ->
+ _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
+ {ok, _} = ra:delete_cluster([QPid]),
+ rabbit_core_metrics:queue_deleted(Resource),
ok.
ack(CTag, MsgIds, QState) ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 19f3a66a1d..531b645774 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -108,7 +108,9 @@ all_tests() ->
cancel_sync_queue,
basic_recover,
idempotent_recover,
- vhost_with_quorum_queue_is_deleted
+ vhost_with_quorum_queue_is_deleted,
+ delete_immediately,
+ delete_immediately_by_resource
].
%% -------------------------------------------------------------------
@@ -1633,6 +1635,45 @@ basic_recover(Config) ->
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
+
+delete_immediately(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, Args)),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
+
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
+ durable = true,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_resource(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."],
+ ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)),
+
+ %% Check that the application and process are down
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->