summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Gerhard Lazu <gerhard@lazu.co.uk> 2018-03-06 20:08:19 +0000
committer Gerhard Lazu <gerhard@lazu.co.uk> 2018-03-27 16:07:36 +0100
commit eab08df1ffd3ba21174fa688469b547649bc9db2 (patch)
tree 6f8fb24822fedebef01f6f149b0995511be90d8b
parent 4927aeb648be6ced9185d62f63ce1db58308bac6 (diff)
download rabbitmq-server-git-eab08df1ffd3ba21174fa688469b547649bc9db2.tar.gz
Delete metrics for all deleted queues in a single operation
Rather than calling `rabbit_core_metrics:queue_deleted/1` for every queue, collect all deleted queues and delete all their metrics in a single operation. We don't use a single Mnesia transaction to delete all objects related to a queue and this may be a problem, but we haven't run this version of the code long enough to know for sure. What should we be looking out for @michaelklishin? For initial context, see #1513. Partner-in-crime: @essen
-rw-r--r-- src/rabbit_amqqueue.erl 87
1 files changed, 40 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4019c53671..88d7c597f9 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -190,8 +190,6 @@
-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
-spec on_node_up(node()) -> 'ok'.
-spec on_node_down(node()) -> 'ok'.
--spec queues_to_delete_from_node_down(node()) ->
- rabbit_misc:execute_mnesia_transaction(fun()).
-spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue().
-spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue().
-spec store_queue(rabbit_types:amqqueue()) -> 'ok'.
@@ -1114,58 +1112,37 @@ maybe_clear_recoverable_node(Node,
end.
on_node_down(Node) ->
- % For each transaction:
- % * delete all queues in the transaction
- % * capture the result for every delete queue
- [
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> QueueDeletions = [delete_queue(Queue) || Queue <- Queues],
- NotifyBindingDeletions = rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), QueueDeletions),
- ?INTERNAL_USER),
- fun () ->
- NotifyBindingDeletions(),
- lists:foreach(
- fun(Queue) ->
- % When 40k queues are being deleted,
- % this results in a rabbit_node_monitor function that recurses for 30 minutes,
- % meaning that no information is available for the node (Management Overview doesn't update):
- %
- % {current_stacktrace,
- % [{rabbit_core_metrics,queue_deleted,1,
- % [{file,"src/rabbit_core_metrics.erl"},{line,235}]},
- % {rabbit_amqqueue,'-on_node_down/1-fun-1-',1,
- % [{file,"src/rabbit_amqqueue.erl"},{line,1094}]},
- % {lists,foreach,2,[{file,"lists.erl"},{line,1338}]},
- % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1,
- % [{file,"src/rabbit_amqqueue.erl"},{line,1084}]},
- % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1,
- % [{file,"src/rabbit_amqqueue.erl"},{line,1100}]},
- % {rabbit_node_monitor,handle_dead_rabbit,2,
- % [{file,"src/rabbit_node_monitor.erl"},{line,755}]},
- % {rabbit_node_monitor,handle_info,2,
- % [{file,"src/rabbit_node_monitor.erl"},{line,548}]}]}
- rabbit_core_metrics:queue_deleted(Queue),
- ok = rabbit_event:notify(queue_deleted,
- [{name, Queue},
- {user, ?INTERNAL_USER}])
- end, Queues)
- end
- end) || Queues <- partition_queues(queues_to_delete_from_node_down(Node))
- ],
+ {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node),
+ notify_queue_binding_deletions(QueueDeletions),
+ rabbit_core_metrics:queues_deleted(QueueNames),
+ notify_queues_deleted(QueueNames),
ok.
-partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
- [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
-partition_queues(T) ->
- [T].
+delete_queues_on_node_down(Node) ->
+ lists:unzip(lists:flatten([
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end
+ ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node))
+ ])).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
rabbit_binding:remove_transient_for_destination(QueueName).
-queues_to_delete_from_node_down(NodeDown) ->
+% If there are many queues and we delete them all in a single Mnesia transaction,
+% this can block all other Mnesia operations for a really long time.
+% In situations where a node wants to (re-)join a cluster,
+% Mnesia won't be able to sync on the new node until this operation finishes.
+% As a result, we want to have multiple Mnesia transactions so that other
+% operations can make progress in between these queue delete transactions.
+%
+% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node.
+partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
+ [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
+partition_queues(T) ->
+ [T].
+
+queues_to_delete_when_node_down(NodeDown) ->
rabbit_misc:execute_mnesia_transaction(fun () ->
qlc:e(qlc:q([QName ||
#amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue),
@@ -1176,6 +1153,22 @@ queues_to_delete_from_node_down(NodeDown) ->
))
end).
+notify_queue_binding_deletions(QueueDeletions) ->
+ NotifyBindingDeletions = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), QueueDeletions),
+ ?INTERNAL_USER),
+ NotifyBindingDeletions().
+
+notify_queues_deleted(QueueDeletions) ->
+ lists:foreach(
+ fun(Queue) ->
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, Queue},
+ {user, ?INTERNAL_USER}])
+ end,
+ QueueDeletions).
+
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
durable = false,