summary refs log tree commit diff
diff options
context:
space:
mode:
author Michael Klishin <michael@clojurewerkz.org> 2018-04-05 14:11:00 +0800
committer Michael Klishin <michael@clojurewerkz.org> 2018-04-05 14:11:00 +0800
commit c5d656a74abc14a699886eaf2e3278695e7a550e (patch)
tree 18ead3671d765f4f7c754ce24efcd429131c05e4
parent 16e52f5d635951bbfe49ef40562c6f01f4288e9d (diff)
parent dfa270677f6b5191f0dc994a571728a8bf93957f (diff)
download rabbitmq-server-git-c5d656a74abc14a699886eaf2e3278695e7a550e.tar.gz
Merge branch 'reduce-mnesia-contention-when-nodes-restart-v3.7.x' into reduce-mnesia-contention-when-nodes-restart-master
-rw-r--r-- src/rabbit_amqqueue.erl 91
-rw-r--r-- src/rabbit_binding.erl 9
-rw-r--r-- src/rabbit_exchange.erl 7
-rw-r--r-- src/rabbit_node_monitor.erl 3
-rw-r--r-- test/clustering_management_SUITE.erl 26
5 files changed, 90 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b9f1ba3631..ac79d56358 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -963,12 +963,7 @@ resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{res
internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- %% this 'guarded' delete prevents unnecessary writes to the mnesia
- %% disk log
- case mnesia:wread({rabbit_durable_queue, QueueName}) of
- [] -> ok;
- [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
- end,
+ mnesia:delete({rabbit_durable_queue, QueueName}),
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
@@ -1117,36 +1112,70 @@ maybe_clear_recoverable_node(Node,
end.
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> QsDels =
- qlc:e(qlc:q([{QName, delete_queue(QName)} ||
- #amqqueue{name = QName, pid = Pid} =
- Q <- mnesia:table(rabbit_queue),
- node(Pid) == Node andalso
- not rabbit_mnesia:is_process_alive(Pid) andalso
- (not rabbit_amqqueue:is_mirrored(Q) orelse
- rabbit_amqqueue:is_dead_exclusive(Q))])),
- {Qs, Dels} = lists:unzip(QsDels),
- T = rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels),
- ?INTERNAL_USER),
- fun () ->
- T(),
- lists:foreach(
- fun(QName) ->
- rabbit_core_metrics:queue_deleted(QName),
- ok = rabbit_event:notify(queue_deleted,
- [{name, QName},
- {user, ?INTERNAL_USER}])
- end, Qs)
- end
- end).
+ {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node),
+ notify_queue_binding_deletions(QueueDeletions),
+ rabbit_core_metrics:queues_deleted(QueueNames),
+ notify_queues_deleted(QueueNames),
+ ok.
+
+delete_queues_on_node_down(Node) ->
+ lists:unzip(lists:flatten([
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end
+ ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node))
+ ])).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
rabbit_binding:remove_transient_for_destination(QueueName).
+% If there are many queues and we delete them all in a single Mnesia transaction,
+% this can block all other Mnesia operations for a really long time.
+% In situations where a node wants to (re-)join a cluster,
+% Mnesia won't be able to sync on the new node until this operation finishes.
+% As a result, we want to have multiple Mnesia transactions so that other
+% operations can make progress in between these queue delete transactions.
+%
+% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node.
+partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
+ [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
+partition_queues(T) ->
+ [T].
+
+queues_to_delete_when_node_down(NodeDown) ->
+ rabbit_misc:execute_mnesia_transaction(fun () ->
+ qlc:e(qlc:q([QName ||
+ #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue),
+ node(Pid) == NodeDown andalso
+ not rabbit_mnesia:is_process_alive(Pid) andalso
+ (not rabbit_amqqueue:is_mirrored(Q) orelse
+ rabbit_amqqueue:is_dead_exclusive(Q))]
+ ))
+ end).
+
+notify_queue_binding_deletions(QueueDeletions) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun() ->
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ QueueDeletions
+ ),
+ ?INTERNAL_USER
+ )
+ end
+ ).
+
+notify_queues_deleted(QueueDeletions) ->
+ lists:foreach(
+ fun(Queue) ->
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, Queue},
+ {user, ?INTERNAL_USER}])
+ end,
+ QueueDeletions).
+
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
durable = false,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e4d9e64d6b..7498d6b765 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,13 +331,8 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end, ErrFun).
-delete_object(Tab, Record, LockKind) ->
- %% this 'guarded' delete prevents unnecessary writes to the mnesia
- %% disk log
- case mnesia:match_object(Tab, Record, LockKind) of
- [] -> ok;
- [_] -> mnesia:delete_object(Tab, Record, LockKind)
- end.
+delete_object(Table, Record, LockKind) ->
+ mnesia:delete_object(Table, Record, LockKind).
sync_route(Route, true, true, Fun) ->
ok = Fun(rabbit_durable_route, Route, write),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index cc2797489d..c7a849ce2e 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -497,14 +497,9 @@ conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
end.
unconditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
- %% this 'guarded' delete prevents unnecessary writes to the mnesia
- %% disk log
- case mnesia:wread({rabbit_durable_exchange, XName}) of
- [] -> ok;
- [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName})
- end,
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
+ mnesia:delete({rabbit_durable_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(
XName, OnlyDurable)}.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 565676af3c..4a362eb8a2 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -37,6 +37,7 @@
alive_nodes/1, alive_rabbit_nodes/1]).
-define(SERVER, ?MODULE).
+-define(NODE_REPLY_TIMEOUT, 5000).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
@@ -181,7 +182,7 @@ partitions() ->
gen_server:call(?SERVER, partitions, infinity).
partitions(Nodes) ->
- {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity),
+ {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT),
Replies.
status(Nodes) ->
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index e04fff2182..e9f52de3c8 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -53,7 +53,8 @@ groups() ->
forget_offline_removes_things,
force_boot,
status_with_alarm,
- wait_fails_when_cluster_fails
+ wait_fails_when_cluster_fails,
+ concurrent_default_data_creation
]},
{cluster_size_4, [], [
forget_promotes_offline_slave
@@ -636,6 +637,29 @@ wait_fails_when_cluster_fails(Config) ->
{error, _, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Rabbit,
["wait", RabbitPidFile]).
+concurrent_default_data_creation(Config) ->
+ [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ %% Run multiple times to detect a race.
+ %% This test simulates concurrent initialisation of several key node DB tables.
+ %% Since this is node-local state, in practice this can only
+ %% happen when a new cluster is formed and two nodes are booting
+ %% at roughly the same time (say, within a couple of ms from each other).
+ [concurrent_default_data_creation1(Rabbit, Hare) || _ <- lists:seq(1, 20)].
+
+concurrent_default_data_creation1(Rabbit, Hare) ->
+ %% Clear default data.
+ [{atomic, ok} = rpc:call(Rabbit, mnesia, clear_table, [Tab])
+ || Tab <- [rabbit_user, rabbit_user_permission, rabbit_vhost]],
+ %% Stop both nodes
+ [ok = rpc:call(Node, rabbit, stop, []) || Node <- [Rabbit, Hare]],
+ %% Start nodes in parallel
+ [spawn(fun() -> rpc:call(Node, rabbit, start, []) end)
+ || Node <- [Rabbit, Hare]],
+ %% Verify both nodes are started successfully
+ [ok = rpc:call(Node, rabbit, await_startup, [Node]) || Node <- [Rabbit, Hare]],
+ [{ok, _Pid} = rpc:call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]) || Node <- [Rabbit, Hare]].
+
%% ----------------------------------------------------------------------------
%% Internal utils
%% ----------------------------------------------------------------------------