summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_exchange_type_topic.erl149
-rw-r--r--src/rabbit_tests.erl76
2 files changed, 121 insertions, 104 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 2da3f3eec4..2e181f1da7 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -48,27 +48,28 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_Tx, #exchange{name = X}, _Bs) ->
- rabbit_misc:execute_mnesia_transaction(fun () -> trie_remove_all_edges(X),
- trie_remove_all_bindings(X)
- end),
+delete(true, #exchange{name = X}, _Bs) ->
+ trie_remove_all_edges(X),
+ trie_remove_all_bindings(X),
+ ok;
+delete(false, _Exchange, _Bs) ->
ok.
-add_binding(_Tx, _Exchange, #binding{source = X, key = K, destination = D}) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> FinalNode = follow_down_create(X, split_topic_key(K)),
- trie_add_binding(X, FinalNode, D)
- end),
+add_binding(true, _Exchange, #binding{source = X, key = K, destination = D}) ->
+ FinalNode = follow_down_create(X, split_topic_key(K)),
+ trie_add_binding(X, FinalNode, D),
+ ok;
+add_binding(false, _Exchange, _Binding) ->
ok.
-remove_bindings(_Tx, _X, Bs) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> lists:foreach(fun remove_binding/1, Bs) end),
+remove_bindings(true, _X, Bs) ->
+ lists:foreach(fun remove_binding/1, Bs),
+ ok;
+remove_bindings(false, _X, _Bs) ->
ok.
remove_binding(#binding{source = X, key = K, destination = D}) ->
- Path = follow_down_get_path(X, split_topic_key(K)),
- {FinalNode, _} = hd(Path),
+ Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)),
trie_remove_binding(X, FinalNode, D),
remove_path_if_empty(X, Path),
ok.
@@ -108,19 +109,8 @@ trie_match_skip_any(X, Node, []) ->
trie_match_skip_any(X, Node, [_ | RestW] = Words) ->
trie_match(X, Node, Words) ++ trie_match_skip_any(X, Node, RestW).
-follow_down(X, Words) ->
- follow_down(X, root, Words).
-
-follow_down(_X, CurNode, []) ->
- {ok, CurNode};
-follow_down(X, CurNode, [W | RestW]) ->
- case trie_child(X, CurNode, W) of
- {ok, NextNode} -> follow_down(X, NextNode, RestW);
- error -> {error, CurNode, [W | RestW]}
- end.
-
follow_down_create(X, Words) ->
- case follow_down(X, Words) of
+ case follow_down_last_node(X, Words) of
{ok, FinalNode} -> FinalNode;
{error, Node, RestW} -> lists:foldl(
fun (W, CurNode) ->
@@ -130,14 +120,26 @@ follow_down_create(X, Words) ->
end, Node, RestW)
end.
+follow_down_last_node(X, Words) ->
+ follow_down(X, fun (_, Node, _) -> Node end, root, Words).
+
follow_down_get_path(X, Words) ->
- follow_down_get_path(X, root, Words, [{root, none}]).
+ {ok, Path} =
+ follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end,
+ [{root, none}], Words),
+ Path.
+
+follow_down(X, AccFun, Acc0, Words) ->
+ follow_down(X, root, AccFun, Acc0, Words).
-follow_down_get_path(_, _, [], PathAcc) ->
- PathAcc;
-follow_down_get_path(X, CurNode, [W | RestW], PathAcc) ->
- {ok, NextNode} = trie_child(X, CurNode, W),
- follow_down_get_path(X, NextNode, RestW, [{NextNode, W} | PathAcc]).
+follow_down(_X, _CurNode, _AccFun, Acc, []) ->
+ {ok, Acc};
+follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
+ case trie_child(X, CurNode, W) of
+ {ok, NextNode} -> follow_down(X, NextNode, AccFun,
+ AccFun(W, NextNode, Acc), RestW);
+ error -> {error, Acc, Words}
+ end.
remove_path_if_empty(_, [{root, none}]) ->
ok;
@@ -149,9 +151,10 @@ remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
end.
trie_child(X, Node, Word) ->
- case mnesia:read(rabbit_topic_trie_edge, #trie_edge{exchange_name = X,
- node_id = Node,
- word = Word}) of
+ case mnesia:read(rabbit_topic_trie_edge,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}) of
[#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
[] -> error
end.
@@ -159,8 +162,8 @@ trie_child(X, Node, Word) ->
trie_bindings(X, Node) ->
MatchHead = #topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = '$1'}},
+ node_id = Node,
+ destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
trie_add_edge(X, FromNode, ToNode, W) ->
@@ -172,9 +175,9 @@ trie_remove_edge(X, FromNode, ToNode, W) ->
trie_edge_op(X, FromNode, ToNode, W, Op) ->
ok = Op(rabbit_topic_trie_edge,
#topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = FromNode,
- word = W},
- node_id = ToNode},
+ node_id = FromNode,
+ word = W},
+ node_id = ToNode},
write).
trie_add_binding(X, Node, D) ->
@@ -185,28 +188,41 @@ trie_remove_binding(X, Node, D) ->
trie_binding_op(X, Node, D, Op) ->
ok = Op(rabbit_topic_trie_binding,
- #topic_trie_binding{trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = D}},
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = D}},
write).
trie_has_any_children(X, Node) ->
- MatchHead = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = Node,
- _='_'},
- _='_'},
- Select = mnesia:select(rabbit_topic_trie_edge,
- [{MatchHead, [], ['$_']}], 1, read),
- select_while_no_result(Select) /= '$end_of_table'.
+ has_any(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
trie_has_any_bindings(X, Node) ->
- MatchHead = #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _='_'},
- _='_'},
- Select = mnesia:select(rabbit_topic_trie_binding,
- [{MatchHead, [], ['$_']}], 1, read),
+ has_any(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
+
+trie_remove_all_edges(X) ->
+ remove_all(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
+
+trie_remove_all_bindings(X) ->
+ remove_all(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X, _ = '_'},
+ _ = '_'}).
+
+has_any(Table, MatchHead) ->
+ Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read),
select_while_no_result(Select) /= '$end_of_table'.
select_while_no_result({[], Cont}) ->
@@ -214,21 +230,9 @@ select_while_no_result({[], Cont}) ->
select_while_no_result(Other) ->
Other.
-trie_remove_all_edges(X) ->
- Pattern = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- _='_'},
- _='_'},
- lists:foreach(
- fun (R) -> mnesia:delete_object(rabbit_topic_trie_edge, R, write) end,
- mnesia:match_object(rabbit_topic_trie_edge, Pattern, write)).
-
-trie_remove_all_bindings(X) ->
- Pattern = #topic_trie_binding{trie_binding = #trie_binding{exchange_name =X,
- _='_'},
- _='_'},
- lists:foreach(
- fun (R) -> mnesia:delete_object(rabbit_topic_trie_binding, R, write) end,
- mnesia:match_object(rabbit_topic_trie_binding, Pattern, write)).
+remove_all(Table, Pattern) ->
+ lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end,
+ mnesia:match_object(Table, Pattern, write)).
new_node_id() ->
rabbit_guid:guid().
@@ -244,3 +248,4 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
+
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b80f369238..32cdaa52f1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -588,7 +588,7 @@ test_topic_matching() ->
auto_delete = false, arguments = []},
%% create
rabbit_exchange_type_topic:validate(X),
- rabbit_exchange_type_topic:create(X),
+ exchange_op_callback(X, create, []),
%% add some bindings
Bindings = lists:map(
@@ -624,60 +624,72 @@ test_topic_matching() ->
{"#.#.#", "t24"},
{"*", "t25"},
{"#.b.#", "t26"}]),
- lists:foreach(fun (B) -> rabbit_exchange_type_topic:add_binding(X, B) end,
+ lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
%% test some matches
test_topic_expect_match(X,
- [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", "t18", "t20",
- "t21", "t22", "t23", "t24", "t26"]},
- {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", "t12", "t15",
- "t21", "t22", "t23", "t24", "t26"]},
- {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", "t18", "t21",
- "t22", "t23", "t24", "t26"]},
- {"", ["t5", "t6", "t17", "t24"]},
- {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", "t26"]},
- {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", "t24"]},
- {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", "t24"]},
- {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
- "t24"]},
- {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", "t23",
- "t24", "t26"]},
+ [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
+ "t18", "t20", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
+ "t12", "t15", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
+ "t18", "t21", "t22", "t23", "t24", "t26"]},
+ {"", ["t5", "t6", "t17", "t24"]},
+ {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23",
+ "t24"]},
+ {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
+ "t24"]},
+ {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
+ "t24"]},
+ {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22",
+ "t23", "t24", "t26"]},
{"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
- {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", "t25"]}]),
+ {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25"]}]),
%% remove some bindings
RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
lists:nth(11, Bindings), lists:nth(19, Bindings),
lists:nth(21, Bindings)],
- rabbit_exchange_type_topic:remove_bindings(X, RemovedBindings),
+ exchange_op_callback(X, remove_bindings, [RemovedBindings]),
RemainingBindings = ordsets:to_list(
ordsets:subtract(ordsets:from_list(Bindings),
ordsets:from_list(RemovedBindings))),
%% test some matches
test_topic_expect_match(X,
- [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", "t23",
- "t24", "t26"]},
- {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", "t22", "t23",
- "t24", "t26"]},
- {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", "t23",
- "t24", "t26"]},
- {"", ["t6", "t17", "t24"]},
- {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
- {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
- {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
- {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
- {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", "t24", "t26"]},
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
+ "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
+ "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
+ "t23", "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
+ "t24", "t26"]},
{"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
%% remove the entire exchange
- rabbit_exchange_type_topic:delete(X, RemainingBindings),
+ exchange_op_callback(X, delete, [RemainingBindings]),
%% none should match now
test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
passed.
+exchange_op_callback(X, Fun, ExtraArgs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
+ rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
+
test_topic_expect_match(X, List) ->
lists:foreach(
fun ({Key, Expected}) ->