summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGerhard Lazu <gerhard@lazu.co.uk>2018-02-22 16:33:46 +0000
committerGerhard Lazu <gerhard@lazu.co.uk>2018-03-27 16:07:35 +0100
commit52dba0346cf092d9f6160ab7776487ed24df408a (patch)
treed1d4f2ec9108831c12489448c9648c5b93437689
parent30a1a2f76bb21b598e1f3ce5e67725032cb38701 (diff)
downloadrabbitmq-server-git-52dba0346cf092d9f6160ab7776487ed24df408a.tar.gz
Split single Mnesia transaction that runs on_node_down
Rather than using a single Mnesia transaction to clean queues when a node goes down, split it into many transactions so that other Mnesia operations can make progress. This is especially important when nodes join the cluster, since Mnesia on the newly started node will not be able to synchronise if there is a long-running Mnesia transaction. This commit is not complete, we need feedback on the comments left in the code before we can settle on a final version that can be merged. For more context, see #1513 Partner-in-crime: @essen
-rw-r--r--src/rabbit_amqqueue.erl84
1 files changed, 59 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d5c63aec64..d661be84b6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -190,6 +190,8 @@
-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
-spec on_node_up(node()) -> 'ok'.
-spec on_node_down(node()) -> 'ok'.
+-spec queues_to_delete_from_node_down(node()) ->
+ rabbit_misc:execute_mnesia_transaction(fun()).
-spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue().
-spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue().
-spec store_queue(rabbit_types:amqqueue()) -> 'ok'.
@@ -1112,36 +1114,68 @@ maybe_clear_recoverable_node(Node,
end.
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> QsDels =
- qlc:e(qlc:q([{QName, delete_queue(QName)} ||
- #amqqueue{name = QName, pid = Pid} =
- Q <- mnesia:table(rabbit_queue),
- node(Pid) == Node andalso
- not rabbit_mnesia:is_process_alive(Pid) andalso
- (not rabbit_amqqueue:is_mirrored(Q) orelse
- rabbit_amqqueue:is_dead_exclusive(Q))])),
- {Qs, Dels} = lists:unzip(QsDels),
- T = rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels),
- ?INTERNAL_USER),
- fun () ->
- T(),
- lists:foreach(
- fun(QName) ->
- rabbit_core_metrics:queue_deleted(QName),
- ok = rabbit_event:notify(queue_deleted,
- [{name, QName},
- {user, ?INTERNAL_USER}])
- end, Qs)
- end
- end).
+ % Create 1 transaction per N queues that need to be deleted
+ % * 1 transaction for all queues might block everything for a really long time
+ % * ^^^ this is what used to happen before this change
+ % * 1 transaction per queue will result in too many transaction
+ % * ^^^ this is what happens now; it's not perfect, but it's a step in the right direction
+ % * ^^^ OPTIMISE THIS BEFORE MERGING ^^^
+ % * Maybe 1 transaction for every 10 queues that need to be deleted ?
+ %
+ % For each transaction:
+ % * delete all queues in the transaction
+ % * capture the result for every delete queue
+ [
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> Dels = [delete_queue(Q)],
+ T = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels)),
+ fun () ->
+ T(),
+ lists:foreach(
+ fun(QName) ->
+ % When 40k queues are being deleted,
+ % this results in a rabbit_node_monitor function that recurses for 30 minutes,
+ % meaning that no information is available for the node (Management Overview doesn't update):
+ %
+ % {current_stacktrace,
+ % [{rabbit_core_metrics,queue_deleted,1,
+ % [{file,"src/rabbit_core_metrics.erl"},{line,235}]},
+ % {rabbit_amqqueue,'-on_node_down/1-fun-1-',1,
+ % [{file,"src/rabbit_amqqueue.erl"},{line,1094}]},
+ % {lists,foreach,2,[{file,"lists.erl"},{line,1338}]},
+ % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1,
+ % [{file,"src/rabbit_amqqueue.erl"},{line,1084}]},
+ % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1,
+ % [{file,"src/rabbit_amqqueue.erl"},{line,1100}]},
+ % {rabbit_node_monitor,handle_dead_rabbit,2,
+ % [{file,"src/rabbit_node_monitor.erl"},{line,755}]},
+ % {rabbit_node_monitor,handle_info,2,
+ % [{file,"src/rabbit_node_monitor.erl"},{line,548}]}]}
+ rabbit_core_metrics:queue_deleted(QName),
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, QName}])
+ end, [Q])
+ end
+ end) || Q <- queues_to_delete_from_node_down(Node)
+ ].
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
rabbit_binding:remove_transient_for_destination(QueueName).
+queues_to_delete_from_node_down(NodeDown) ->
+ rabbit_misc:execute_mnesia_transaction(fun () ->
+ qlc:e(qlc:q([QName ||
+ #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue),
+ node(Pid) == NodeDown andalso
+ not rabbit_mnesia:is_process_alive(Pid) andalso
+ (not rabbit_amqqueue:is_mirrored(Q) orelse
+ rabbit_amqqueue:is_dead_exclusive(Q))]
+ ))
+ end).
+
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
durable = false,