summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl84
1 files changed, 59 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d5c63aec64..d661be84b6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -190,6 +190,8 @@
-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
-spec on_node_up(node()) -> 'ok'.
-spec on_node_down(node()) -> 'ok'.
+-spec queues_to_delete_from_node_down(node()) ->
+ rabbit_misc:execute_mnesia_transaction(fun()).
-spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue().
-spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue().
-spec store_queue(rabbit_types:amqqueue()) -> 'ok'.
@@ -1112,36 +1114,68 @@ maybe_clear_recoverable_node(Node,
end.
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> QsDels =
- qlc:e(qlc:q([{QName, delete_queue(QName)} ||
- #amqqueue{name = QName, pid = Pid} =
- Q <- mnesia:table(rabbit_queue),
- node(Pid) == Node andalso
- not rabbit_mnesia:is_process_alive(Pid) andalso
- (not rabbit_amqqueue:is_mirrored(Q) orelse
- rabbit_amqqueue:is_dead_exclusive(Q))])),
- {Qs, Dels} = lists:unzip(QsDels),
- T = rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels),
- ?INTERNAL_USER),
- fun () ->
- T(),
- lists:foreach(
- fun(QName) ->
- rabbit_core_metrics:queue_deleted(QName),
- ok = rabbit_event:notify(queue_deleted,
- [{name, QName},
- {user, ?INTERNAL_USER}])
- end, Qs)
- end
- end).
+ % Create 1 transaction per N queues that need to be deleted
+ % * 1 transaction for all queues might block everything for a really long time
+ % * ^^^ this is what used to happen before this change
+ % * 1 transaction per queue will result in too many transaction
+ % * ^^^ this is what happens now; it's not perfect, but it's a step in the right direction
+ % * ^^^ OPTIMISE THIS BEFORE MERGING ^^^
+ % * Maybe 1 transaction for every 10 queues that need to be deleted ?
+ %
+ % For each transaction:
+ % * delete all queues in the transaction
+ % * capture the result for every delete queue
+ [
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> Dels = [delete_queue(Q)],
+ T = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels)),
+ fun () ->
+ T(),
+ lists:foreach(
+ fun(QName) ->
+ % When 40k queues are being deleted,
+ % this results in a rabbit_node_monitor function that recurses for 30 minutes,
+ % meaning that no information is available for the node (Management Overview doesn't update):
+ %
+ % {current_stacktrace,
+ % [{rabbit_core_metrics,queue_deleted,1,
+ % [{file,"src/rabbit_core_metrics.erl"},{line,235}]},
+ % {rabbit_amqqueue,'-on_node_down/1-fun-1-',1,
+ % [{file,"src/rabbit_amqqueue.erl"},{line,1094}]},
+ % {lists,foreach,2,[{file,"lists.erl"},{line,1338}]},
+ % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1,
+ % [{file,"src/rabbit_amqqueue.erl"},{line,1084}]},
+ % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1,
+ % [{file,"src/rabbit_amqqueue.erl"},{line,1100}]},
+ % {rabbit_node_monitor,handle_dead_rabbit,2,
+ % [{file,"src/rabbit_node_monitor.erl"},{line,755}]},
+ % {rabbit_node_monitor,handle_info,2,
+ % [{file,"src/rabbit_node_monitor.erl"},{line,548}]}]}
+ rabbit_core_metrics:queue_deleted(QName),
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, QName}])
+ end, [Q])
+ end
+ end) || Q <- queues_to_delete_from_node_down(Node)
+ ].
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
rabbit_binding:remove_transient_for_destination(QueueName).
+queues_to_delete_from_node_down(NodeDown) ->
+ rabbit_misc:execute_mnesia_transaction(fun () ->
+ qlc:e(qlc:q([QName ||
+ #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue),
+ node(Pid) == NodeDown andalso
+ not rabbit_mnesia:is_process_alive(Pid) andalso
+ (not rabbit_amqqueue:is_mirrored(Q) orelse
+ rabbit_amqqueue:is_dead_exclusive(Q))]
+ ))
+ end).
+
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
durable = false,