summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_exchange_type_topic.erl98
1 files changed, 32 insertions, 66 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 066744e138..b35105ed74 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -64,19 +64,22 @@ add_binding(transaction, _Exchange, Binding) ->
add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(transaction, #exchange{name = X}, Bs) ->
- %% We need these locks to reduce the time complexity; without them
- %% we end up with K*length(Bs) row-level locks, and inserting each
- %% lock takes time proportional to the number of existing locks,
- %% thus resulting in O(length(Bs)^2) complexity. And they need to
- %% be write locks since ultimately we end up removing all these
- %% rows. The downside of all this is that no other binding
- %% operations except lookup/routing (which uses dirty ops) on
- %% topic exchanges can take place concurrently. However, that is
- %% the case for any bulk binding removal operations since the
- %% removal of bindings from the rabbit_route etc table, which
- %% precedes all this, calls match_object with a partial key, which
- %% results in a table lock.
+remove_bindings(transaction, _X, Bs) ->
+ %% We need to lock the tables we are operating on in order to
+ %% reduce the time complexity. Without the table locks we end up
+ %% with K*length(Bs) row-level locks. Inserting each lock takes
+ %% time proportional to the number of existing locks, thus
+ %% resulting in O(length(Bs)^2) complexity.
+ %%
+ %% The locks need to be write locks since ultimately we end up
+ %% removing all these rows.
+ %%
+ %% The downside of all this is that no other binding operations
+ %% except lookup/routing (which uses dirty ops) on topic exchanges
+ %% can take place concurrently. However, that is the case for any
+ %% bulk binding removal operations since the removal of bindings
+ %% from the rabbit_route etc table, which precedes all this, calls
+ %% match_object with a partial key, which results in a table lock.
case Bs of
[_] -> ok;
_ -> [mnesia:lock({table, T}, write) ||
@@ -84,56 +87,16 @@ remove_bindings(transaction, #exchange{name = X}, Bs) ->
rabbit_topic_trie_edge,
rabbit_topic_trie_binding]]
end,
- %% The remove process is split into two distinct phases. In the
- %% first phase we gather the lists of bindings and edges to
- %% delete, then in the second phase we process all the
- %% deletions. This is to prevent interleaving of read/write
- %% operations in mnesia that can adversely affect performance.
- {ToDelete, Paths} =
- lists:foldl(
- fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
- Path = [{FinalNode, _} | _] =
- follow_down_get_path(S, split_topic_key(K)),
- {[{FinalNode, D} | Acc],
- decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
- end, {[], gb_trees:empty()}, Bs),
- [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
- [trie_remove_edge(X, Parent, Node, W) ||
- {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
+ [begin
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path)
+ end || #binding{source = X, key = K, destination = D} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
-maybe_add_path(_X, [{root, none}], PathAcc) ->
- PathAcc;
-maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
- case gb_trees:is_defined(Node, PathAcc) of
- true -> PathAcc;
- false -> gb_trees:insert(Node, {Parent, W, trie_node_counts(X, Node)},
- PathAcc)
- end.
-
-decrement_bindings(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
- Path, PathAcc).
-
-decrement_edges(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
- Path, PathAcc).
-
-with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
- PathAcc;
-with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
- {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
- NewCounts = Fun(Counts),
- NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
- case NewCounts of
- {0, 0} -> decrement_edges(X, ParentPath,
- maybe_add_path(X, ParentPath, NewPathAcc));
- _ -> NewPathAcc
- end.
-
-
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
@@ -201,6 +164,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
error -> {error, Acc, Words}
end.
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X, node_id = Node}, write) of
+ [] -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath);
+ _ -> ok
+ end.
+
trie_child(X, Node, Word) ->
case mnesia:read({rabbit_topic_trie_edge,
#trie_edge{exchange_name = X,
@@ -267,13 +240,6 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_node_counts(X, Node) ->
- [#topic_trie_node{edge_count = EC, binding_count = BC}] =
- mnesia:read({rabbit_topic_trie_node,
- #trie_node{exchange_name = X,
- node_id = Node}}),
- {BC, EC}.
-
trie_remove_all_nodes(X) ->
remove_all(rabbit_topic_trie_node,
#topic_trie_node{trie_node = #trie_node{exchange_name = X,