diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-11 16:21:35 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-11 16:21:35 +0000 |
| commit | 8504d1c5fd80b6c246102158f6cb8885778d8712 (patch) | |
| tree | 00b3624b4b4e25ab1c45fac395695a41e75ac06b | |
| parent | 1721a5ebc1160fae927d72be21278eba2d3c28db (diff) | |
| parent | bba11517f66b618bf9d07159ee6d1c3588917e41 (diff) | |
| download | rabbitmq-server-git-8504d1c5fd80b6c246102158f6cb8885778d8712.tar.gz | |
Merging default to bug23554
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 84 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 64 |
2 files changed, 89 insertions, 59 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index f12661d48f..ffd1e58395 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -67,17 +67,58 @@ add_binding(true, _Exchange, Binding) -> add_binding(false, _Exchange, _Binding) -> ok. -remove_bindings(true, _X, Bs) -> - lists:foreach(fun remove_binding/1, Bs), +remove_bindings(true, #exchange{name = X}, Bs) -> + %% The remove process is split into two distinct phases. In the + %% first phase we gather the lists of bindings and edges to + %% delete, then in the second phase we process all the + %% deletions. This is to prevent interleaving of read/write + %% operations in mnesia that can adversely affect performance. + {ToDelete, Paths} = + lists:foldl( + fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> + Path = [{FinalNode, _} | _] = + follow_down_get_path(S, split_topic_key(K)), + {[{FinalNode, D} | Acc], + decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} + end, {[], gb_trees:empty()}, Bs), + + [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], + [trie_remove_edge(X, Parent, Node, W) || + {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], ok; remove_bindings(false, _X, _Bs) -> ok. -remove_binding(#binding{source = X, key = K, destination = D}) -> - Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), - trie_remove_binding(X, FinalNode, D), - remove_path_if_empty(X, Path), - ok. +maybe_add_path(_X, [{root, none}], PathAcc) -> + PathAcc; +maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> + case gb_trees:is_defined(Node, PathAcc) of + true -> PathAcc; + false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), + trie_child_count(X, Node)}}, + PathAcc) + end. + +decrement_bindings(X, Path, PathAcc) -> + with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, + Path, PathAcc). + +decrement_edges(X, Path, PathAcc) -> + with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, + Path, PathAcc). + +with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> + PathAcc; +with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> + {Parent, W, Counts} = gb_trees:get(Node, PathAcc), + NewCounts = Fun(Counts), + NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), + case NewCounts of + {0, 0} -> decrement_edges(X, ParentPath, + maybe_add_path(X, ParentPath, NewPathAcc)); + _ -> NewPathAcc + end. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -146,15 +187,6 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. -remove_path_if_empty(_, [{root, none}]) -> - ok; -remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> - case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of - true -> ok; - false -> trie_remove_edge(X, Parent, Node, W), - remove_path_if_empty(X, RestPath) - end. - trie_child(X, Node, Word) -> case mnesia:read(rabbit_topic_trie_edge, #trie_edge{exchange_name = X, @@ -199,21 +231,24 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_has_any_children(X, Node) -> - has_any(rabbit_topic_trie_edge, +trie_child_count(X, Node) -> + count(rabbit_topic_trie_edge, #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, node_id = Node, _ = '_'}, _ = '_'}). -trie_has_any_bindings(X, Node) -> - has_any(rabbit_topic_trie_binding, +trie_binding_count(X, Node) -> + count(rabbit_topic_trie_binding, #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, node_id = Node, _ = '_'}, _ = '_'}). +count(Table, Match) -> + length(mnesia:match_object(Table, Match, read)). + trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, @@ -226,15 +261,6 @@ trie_remove_all_bindings(X) -> trie_binding = #trie_binding{exchange_name = X, _ = '_'}, _ = '_'}). -has_any(Table, MatchHead) -> - Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), - select_while_no_result(Select) /= '$end_of_table'. - -select_while_no_result({[], Cont}) -> - select_while_no_result(mnesia:select(Cont)); -select_while_no_result(Other) -> - Other. - remove_all(Table, Pattern) -> lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, mnesia:match_object(Table, Pattern, write)). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 877d2cf7da..53be0190e7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -24,7 +24,8 @@ close_connection/2]). %%used by TCP-based transports, e.g. STOMP adapter --export([check_tcp_listener_address/2]). +-export([check_tcp_listener_address/2, + ensure_ssl/0, ssl_transform_fun/1]). -export([tcp_listener_started/3, tcp_listener_stopped/3, start_client/1, start_ssl_client/2]). @@ -67,7 +68,7 @@ -spec(close_connection/2 :: (pid(), string()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/2 :: (atom(), listener_config()) - -> [{inet:ip_address(), ip_port(), family(), atom()}]). + -> [{inet:ip_address(), ip_port(), family(), atom()}]). -endif. @@ -88,19 +89,8 @@ boot_ssl() -> {ok, []} -> ok; {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, public_key, ssl]), - {ok, SslOptsConfig} = application:get_env(ssl_options), - %% unknown_ca errors are silently ignored prior to R14B unless we - %% supply this verify_fun - remove when at least R14B is required - SslOpts = - case proplists:get_value(verify, SslOptsConfig, verify_none) of - verify_none -> SslOptsConfig; - verify_peer -> [{verify_fun, fun([]) -> true; - ([_|_]) -> false - end} - | SslOptsConfig] - end, - [start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners], + [start_ssl_listener(Listener, ensure_ssl()) + || Listener <- SslListeners], ok end. @@ -147,6 +137,34 @@ resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); resolve_family(_, F) -> F. +ensure_ssl() -> + ok = rabbit_misc:start_applications([crypto, public_key, ssl]), + {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), + + % unknown_ca errors are silently ignored prior to R14B unless we + % supply this verify_fun - remove when at least R14B is required + case proplists:get_value(verify, SslOptsConfig, verify_none) of + verify_none -> SslOptsConfig; + verify_peer -> [{verify_fun, fun([]) -> true; + ([_|_]) -> false + end} + | SslOptsConfig] + end. + +ssl_transform_fun(SslOpts) -> + fun (Sock) -> + case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection ~p to SSL~n", + [self()]), + {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; + {error, Reason} -> + {error, {ssl_upgrade_error, Reason}}; + {'EXIT', Reason} -> + {error, {ssl_upgrade_failure, Reason}} + end + end. + check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) -> check_tcp_listener_address_auto(NamePrefix, Port); @@ -246,21 +264,7 @@ start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). start_ssl_client(SslOpts, Sock) -> - start_client( - Sock, - fun (Sock1) -> - case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of - {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection ~p to SSL~n", - [self()]), - {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}}; - {error, Reason} -> - {error, {ssl_upgrade_error, Reason}}; - {'EXIT', Reason} -> - {error, {ssl_upgrade_failure, Reason}} - - end - end). + start_client(Sock, ssl_transform_fun(SslOpts)). connections() -> [rabbit_connection_sup:reader(ConnSup) || |
