summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-08-16 13:04:40 +0100
committerDiana Corbacho <diana@rabbitmq.com>2018-08-16 13:04:40 +0100
commitf3b99954936897c2d4705bb3b46d6c03ca32983d (patch)
tree8d4f9691ddc4597fb4bf8087907deef52afe5507
parent7af8fd244c5d5059ac0b3d2931dcbc7210754d55 (diff)
downloadrabbitmq-server-git-f3b99954936897c2d4705bb3b46d6c03ca32983d.tar.gz
Increase performance of auto_delete queues deletion on channel down
Re-introduce previous delete guard for auto_delete queues. Undo performance enhancement introduced by #1513 for this use case. All other bulk deletions remain the same. [#159483364]
-rw-r--r--src/rabbit_amqqueue.erl20
-rw-r--r--src/rabbit_amqqueue_process.erl17
2 files changed, 31 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 56e8cfc618..5285689b58 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -47,7 +47,7 @@
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
- emit_consumers_local/3]).
+ emit_consumers_local/3, internal_delete/3]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -969,13 +969,27 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}).
internal_delete1(QueueName, OnlyDurable) ->
+ internal_delete1(QueueName, OnlyDurable, normal).
+
+internal_delete1(QueueName, OnlyDurable, Reason) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- mnesia:delete({rabbit_durable_queue, QueueName}),
+ case Reason of
+ auto_delete ->
+ case mnesia:wread({rabbit_durable_queue, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
+ end;
+ _ ->
+ mnesia:delete({rabbit_durable_queue, QueueName})
+ end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
internal_delete(QueueName, ActingUser) ->
+ internal_delete(QueueName, ActingUser, normal).
+
+internal_delete(QueueName, ActingUser, Reason) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:wread({rabbit_queue, QueueName}),
@@ -983,7 +997,7 @@ internal_delete(QueueName, ActingUser) ->
{[], []} ->
rabbit_misc:const({error, not_found});
_ ->
- Deletions = internal_delete1(QueueName, false),
+ Deletions = internal_delete1(QueueName, false, Reason),
T = rabbit_binding:process_deletions(Deletions,
?INTERNAL_USER),
fun() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 11f272073b..2220c82b6b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -289,6 +289,13 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
+terminate(auto_delete, State) -> %% auto_delete case
+ %% To increase performance we want to avoid a mnesia_sync:sync call
+ %% after every transaction, as we could be deleting simultaneously
+ %% thousands of queues. A optimisation introduced by server#1513
+ %% needs to be reverted by this case, avoiding to guard the delete
+ %% operation on `rabbit_durable_queue`
+ terminate_shutdown(terminate_delete(true, auto_delete, State), State);
%% If we crashed don't try to clean up the BQS, probably best to leave it.
terminate(_Reason, State = #q{q = Q}) ->
terminate_shutdown(fun (BQS) ->
@@ -300,12 +307,16 @@ terminate(_Reason, State = #q{q = Q}) ->
BQS
end, State).
-terminate_delete(EmitStats, Reason,
+terminate_delete(EmitStats, Reason0,
State = #q{q = #amqqueue{name = QName},
backing_queue = BQ,
status = Status}) ->
ActingUser = terminated_by(Status),
fun (BQS) ->
+ Reason = case Reason0 of
+ auto_delete -> normal;
+ Any -> Any
+ end,
BQS1 = BQ:delete_and_terminate(Reason, BQS),
if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
fun() -> emit_stats(State) end);
@@ -315,7 +326,7 @@ terminate_delete(EmitStats, Reason,
%% logged.
try
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, ActingUser)
+ rabbit_amqqueue:internal_delete(QName, ActingUser, Reason0)
catch
{error, Reason} -> error(Reason)
end,
@@ -1167,7 +1178,7 @@ handle_call({notify_down, ChPid}, _From, State) ->
%% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop(ok, State1)
+ {stop, State1} -> {stop, auto_delete, ok, State1}
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,