summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2018-04-05 14:11:00 +0800
committerMichael Klishin <michael@clojurewerkz.org>2018-04-05 14:11:00 +0800
commitc5d656a74abc14a699886eaf2e3278695e7a550e (patch)
tree18ead3671d765f4f7c754ce24efcd429131c05e4 /src
parent16e52f5d635951bbfe49ef40562c6f01f4288e9d (diff)
parentdfa270677f6b5191f0dc994a571728a8bf93957f (diff)
downloadrabbitmq-server-git-c5d656a74abc14a699886eaf2e3278695e7a550e.tar.gz
Merge branch 'reduce-mnesia-contention-when-nodes-restart-v3.7.x' into reduce-mnesia-contention-when-nodes-restart-master
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl91
-rw-r--r--src/rabbit_binding.erl9
-rw-r--r--src/rabbit_exchange.erl7
-rw-r--r--src/rabbit_node_monitor.erl3
4 files changed, 65 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b9f1ba3631..ac79d56358 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -963,12 +963,7 @@ resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{res
internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- %% this 'guarded' delete prevents unnecessary writes to the mnesia
- %% disk log
- case mnesia:wread({rabbit_durable_queue, QueueName}) of
- [] -> ok;
- [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
- end,
+ mnesia:delete({rabbit_durable_queue, QueueName}),
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
@@ -1117,36 +1112,70 @@ maybe_clear_recoverable_node(Node,
end.
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> QsDels =
- qlc:e(qlc:q([{QName, delete_queue(QName)} ||
- #amqqueue{name = QName, pid = Pid} =
- Q <- mnesia:table(rabbit_queue),
- node(Pid) == Node andalso
- not rabbit_mnesia:is_process_alive(Pid) andalso
- (not rabbit_amqqueue:is_mirrored(Q) orelse
- rabbit_amqqueue:is_dead_exclusive(Q))])),
- {Qs, Dels} = lists:unzip(QsDels),
- T = rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels),
- ?INTERNAL_USER),
- fun () ->
- T(),
- lists:foreach(
- fun(QName) ->
- rabbit_core_metrics:queue_deleted(QName),
- ok = rabbit_event:notify(queue_deleted,
- [{name, QName},
- {user, ?INTERNAL_USER}])
- end, Qs)
- end
- end).
+ {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node),
+ notify_queue_binding_deletions(QueueDeletions),
+ rabbit_core_metrics:queues_deleted(QueueNames),
+ notify_queues_deleted(QueueNames),
+ ok.
+
+delete_queues_on_node_down(Node) ->
+ lists:unzip(lists:flatten([
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end
+ ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node))
+ ])).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
rabbit_binding:remove_transient_for_destination(QueueName).
+% If there are many queues and we delete them all in a single Mnesia transaction,
+% this can block all other Mnesia operations for a really long time.
+% In situations where a node wants to (re-)join a cluster,
+% Mnesia won't be able to sync on the new node until this operation finishes.
+% As a result, we want to have multiple Mnesia transactions so that other
+% operations can make progress in between these queue delete transactions.
+%
+% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node.
+partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
+ [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
+partition_queues(T) ->
+ [T].
+
+queues_to_delete_when_node_down(NodeDown) ->
+ rabbit_misc:execute_mnesia_transaction(fun () ->
+ qlc:e(qlc:q([QName ||
+ #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue),
+ node(Pid) == NodeDown andalso
+ not rabbit_mnesia:is_process_alive(Pid) andalso
+ (not rabbit_amqqueue:is_mirrored(Q) orelse
+ rabbit_amqqueue:is_dead_exclusive(Q))]
+ ))
+ end).
+
+notify_queue_binding_deletions(QueueDeletions) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun() ->
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ QueueDeletions
+ ),
+ ?INTERNAL_USER
+ )
+ end
+ ).
+
+notify_queues_deleted(QueueDeletions) ->
+ lists:foreach(
+ fun(Queue) ->
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, Queue},
+ {user, ?INTERNAL_USER}])
+ end,
+ QueueDeletions).
+
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
durable = false,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e4d9e64d6b..7498d6b765 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,13 +331,8 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end, ErrFun).
-delete_object(Tab, Record, LockKind) ->
- %% this 'guarded' delete prevents unnecessary writes to the mnesia
- %% disk log
- case mnesia:match_object(Tab, Record, LockKind) of
- [] -> ok;
- [_] -> mnesia:delete_object(Tab, Record, LockKind)
- end.
+delete_object(Table, Record, LockKind) ->
+ mnesia:delete_object(Table, Record, LockKind).
sync_route(Route, true, true, Fun) ->
ok = Fun(rabbit_durable_route, Route, write),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index cc2797489d..c7a849ce2e 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -497,14 +497,9 @@ conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
end.
unconditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
- %% this 'guarded' delete prevents unnecessary writes to the mnesia
- %% disk log
- case mnesia:wread({rabbit_durable_exchange, XName}) of
- [] -> ok;
- [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName})
- end,
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
+ mnesia:delete({rabbit_durable_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(
XName, OnlyDurable)}.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 565676af3c..4a362eb8a2 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -37,6 +37,7 @@
alive_nodes/1, alive_rabbit_nodes/1]).
-define(SERVER, ?MODULE).
+-define(NODE_REPLY_TIMEOUT, 5000).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
@@ -181,7 +182,7 @@ partitions() ->
gen_server:call(?SERVER, partitions, infinity).
partitions(Nodes) ->
- {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity),
+ {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT),
Replies.
status(Nodes) ->