diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 84 |
1 files changed, 59 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d5c63aec64..d661be84b6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -190,6 +190,8 @@ -spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. -spec on_node_up(node()) -> 'ok'. -spec on_node_down(node()) -> 'ok'. +-spec queues_to_delete_from_node_down(node()) -> + rabbit_misc:execute_mnesia_transaction(fun()). -spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue(). -spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue(). -spec store_queue(rabbit_types:amqqueue()) -> 'ok'. @@ -1112,36 +1114,68 @@ maybe_clear_recoverable_node(Node, end. on_node_down(Node) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> QsDels = - qlc:e(qlc:q([{QName, delete_queue(QName)} || - #amqqueue{name = QName, pid = Pid} = - Q <- mnesia:table(rabbit_queue), - node(Pid) == Node andalso - not rabbit_mnesia:is_process_alive(Pid) andalso - (not rabbit_amqqueue:is_mirrored(Q) orelse - rabbit_amqqueue:is_dead_exclusive(Q))])), - {Qs, Dels} = lists:unzip(QsDels), - T = rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), Dels), - ?INTERNAL_USER), - fun () -> - T(), - lists:foreach( - fun(QName) -> - rabbit_core_metrics:queue_deleted(QName), - ok = rabbit_event:notify(queue_deleted, - [{name, QName}, - {user, ?INTERNAL_USER}]) - end, Qs) - end - end). + % Create 1 transaction per N queues that need to be deleted + % * 1 transaction for all queues might block everything for a really long time + % * ^^^ this is what used to happen before this change + % * 1 transaction per queue will result in too many transaction + % * ^^^ this is what happens now; it's not perfect, but it's a step in the right direction + % * ^^^ OPTIMISE THIS BEFORE MERGING ^^^ + % * Maybe 1 transaction for every 10 queues that need to be deleted ? + % + % For each transaction: + % * delete all queues in the transaction + % * capture the result for every delete queue + [ + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> Dels = [delete_queue(Q)], + T = rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)), + fun () -> + T(), + lists:foreach( + fun(QName) -> + % When 40k queues are being deleted, + % this results in a rabbit_node_monitor function that recurses for 30 minutes, + % meaning that no information is available for the node (Management Overview doesn't update): + % + % {current_stacktrace, + % [{rabbit_core_metrics,queue_deleted,1, + % [{file,"src/rabbit_core_metrics.erl"},{line,235}]}, + % {rabbit_amqqueue,'-on_node_down/1-fun-1-',1, + % [{file,"src/rabbit_amqqueue.erl"},{line,1094}]}, + % {lists,foreach,2,[{file,"lists.erl"},{line,1338}]}, + % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1, + % [{file,"src/rabbit_amqqueue.erl"},{line,1084}]}, + % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1, + % [{file,"src/rabbit_amqqueue.erl"},{line,1100}]}, + % {rabbit_node_monitor,handle_dead_rabbit,2, + % [{file,"src/rabbit_node_monitor.erl"},{line,755}]}, + % {rabbit_node_monitor,handle_info,2, + % [{file,"src/rabbit_node_monitor.erl"},{line,548}]}]} + rabbit_core_metrics:queue_deleted(QName), + ok = rabbit_event:notify(queue_deleted, + [{name, QName}]) + end, [Q]) + end + end) || Q <- queues_to_delete_from_node_down(Node) + ]. delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), rabbit_binding:remove_transient_for_destination(QueueName). +queues_to_delete_from_node_down(NodeDown) -> + rabbit_misc:execute_mnesia_transaction(fun () -> + qlc:e(qlc:q([QName || + #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue), + node(Pid) == NodeDown andalso + not rabbit_mnesia:is_process_alive(Pid) andalso + (not rabbit_amqqueue:is_mirrored(Q) orelse + rabbit_amqqueue:is_dead_exclusive(Q))] + )) + end). + pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, durable = false, |
