diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2018-04-05 14:11:00 +0800 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2018-04-05 14:11:00 +0800 |
| commit | c5d656a74abc14a699886eaf2e3278695e7a550e (patch) | |
| tree | 18ead3671d765f4f7c754ce24efcd429131c05e4 /src | |
| parent | 16e52f5d635951bbfe49ef40562c6f01f4288e9d (diff) | |
| parent | dfa270677f6b5191f0dc994a571728a8bf93957f (diff) | |
| download | rabbitmq-server-git-c5d656a74abc14a699886eaf2e3278695e7a550e.tar.gz | |
Merge branch 'reduce-mnesia-contention-when-nodes-restart-v3.7.x' into reduce-mnesia-contention-when-nodes-restart-master
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 91 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 3 |
4 files changed, 65 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b9f1ba3631..ac79d56358 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -963,12 +963,7 @@ resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{res internal_delete1(QueueName, OnlyDurable) -> ok = mnesia:delete({rabbit_queue, QueueName}), - %% this 'guarded' delete prevents unnecessary writes to the mnesia - %% disk log - case mnesia:wread({rabbit_durable_queue, QueueName}) of - [] -> ok; - [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName}) - end, + mnesia:delete({rabbit_durable_queue, QueueName}), %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. rabbit_binding:remove_for_destination(QueueName, OnlyDurable). @@ -1117,36 +1112,70 @@ maybe_clear_recoverable_node(Node, end. on_node_down(Node) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> QsDels = - qlc:e(qlc:q([{QName, delete_queue(QName)} || - #amqqueue{name = QName, pid = Pid} = - Q <- mnesia:table(rabbit_queue), - node(Pid) == Node andalso - not rabbit_mnesia:is_process_alive(Pid) andalso - (not rabbit_amqqueue:is_mirrored(Q) orelse - rabbit_amqqueue:is_dead_exclusive(Q))])), - {Qs, Dels} = lists:unzip(QsDels), - T = rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), Dels), - ?INTERNAL_USER), - fun () -> - T(), - lists:foreach( - fun(QName) -> - rabbit_core_metrics:queue_deleted(QName), - ok = rabbit_event:notify(queue_deleted, - [{name, QName}, - {user, ?INTERNAL_USER}]) - end, Qs) - end - end). + {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node), + notify_queue_binding_deletions(QueueDeletions), + rabbit_core_metrics:queues_deleted(QueueNames), + notify_queues_deleted(QueueNames), + ok. + +delete_queues_on_node_down(Node) -> + lists:unzip(lists:flatten([ + rabbit_misc:execute_mnesia_transaction( + fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end + ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node)) + ])). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), rabbit_binding:remove_transient_for_destination(QueueName). +% If there are many queues and we delete them all in a single Mnesia transaction, +% this can block all other Mnesia operations for a really long time. +% In situations where a node wants to (re-)join a cluster, +% Mnesia won't be able to sync on the new node until this operation finishes. +% As a result, we want to have multiple Mnesia transactions so that other +% operations can make progress in between these queue delete transactions. +% +% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node. +partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) -> + [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)]; +partition_queues(T) -> + [T]. + +queues_to_delete_when_node_down(NodeDown) -> + rabbit_misc:execute_mnesia_transaction(fun () -> + qlc:e(qlc:q([QName || + #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue), + node(Pid) == NodeDown andalso + not rabbit_mnesia:is_process_alive(Pid) andalso + (not rabbit_amqqueue:is_mirrored(Q) orelse + rabbit_amqqueue:is_dead_exclusive(Q))] + )) + end). + +notify_queue_binding_deletions(QueueDeletions) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun() -> + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + QueueDeletions + ), + ?INTERNAL_USER + ) + end + ). + +notify_queues_deleted(QueueDeletions) -> + lists:foreach( + fun(Queue) -> + ok = rabbit_event:notify(queue_deleted, + [{name, Queue}, + {user, ?INTERNAL_USER}]) + end, + QueueDeletions). + pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, durable = false, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e4d9e64d6b..7498d6b765 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,13 +331,8 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end, ErrFun). -delete_object(Tab, Record, LockKind) -> - %% this 'guarded' delete prevents unnecessary writes to the mnesia - %% disk log - case mnesia:match_object(Tab, Record, LockKind) of - [] -> ok; - [_] -> mnesia:delete_object(Tab, Record, LockKind) - end. +delete_object(Table, Record, LockKind) -> + mnesia:delete_object(Table, Record, LockKind). sync_route(Route, true, true, Fun) -> ok = Fun(rabbit_durable_route, Route, write), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index cc2797489d..c7a849ce2e 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -497,14 +497,9 @@ conditional_delete(X = #exchange{name = XName}, OnlyDurable) -> end. unconditional_delete(X = #exchange{name = XName}, OnlyDurable) -> - %% this 'guarded' delete prevents unnecessary writes to the mnesia - %% disk log - case mnesia:wread({rabbit_durable_exchange, XName}) of - [] -> ok; - [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName}) - end, ok = mnesia:delete({rabbit_exchange, XName}), ok = mnesia:delete({rabbit_exchange_serial, XName}), + mnesia:delete({rabbit_durable_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination( XName, OnlyDurable)}. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 565676af3c..4a362eb8a2 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -37,6 +37,7 @@ alive_nodes/1, alive_rabbit_nodes/1]). -define(SERVER, ?MODULE). +-define(NODE_REPLY_TIMEOUT, 5000). -define(RABBIT_UP_RPC_TIMEOUT, 2000). -define(RABBIT_DOWN_PING_INTERVAL, 1000). @@ -181,7 +182,7 @@ partitions() -> gen_server:call(?SERVER, partitions, infinity). partitions(Nodes) -> - {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity), + {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT), Replies. status(Nodes) -> |
