summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-11 16:21:35 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-11 16:21:35 +0000
commit8504d1c5fd80b6c246102158f6cb8885778d8712 (patch)
tree00b3624b4b4e25ab1c45fac395695a41e75ac06b
parent1721a5ebc1160fae927d72be21278eba2d3c28db (diff)
parentbba11517f66b618bf9d07159ee6d1c3588917e41 (diff)
downloadrabbitmq-server-git-8504d1c5fd80b6c246102158f6cb8885778d8712.tar.gz
Merging default to bug23554
-rw-r--r--src/rabbit_exchange_type_topic.erl84
-rw-r--r--src/rabbit_networking.erl64
2 files changed, 89 insertions, 59 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index f12661d48f..ffd1e58395 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -67,17 +67,58 @@ add_binding(true, _Exchange, Binding) ->
add_binding(false, _Exchange, _Binding) ->
ok.
-remove_bindings(true, _X, Bs) ->
- lists:foreach(fun remove_binding/1, Bs),
+remove_bindings(true, #exchange{name = X}, Bs) ->
+ %% The remove process is split into two distinct phases. In the
+ %% first phase we gather the lists of bindings and edges to
+ %% delete, then in the second phase we process all the
+ %% deletions. This is to prevent interleaving of read/write
+ %% operations in mnesia that can adversely affect performance.
+ {ToDelete, Paths} =
+ lists:foldl(
+ fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(S, split_topic_key(K)),
+ {[{FinalNode, D} | Acc],
+ decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
+ end, {[], gb_trees:empty()}, Bs),
+
+ [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
+ [trie_remove_edge(X, Parent, Node, W) ||
+ {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
ok;
remove_bindings(false, _X, _Bs) ->
ok.
-remove_binding(#binding{source = X, key = K, destination = D}) ->
- Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)),
- trie_remove_binding(X, FinalNode, D),
- remove_path_if_empty(X, Path),
- ok.
+maybe_add_path(_X, [{root, none}], PathAcc) ->
+ PathAcc;
+maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
+ case gb_trees:is_defined(Node, PathAcc) of
+ true -> PathAcc;
+ false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
+ trie_child_count(X, Node)}},
+ PathAcc)
+ end.
+
+decrement_bindings(X, Path, PathAcc) ->
+ with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
+ Path, PathAcc).
+
+decrement_edges(X, Path, PathAcc) ->
+ with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
+ Path, PathAcc).
+
+with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
+ PathAcc;
+with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
+ {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
+ NewCounts = Fun(Counts),
+ NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
+ case NewCounts of
+ {0, 0} -> decrement_edges(X, ParentPath,
+ maybe_add_path(X, ParentPath, NewPathAcc));
+ _ -> NewPathAcc
+ end.
+
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
@@ -146,15 +187,6 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
error -> {error, Acc, Words}
end.
-remove_path_if_empty(_, [{root, none}]) ->
- ok;
-remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
- case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of
- true -> ok;
- false -> trie_remove_edge(X, Parent, Node, W),
- remove_path_if_empty(X, RestPath)
- end.
-
trie_child(X, Node, Word) ->
case mnesia:read(rabbit_topic_trie_edge,
#trie_edge{exchange_name = X,
@@ -199,21 +231,24 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_has_any_children(X, Node) ->
- has_any(rabbit_topic_trie_edge,
+trie_child_count(X, Node) ->
+ count(rabbit_topic_trie_edge,
#topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
node_id = Node,
_ = '_'},
_ = '_'}).
-trie_has_any_bindings(X, Node) ->
- has_any(rabbit_topic_trie_binding,
+trie_binding_count(X, Node) ->
+ count(rabbit_topic_trie_binding,
#topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
_ = '_'},
_ = '_'}).
+count(Table, Match) ->
+ length(mnesia:match_object(Table, Match, read)).
+
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
#topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
@@ -226,15 +261,6 @@ trie_remove_all_bindings(X) ->
trie_binding = #trie_binding{exchange_name = X, _ = '_'},
_ = '_'}).
-has_any(Table, MatchHead) ->
- Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read),
- select_while_no_result(Select) /= '$end_of_table'.
-
-select_while_no_result({[], Cont}) ->
- select_while_no_result(mnesia:select(Cont));
-select_while_no_result(Other) ->
- Other.
-
remove_all(Table, Pattern) ->
lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end,
mnesia:match_object(Table, Pattern, write)).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 877d2cf7da..53be0190e7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -24,7 +24,8 @@
close_connection/2]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/2]).
+-export([check_tcp_listener_address/2,
+ ensure_ssl/0, ssl_transform_fun/1]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
start_client/1, start_ssl_client/2]).
@@ -67,7 +68,7 @@
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+ -> [{inet:ip_address(), ip_port(), family(), atom()}]).
-endif.
@@ -88,19 +89,8 @@ boot_ssl() ->
{ok, []} ->
ok;
{ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
- {ok, SslOptsConfig} = application:get_env(ssl_options),
- %% unknown_ca errors are silently ignored prior to R14B unless we
- %% supply this verify_fun - remove when at least R14B is required
- SslOpts =
- case proplists:get_value(verify, SslOptsConfig, verify_none) of
- verify_none -> SslOptsConfig;
- verify_peer -> [{verify_fun, fun([]) -> true;
- ([_|_]) -> false
- end}
- | SslOptsConfig]
- end,
- [start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners],
+ [start_ssl_listener(Listener, ensure_ssl())
+ || Listener <- SslListeners],
ok
end.
@@ -147,6 +137,34 @@ resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
resolve_family(_, F) -> F.
+ensure_ssl() ->
+ ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
+ {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
+
+ % unknown_ca errors are silently ignored prior to R14B unless we
+ % supply this verify_fun - remove when at least R14B is required
+ case proplists:get_value(verify, SslOptsConfig, verify_none) of
+ verify_none -> SslOptsConfig;
+ verify_peer -> [{verify_fun, fun([]) -> true;
+ ([_|_]) -> false
+ end}
+ | SslOptsConfig]
+ end.
+
+ssl_transform_fun(SslOpts) ->
+ fun (Sock) ->
+ case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
+ {ok, SslSock} ->
+ rabbit_log:info("upgraded TCP connection ~p to SSL~n",
+ [self()]),
+ {ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
+ {error, Reason} ->
+ {error, {ssl_upgrade_error, Reason}};
+ {'EXIT', Reason} ->
+ {error, {ssl_upgrade_failure, Reason}}
+ end
+ end.
+
check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
check_tcp_listener_address_auto(NamePrefix, Port);
@@ -246,21 +264,7 @@ start_client(Sock) ->
start_client(Sock, fun (S) -> {ok, S} end).
start_ssl_client(SslOpts, Sock) ->
- start_client(
- Sock,
- fun (Sock1) ->
- case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of
- {ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection ~p to SSL~n",
- [self()]),
- {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}};
- {error, Reason} ->
- {error, {ssl_upgrade_error, Reason}};
- {'EXIT', Reason} ->
- {error, {ssl_upgrade_failure, Reason}}
-
- end
- end).
+ start_client(Sock, ssl_transform_fun(SslOpts)).
connections() ->
[rabbit_connection_sup:reader(ConnSup) ||