diff options
| author | Michael Klishin <michael@novemberain.com> | 2018-03-28 21:33:50 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-03-28 21:33:50 +0300 |
| commit | 16e52f5d635951bbfe49ef40562c6f01f4288e9d (patch) | |
| tree | dbf4510d3836b6bbe94af54dd5e09206a9952ec5 /src | |
| parent | 2d28cb459e5bf30e7c72a4fffb889fbdb6ffbc77 (diff) | |
| parent | d04b7f180c4707f8f0b670a48a1b493f362305c6 (diff) | |
| download | rabbitmq-server-git-16e52f5d635951bbfe49ef40562c6f01f4288e9d.tar.gz | |
Merge pull request #1567 from rabbitmq/revert-1527-reduce-mnesia-contention-when-nodes-restart-master
Revert "Reduce lock contention when nodes restart (master)"
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 91 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 3 |
4 files changed, 45 insertions, 65 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ac79d56358..b9f1ba3631 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -963,7 +963,12 @@ resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{res internal_delete1(QueueName, OnlyDurable) -> ok = mnesia:delete({rabbit_queue, QueueName}), - mnesia:delete({rabbit_durable_queue, QueueName}), + %% this 'guarded' delete prevents unnecessary writes to the mnesia + %% disk log + case mnesia:wread({rabbit_durable_queue, QueueName}) of + [] -> ok; + [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName}) + end, %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. rabbit_binding:remove_for_destination(QueueName, OnlyDurable). @@ -1112,70 +1117,36 @@ maybe_clear_recoverable_node(Node, end. on_node_down(Node) -> - {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node), - notify_queue_binding_deletions(QueueDeletions), - rabbit_core_metrics:queues_deleted(QueueNames), - notify_queues_deleted(QueueNames), - ok. - -delete_queues_on_node_down(Node) -> - lists:unzip(lists:flatten([ - rabbit_misc:execute_mnesia_transaction( - fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end - ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node)) - ])). + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> QsDels = + qlc:e(qlc:q([{QName, delete_queue(QName)} || + #amqqueue{name = QName, pid = Pid} = + Q <- mnesia:table(rabbit_queue), + node(Pid) == Node andalso + not rabbit_mnesia:is_process_alive(Pid) andalso + (not rabbit_amqqueue:is_mirrored(Q) orelse + rabbit_amqqueue:is_dead_exclusive(Q))])), + {Qs, Dels} = lists:unzip(QsDels), + T = rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels), + ?INTERNAL_USER), + fun () -> + T(), + lists:foreach( + fun(QName) -> + rabbit_core_metrics:queue_deleted(QName), + ok = rabbit_event:notify(queue_deleted, + [{name, QName}, + {user, ?INTERNAL_USER}]) + end, Qs) + end + end). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), rabbit_binding:remove_transient_for_destination(QueueName). -% If there are many queues and we delete them all in a single Mnesia transaction, -% this can block all other Mnesia operations for a really long time. -% In situations where a node wants to (re-)join a cluster, -% Mnesia won't be able to sync on the new node until this operation finishes. -% As a result, we want to have multiple Mnesia transactions so that other -% operations can make progress in between these queue delete transactions. -% -% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node. -partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) -> - [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)]; -partition_queues(T) -> - [T]. - -queues_to_delete_when_node_down(NodeDown) -> - rabbit_misc:execute_mnesia_transaction(fun () -> - qlc:e(qlc:q([QName || - #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue), - node(Pid) == NodeDown andalso - not rabbit_mnesia:is_process_alive(Pid) andalso - (not rabbit_amqqueue:is_mirrored(Q) orelse - rabbit_amqqueue:is_dead_exclusive(Q))] - )) - end). - -notify_queue_binding_deletions(QueueDeletions) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun() -> - rabbit_binding:process_deletions( - lists:foldl( - fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - QueueDeletions - ), - ?INTERNAL_USER - ) - end - ). - -notify_queues_deleted(QueueDeletions) -> - lists:foreach( - fun(Queue) -> - ok = rabbit_event:notify(queue_deleted, - [{name, Queue}, - {user, ?INTERNAL_USER}]) - end, - QueueDeletions). - pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, durable = false, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7498d6b765..e4d9e64d6b 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,8 +331,13 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end, ErrFun). -delete_object(Table, Record, LockKind) -> - mnesia:delete_object(Table, Record, LockKind). +delete_object(Tab, Record, LockKind) -> + %% this 'guarded' delete prevents unnecessary writes to the mnesia + %% disk log + case mnesia:match_object(Tab, Record, LockKind) of + [] -> ok; + [_] -> mnesia:delete_object(Tab, Record, LockKind) + end. sync_route(Route, true, true, Fun) -> ok = Fun(rabbit_durable_route, Route, write), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c7a849ce2e..cc2797489d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -497,9 +497,14 @@ conditional_delete(X = #exchange{name = XName}, OnlyDurable) -> end. unconditional_delete(X = #exchange{name = XName}, OnlyDurable) -> + %% this 'guarded' delete prevents unnecessary writes to the mnesia + %% disk log + case mnesia:wread({rabbit_durable_exchange, XName}) of + [] -> ok; + [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName}) + end, ok = mnesia:delete({rabbit_exchange, XName}), ok = mnesia:delete({rabbit_exchange_serial, XName}), - mnesia:delete({rabbit_durable_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination( XName, OnlyDurable)}. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 4a362eb8a2..565676af3c 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -37,7 +37,6 @@ alive_nodes/1, alive_rabbit_nodes/1]). -define(SERVER, ?MODULE). --define(NODE_REPLY_TIMEOUT, 5000). -define(RABBIT_UP_RPC_TIMEOUT, 2000). -define(RABBIT_DOWN_PING_INTERVAL, 1000). @@ -182,7 +181,7 @@ partitions() -> gen_server:call(?SERVER, partitions, infinity). partitions(Nodes) -> - {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT), + {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity), Replies. status(Nodes) -> |
