diff options
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 98 |
1 files changed, 32 insertions, 66 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 066744e138..b35105ed74 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -64,19 +64,22 @@ add_binding(transaction, _Exchange, Binding) -> add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(transaction, #exchange{name = X}, Bs) -> - %% We need these locks to reduce the time complexity; without them - %% we end up with K*length(Bs) row-level locks, and inserting each - %% lock takes time proportional to the number of existing locks, - %% thus resulting in O(length(Bs)^2) complexity. And they need to - %% be write locks since ultimately we end up removing all these - %% rows. The downside of all this is that no other binding - %% operations except lookup/routing (which uses dirty ops) on - %% topic exchanges can take place concurrently. However, that is - %% the case for any bulk binding removal operations since the - %% removal of bindings from the rabbit_route etc table, which - %% precedes all this, calls match_object with a partial key, which - %% results in a table lock. +remove_bindings(transaction, _X, Bs) -> + %% We need to lock the tables we are operating on in order to + %% reduce the time complexity. Without the table locks we end up + %% with K*length(Bs) row-level locks. Inserting each lock takes + %% time proportional to the number of existing locks, thus + %% resulting in O(length(Bs)^2) complexity. + %% + %% The locks need to be write locks since ultimately we end up + %% removing all these rows. + %% + %% The downside of all this is that no other binding operations + %% except lookup/routing (which uses dirty ops) on topic exchanges + %% can take place concurrently. However, that is the case for any + %% bulk binding removal operations since the removal of bindings + %% from the rabbit_route etc table, which precedes all this, calls + %% match_object with a partial key, which results in a table lock. case Bs of [_] -> ok; _ -> [mnesia:lock({table, T}, write) || @@ -84,56 +87,16 @@ remove_bindings(transaction, #exchange{name = X}, Bs) -> rabbit_topic_trie_edge, rabbit_topic_trie_binding]] end, - %% The remove process is split into two distinct phases. In the - %% first phase we gather the lists of bindings and edges to - %% delete, then in the second phase we process all the - %% deletions. This is to prevent interleaving of read/write - %% operations in mnesia that can adversely affect performance. - {ToDelete, Paths} = - lists:foldl( - fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> - Path = [{FinalNode, _} | _] = - follow_down_get_path(S, split_topic_key(K)), - {[{FinalNode, D} | Acc], - decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} - end, {[], gb_trees:empty()}, Bs), - [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], - [trie_remove_edge(X, Parent, Node, W) || - {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], + [begin + Path = [{FinalNode, _} | _] = + follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path) + end || #binding{source = X, key = K, destination = D} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. -maybe_add_path(_X, [{root, none}], PathAcc) -> - PathAcc; -maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> - case gb_trees:is_defined(Node, PathAcc) of - true -> PathAcc; - false -> gb_trees:insert(Node, {Parent, W, trie_node_counts(X, Node)}, - PathAcc) - end. - -decrement_bindings(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, - Path, PathAcc). - -decrement_edges(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, - Path, PathAcc). - -with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> - PathAcc; -with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> - {Parent, W, Counts} = gb_trees:get(Node, PathAcc), - NewCounts = Fun(Counts), - NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), - case NewCounts of - {0, 0} -> decrement_edges(X, ParentPath, - maybe_add_path(X, ParentPath, NewPathAcc)); - _ -> NewPathAcc - end. - - assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -201,6 +164,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, node_id = Node}, write) of + [] -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath); + _ -> ok + end. + trie_child(X, Node, Word) -> case mnesia:read({rabbit_topic_trie_edge, #trie_edge{exchange_name = X, @@ -267,13 +240,6 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_node_counts(X, Node) -> - [#topic_trie_node{edge_count = EC, binding_count = BC}] = - mnesia:read({rabbit_topic_trie_node, - #trie_node{exchange_name = X, - node_id = Node}}), - {BC, EC}. - trie_remove_all_nodes(X) -> remove_all(rabbit_topic_trie_node, #topic_trie_node{trie_node = #trie_node{exchange_name = X, |
