diff options
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 149 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 76 |
2 files changed, 121 insertions, 104 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 2da3f3eec4..2e181f1da7 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -48,27 +48,28 @@ validate(_X) -> ok. create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_Tx, #exchange{name = X}, _Bs) -> - rabbit_misc:execute_mnesia_transaction(fun () -> trie_remove_all_edges(X), - trie_remove_all_bindings(X) - end), +delete(true, #exchange{name = X}, _Bs) -> + trie_remove_all_edges(X), + trie_remove_all_bindings(X), + ok; +delete(false, _Exchange, _Bs) -> ok. -add_binding(_Tx, _Exchange, #binding{source = X, key = K, destination = D}) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> FinalNode = follow_down_create(X, split_topic_key(K)), - trie_add_binding(X, FinalNode, D) - end), +add_binding(true, _Exchange, #binding{source = X, key = K, destination = D}) -> + FinalNode = follow_down_create(X, split_topic_key(K)), + trie_add_binding(X, FinalNode, D), + ok; +add_binding(false, _Exchange, _Binding) -> ok. -remove_bindings(_Tx, _X, Bs) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> lists:foreach(fun remove_binding/1, Bs) end), +remove_bindings(true, _X, Bs) -> + lists:foreach(fun remove_binding/1, Bs), + ok; +remove_bindings(false, _X, _Bs) -> ok. remove_binding(#binding{source = X, key = K, destination = D}) -> - Path = follow_down_get_path(X, split_topic_key(K)), - {FinalNode, _} = hd(Path), + Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), trie_remove_binding(X, FinalNode, D), remove_path_if_empty(X, Path), ok. @@ -108,19 +109,8 @@ trie_match_skip_any(X, Node, []) -> trie_match_skip_any(X, Node, [_ | RestW] = Words) -> trie_match(X, Node, Words) ++ trie_match_skip_any(X, Node, RestW). -follow_down(X, Words) -> - follow_down(X, root, Words). - -follow_down(_X, CurNode, []) -> - {ok, CurNode}; -follow_down(X, CurNode, [W | RestW]) -> - case trie_child(X, CurNode, W) of - {ok, NextNode} -> follow_down(X, NextNode, RestW); - error -> {error, CurNode, [W | RestW]} - end. - follow_down_create(X, Words) -> - case follow_down(X, Words) of + case follow_down_last_node(X, Words) of {ok, FinalNode} -> FinalNode; {error, Node, RestW} -> lists:foldl( fun (W, CurNode) -> @@ -130,14 +120,26 @@ follow_down_create(X, Words) -> end, Node, RestW) end. +follow_down_last_node(X, Words) -> + follow_down(X, fun (_, Node, _) -> Node end, root, Words). + follow_down_get_path(X, Words) -> - follow_down_get_path(X, root, Words, [{root, none}]). + {ok, Path} = + follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end, + [{root, none}], Words), + Path. + +follow_down(X, AccFun, Acc0, Words) -> + follow_down(X, root, AccFun, Acc0, Words). -follow_down_get_path(_, _, [], PathAcc) -> - PathAcc; -follow_down_get_path(X, CurNode, [W | RestW], PathAcc) -> - {ok, NextNode} = trie_child(X, CurNode, W), - follow_down_get_path(X, NextNode, RestW, [{NextNode, W} | PathAcc]). +follow_down(_X, _CurNode, _AccFun, Acc, []) -> + {ok, Acc}; +follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> + case trie_child(X, CurNode, W) of + {ok, NextNode} -> follow_down(X, NextNode, AccFun, + AccFun(W, NextNode, Acc), RestW); + error -> {error, Acc, Words} + end. remove_path_if_empty(_, [{root, none}]) -> ok; @@ -149,9 +151,10 @@ remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> end. trie_child(X, Node, Word) -> - case mnesia:read(rabbit_topic_trie_edge, #trie_edge{exchange_name = X, - node_id = Node, - word = Word}) of + case mnesia:read(rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}) of [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; [] -> error end. @@ -159,8 +162,8 @@ trie_child(X, Node, Word) -> trie_bindings(X, Node) -> MatchHead = #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - destination = '$1'}}, + node_id = Node, + destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). trie_add_edge(X, FromNode, ToNode, W) -> @@ -172,9 +175,9 @@ trie_remove_edge(X, FromNode, ToNode, W) -> trie_edge_op(X, FromNode, ToNode, W, Op) -> ok = Op(rabbit_topic_trie_edge, #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = FromNode, - word = W}, - node_id = ToNode}, + node_id = FromNode, + word = W}, + node_id = ToNode}, write). trie_add_binding(X, Node, D) -> @@ -185,28 +188,41 @@ trie_remove_binding(X, Node, D) -> trie_binding_op(X, Node, D, Op) -> ok = Op(rabbit_topic_trie_binding, - #topic_trie_binding{trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - destination = D}}, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = D}}, write). trie_has_any_children(X, Node) -> - MatchHead = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = Node, - _='_'}, - _='_'}, - Select = mnesia:select(rabbit_topic_trie_edge, - [{MatchHead, [], ['$_']}], 1, read), - select_while_no_result(Select) /= '$end_of_table'. + has_any(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). trie_has_any_bindings(X, Node) -> - MatchHead = #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _='_'}, - _='_'}, - Select = mnesia:select(rabbit_topic_trie_binding, - [{MatchHead, [], ['$_']}], 1, read), + has_any(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_remove_all_edges(X) -> + remove_all(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + _ = '_'}, + _ = '_'}). + +trie_remove_all_bindings(X) -> + remove_all(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, _ = '_'}, + _ = '_'}). + +has_any(Table, MatchHead) -> + Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), select_while_no_result(Select) /= '$end_of_table'. select_while_no_result({[], Cont}) -> @@ -214,21 +230,9 @@ select_while_no_result({[], Cont}) -> select_while_no_result(Other) -> Other. -trie_remove_all_edges(X) -> - Pattern = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - _='_'}, - _='_'}, - lists:foreach( - fun (R) -> mnesia:delete_object(rabbit_topic_trie_edge, R, write) end, - mnesia:match_object(rabbit_topic_trie_edge, Pattern, write)). - -trie_remove_all_bindings(X) -> - Pattern = #topic_trie_binding{trie_binding = #trie_binding{exchange_name =X, - _='_'}, - _='_'}, - lists:foreach( - fun (R) -> mnesia:delete_object(rabbit_topic_trie_binding, R, write) end, - mnesia:match_object(rabbit_topic_trie_binding, Pattern, write)). +remove_all(Table, Pattern) -> + lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, + mnesia:match_object(Table, Pattern, write)). new_node_id() -> rabbit_guid:guid(). @@ -244,3 +248,4 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [C | RevWordAcc], RevResAcc). + diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b80f369238..32cdaa52f1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -588,7 +588,7 @@ test_topic_matching() -> auto_delete = false, arguments = []}, %% create rabbit_exchange_type_topic:validate(X), - rabbit_exchange_type_topic:create(X), + exchange_op_callback(X, create, []), %% add some bindings Bindings = lists:map( @@ -624,60 +624,72 @@ test_topic_matching() -> {"#.#.#", "t24"}, {"*", "t25"}, {"#.b.#", "t26"}]), - lists:foreach(fun (B) -> rabbit_exchange_type_topic:add_binding(X, B) end, + lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, Bindings), %% test some matches test_topic_expect_match(X, - [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", "t18", "t20", - "t21", "t22", "t23", "t24", "t26"]}, - {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", "t12", "t15", - "t21", "t22", "t23", "t24", "t26"]}, - {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", "t18", "t21", - "t22", "t23", "t24", "t26"]}, - {"", ["t5", "t6", "t17", "t24"]}, - {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", "t26"]}, - {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", "t24"]}, - {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", "t24"]}, - {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23", - "t24"]}, - {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", "t23", - "t24", "t26"]}, + [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", + "t18", "t20", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", + "t12", "t15", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", + "t18", "t21", "t22", "t23", "t24", "t26"]}, + {"", ["t5", "t6", "t17", "t24"]}, + {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", + "t24"]}, + {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", + "t24"]}, + {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23", + "t24"]}, + {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", + "t23", "t24", "t26"]}, {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, - {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", "t25"]}]), + {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25"]}]), %% remove some bindings RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), lists:nth(11, Bindings), lists:nth(19, Bindings), lists:nth(21, Bindings)], - rabbit_exchange_type_topic:remove_bindings(X, RemovedBindings), + exchange_op_callback(X, remove_bindings, [RemovedBindings]), RemainingBindings = ordsets:to_list( ordsets:subtract(ordsets:from_list(Bindings), ordsets:from_list(RemovedBindings))), %% test some matches test_topic_expect_match(X, - [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", "t23", - "t24", "t26"]}, - {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", "t22", "t23", - "t24", "t26"]}, - {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", "t23", - "t24", "t26"]}, - {"", ["t6", "t17", "t24"]}, - {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, - {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, - {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, - {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, - {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", "t24", "t26"]}, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", + "t23", "t24", "t26"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", + "t22", "t23", "t24", "t26"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", + "t23", "t24", "t26"]}, + {"", ["t6", "t17", "t24"]}, + {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, + {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, + {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, + {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, + {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", + "t24", "t26"]}, {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, - {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), %% remove the entire exchange - rabbit_exchange_type_topic:delete(X, RemainingBindings), + exchange_op_callback(X, delete, [RemainingBindings]), %% none should match now test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. +exchange_op_callback(X, Fun, ExtraArgs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + test_topic_expect_match(X, List) -> lists:foreach( fun ({Key, Expected}) -> |
