summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2018-03-28 21:33:50 +0300
committerGitHub <noreply@github.com>2018-03-28 21:33:50 +0300
commit16e52f5d635951bbfe49ef40562c6f01f4288e9d (patch)
treedbf4510d3836b6bbe94af54dd5e09206a9952ec5
parent2d28cb459e5bf30e7c72a4fffb889fbdb6ffbc77 (diff)
parentd04b7f180c4707f8f0b670a48a1b493f362305c6 (diff)
downloadrabbitmq-server-git-16e52f5d635951bbfe49ef40562c6f01f4288e9d.tar.gz
Merge pull request #1567 from rabbitmq/revert-1527-reduce-mnesia-contention-when-nodes-restart-master
Revert "Reduce lock contention when nodes restart (master)"
-rw-r--r--src/rabbit_amqqueue.erl91
-rw-r--r--src/rabbit_binding.erl9
-rw-r--r--src/rabbit_exchange.erl7
-rw-r--r--src/rabbit_node_monitor.erl3
4 files changed, 45 insertions, 65 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ac79d56358..b9f1ba3631 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -963,7 +963,12 @@ resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{res
internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_queue, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
+ end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
@@ -1112,70 +1117,36 @@ maybe_clear_recoverable_node(Node,
end.
on_node_down(Node) ->
- {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node),
- notify_queue_binding_deletions(QueueDeletions),
- rabbit_core_metrics:queues_deleted(QueueNames),
- notify_queues_deleted(QueueNames),
- ok.
-
-delete_queues_on_node_down(Node) ->
- lists:unzip(lists:flatten([
- rabbit_misc:execute_mnesia_transaction(
- fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end
- ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node))
- ])).
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> QsDels =
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
+ #amqqueue{name = QName, pid = Pid} =
+ Q <- mnesia:table(rabbit_queue),
+ node(Pid) == Node andalso
+ not rabbit_mnesia:is_process_alive(Pid) andalso
+ (not rabbit_amqqueue:is_mirrored(Q) orelse
+ rabbit_amqqueue:is_dead_exclusive(Q))])),
+ {Qs, Dels} = lists:unzip(QsDels),
+ T = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels),
+ ?INTERNAL_USER),
+ fun () ->
+ T(),
+ lists:foreach(
+ fun(QName) ->
+ rabbit_core_metrics:queue_deleted(QName),
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, QName},
+ {user, ?INTERNAL_USER}])
+ end, Qs)
+ end
+ end).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
rabbit_binding:remove_transient_for_destination(QueueName).
-% If there are many queues and we delete them all in a single Mnesia transaction,
-% this can block all other Mnesia operations for a really long time.
-% In situations where a node wants to (re-)join a cluster,
-% Mnesia won't be able to sync on the new node until this operation finishes.
-% As a result, we want to have multiple Mnesia transactions so that other
-% operations can make progress in between these queue delete transactions.
-%
-% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node.
-partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
- [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
-partition_queues(T) ->
- [T].
-
-queues_to_delete_when_node_down(NodeDown) ->
- rabbit_misc:execute_mnesia_transaction(fun () ->
- qlc:e(qlc:q([QName ||
- #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue),
- node(Pid) == NodeDown andalso
- not rabbit_mnesia:is_process_alive(Pid) andalso
- (not rabbit_amqqueue:is_mirrored(Q) orelse
- rabbit_amqqueue:is_dead_exclusive(Q))]
- ))
- end).
-
-notify_queue_binding_deletions(QueueDeletions) ->
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun() ->
- rabbit_binding:process_deletions(
- lists:foldl(
- fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- QueueDeletions
- ),
- ?INTERNAL_USER
- )
- end
- ).
-
-notify_queues_deleted(QueueDeletions) ->
- lists:foreach(
- fun(Queue) ->
- ok = rabbit_event:notify(queue_deleted,
- [{name, Queue},
- {user, ?INTERNAL_USER}])
- end,
- QueueDeletions).
-
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
durable = false,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7498d6b765..e4d9e64d6b 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,8 +331,13 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end, ErrFun).
-delete_object(Table, Record, LockKind) ->
- mnesia:delete_object(Table, Record, LockKind).
+delete_object(Tab, Record, LockKind) ->
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:match_object(Tab, Record, LockKind) of
+ [] -> ok;
+ [_] -> mnesia:delete_object(Tab, Record, LockKind)
+ end.
sync_route(Route, true, true, Fun) ->
ok = Fun(rabbit_durable_route, Route, write),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c7a849ce2e..cc2797489d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -497,9 +497,14 @@ conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
end.
unconditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_exchange, XName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName})
+ end,
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
- mnesia:delete({rabbit_durable_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(
XName, OnlyDurable)}.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 4a362eb8a2..565676af3c 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -37,7 +37,6 @@
alive_nodes/1, alive_rabbit_nodes/1]).
-define(SERVER, ?MODULE).
--define(NODE_REPLY_TIMEOUT, 5000).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
@@ -182,7 +181,7 @@ partitions() ->
gen_server:call(?SERVER, partitions, infinity).
partitions(Nodes) ->
- {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT),
+ {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity),
Replies.
status(Nodes) ->