summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-09-07 05:45:46 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-09-07 05:45:46 +0100
commitad963f9ed7e3094d7fdbbaa5a7d960340b481706 (patch)
treec8a4c49e50939e60348a05610b667f9786eab712 /src
parenta297b77d0ff5a60e90948fec85b4d4091cd5ea46 (diff)
parent30431e239437805d7b7b3e3cc183902d813b178d (diff)
downloadrabbitmq-server-git-ad963f9ed7e3094d7fdbbaa5a7d960340b481706.tar.gz
merge default into bug22902
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_binding.erl371
-rw-r--r--src/rabbit_channel.erl23
-rw-r--r--src/rabbit_connection_sup.erl4
-rw-r--r--src/rabbit_control.erl12
-rw-r--r--src/rabbit_exchange.erl367
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_load.erl78
-rw-r--r--src/rabbit_net.erl42
-rw-r--r--src/rabbit_reader.erl8
-rw-r--r--src/rabbit_router.erl12
-rw-r--r--src/rabbit_tests.erl10
-rw-r--r--src/rabbit_tracer.erl50
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_writer.erl4
16 files changed, 464 insertions, 546 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b9ac56059..7116653c2a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -249,11 +249,12 @@ start_queue_process(Q) ->
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
- Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
+ ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
- fun (_X, _Q) -> ok end),
- ok.
+ rabbit_binding:add(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -433,7 +434,7 @@ internal_delete1(QueueName) ->
%% we want to execute some things, as
%% decided by rabbit_exchange, after the
%% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName).
+ rabbit_binding:remove_for_queue(QueueName).
internal_delete(QueueName) ->
case
@@ -478,7 +479,7 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_exchange:delete_transient_queue_bindings(QueueName),
+ Post = rabbit_binding:remove_transient_for_queue(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
Post.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 90a0503b00..0849586294 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -163,10 +163,8 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- rabbit_event:notify(
- queue_created,
- [{Item, i(Item, State)} ||
- Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State)),
noreply(init_expires(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -587,8 +585,7 @@ i(Item, _) ->
throw({bad_argument, Item}).
emit_stats(State) ->
- rabbit_event:notify(queue_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+ rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
%---------------------------------------------------------------------------
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
new file mode 100644
index 0000000000..6caf7302b8
--- /dev/null
+++ b/src/rabbit_binding.erl
@@ -0,0 +1,371 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_binding).
+-include("rabbit.hrl").
+
+-export([recover/0, add/1, remove/1, add/2, remove/2, list/1]).
+-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+%% these must all be run inside a mnesia tx
+-export([has_for_exchange/1, remove_for_exchange/1,
+ remove_for_queue/1, remove_transient_for_queue/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([key/0]).
+
+-type(key() :: binary()).
+
+-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found')).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
+-type(bindings() :: [rabbit_types:binding()]).
+
+-spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(remove/1 :: (rabbit_types:binding()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
+-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
+-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_exchange_and_queue/2 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
+-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
+ [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
+-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
+-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
+-spec(remove_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(remove_transient_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+
+recover() ->
+ rabbit_misc:table_fold(
+ fun (Route = #route{binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route, Route, write),
+ ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route).
+
+add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+
+remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+
+add(Binding, InnerFun) ->
+ case binding_action(
+ Binding,
+ fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(X, Q) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_] -> {existing, X, B}
+ end;
+ {error, _} = E ->
+ E
+ end
+ end) of
+ {new, Exchange = #exchange{ type = Type }, B} ->
+ ok = (type_to_module(Type)):add_binding(Exchange, B),
+ rabbit_event:notify(binding_created, info(B));
+ {existing, _, _} ->
+ ok;
+ {error, _} = Err ->
+ Err
+ end.
+
+remove(Binding, InnerFun) ->
+ case binding_action(
+ Binding,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ [_] -> case InnerFun(X, Q) of
+ ok ->
+ Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:delete_object/3),
+ Deleted =
+ rabbit_exchange:maybe_auto_delete(X),
+ {{Deleted, X}, B};
+ {error, _} = E ->
+ E
+ end
+ end
+ end) of
+ {error, _} = Err ->
+ Err;
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> ok = Module:delete(X, [B]);
+ not_deleted -> ok = Module:remove_bindings(X, [B])
+ end,
+ rabbit_event:notify(binding_deleted, info(B)),
+ ok
+ end.
+
+list(VHostPath) ->
+ Route = #route{binding = #binding{
+ exchange_name = rabbit_misc:r(VHostPath, exchange),
+ queue_name = rabbit_misc:r(VHostPath, queue),
+ _ = '_'},
+ _ = '_'},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+list_for_exchange(ExchangeName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+list_for_queue(QueueName) ->
+ Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+ [reverse_binding(B) || #reverse_route{reverse_binding = B} <-
+ mnesia:dirty_match_object(rabbit_reverse_route,
+ reverse_route(Route))].
+
+list_for_exchange_and_queue(ExchangeName, QueueName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ _ = '_'}},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+info_keys() -> ?INFO_KEYS.
+
+map(VHostPath, F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ lists:map(F, list(VHostPath)).
+
+infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
+
+i(exchange_name, #binding{exchange_name = XName}) -> XName;
+i(queue_name, #binding{queue_name = QName}) -> QName;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
+i(Item, _) -> throw({bad_argument, Item}).
+
+info(B = #binding{}) -> infos(?INFO_KEYS, B).
+
+info(B = #binding{}, Items) -> infos(Items, B).
+
+info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
+
+info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
+
+has_for_exchange(ExchangeName) ->
+ Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+
+remove_for_exchange(ExchangeName) ->
+ [begin
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ reverse_route(Route), write),
+ ok = delete_forward_routes(Route),
+ Route#route.binding
+ end || Route <- mnesia:match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ write)].
+
+remove_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_forward_routes/1).
+
+remove_transient_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+
+%%----------------------------------------------------------------------------
+
+binding_action(Binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ args = Arguments}, Fun) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ SortedArgs = rabbit_misc:sort_field_table(Arguments),
+ Fun(X, Q, Binding#binding{args = SortedArgs})
+ end).
+
+sync_binding(Binding, Durable, Fun) ->
+ ok = case Durable of
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
+ false -> ok
+ end,
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
+ ok.
+
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
+ end).
+
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ Module.
+
+contains(Table, MatchHead) ->
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+remove_for_queue(QueueName, FwdDeleteFun) ->
+ DeletedBindings =
+ [begin
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute
+ <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(#route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ Grouped = group_bindings_and_auto_delete(
+ lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ fun () ->
+ lists:foreach(
+ fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, Bs);
+ not_deleted -> Module:remove_bindings(X, Bs)
+ end
+ end, Grouped)
+ end.
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% group_bindings_and_auto_delete1) works properly.
+group_bindings_and_auto_delete([], Acc) ->
+ Acc;
+group_bindings_and_auto_delete(
+ [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
+ group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc).
+
+group_bindings_and_auto_delete(
+ ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
+ Bindings, Acc) ->
+ group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc);
+group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) ->
+ %% either Removed is [], or its head has a non-matching ExchangeName
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc],
+ group_bindings_and_auto_delete(Removed, NewAcc).
+
+delete_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
+
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write).
+
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
+ Route = #route{binding = Binding},
+ {Route, reverse_route(Route)}.
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 835d3f0da7..174eab4002 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -175,9 +175,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
blocking = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = rabbit_event:init_stats_timer()},
- rabbit_event:notify(
- channel_created,
- [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -807,17 +805,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
- NoWait, State);
+ binding_action(fun rabbit_binding:add/2,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
- false, State);
+ binding_action(fun rabbit_binding:remove/2,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@@ -895,7 +893,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ case Fun(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = ActualRoutingKey,
+ args = Arguments},
fun (_X, Q) ->
try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
catch exit:Reason -> {error, Reason}
@@ -1148,7 +1149,7 @@ update_measures(Type, QX, Inc, Measure) ->
orddict:store(Measure, Cur + Inc, Measures)).
internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
- CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
rabbit_event:notify(channel_stats, CoarseStats);
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 69e21d73cc..b3821d3b8b 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -88,12 +88,12 @@ start_heartbeat_fun(SupPid) ->
SupPid, {heartbeat_sender,
{rabbit_heartbeat, start_heartbeat_sender,
[Parent, Sock, TimeoutSec]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
{ok, Receiver} =
supervisor2:start_child(
SupPid, {heartbeat_receiver,
{rabbit_heartbeat, start_heartbeat_receiver,
[Parent, Sock, TimeoutSec]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
{Sender, Receiver}
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 59e7aadf8f..c95d3aa904 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -246,14 +246,14 @@ action(list_exchanges, Node, Args, Opts, Inform) ->
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, _Args, Opts, Inform) ->
+action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- InfoKeys = [exchange_name, queue_name, routing_key, args],
- display_info_list(
- [lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys);
+ ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ routing_key, arguments]),
+ display_info_list(rpc_call(Node, rabbit_binding, info_all,
+ [VHostArg, ArgAtoms]),
+ ArgAtoms);
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index af4eb1bd79..40bee25f8b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,38 +34,19 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
- info/1, info/2, info_all/1, info_all/2, publish/2]).
--export([add_binding/5, delete_binding/5, list_bindings/1]).
--export([delete/2]).
--export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([assert_equivalence/5]).
--export([assert_args_equivalence/2]).
--export([check_type/1]).
-
-%% EXTENDED API
--export([list_exchange_bindings/1]).
--export([list_queue_bindings/1]).
-
--import(mnesia).
--import(sets).
--import(lists).
+ info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+%% this must be run inside a mnesia tx
+-export([maybe_auto_delete/1]).
+-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([name/0, type/0, binding_key/0]).
+-export_type([name/0, type/0]).
-type(name() :: rabbit_types:r('exchange')).
-type(type() :: atom()).
--type(binding_key() :: binary()).
-
--type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
--type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
- rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -97,32 +78,12 @@
-> [[rabbit_types:info()]]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
--spec(add_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun()) -> bind_res()).
--spec(delete_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:error('binding_not_found')).
--spec(list_bindings/1 ::
- (rabbit_types:vhost())
- -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(list_queue_bindings/1 ::
- (rabbit_amqqueue:name())
- -> [{name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(list_exchange_bindings/1 ::
- (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
+-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
+ 'not_deleted' | 'auto_deleted').
-endif.
@@ -136,19 +97,7 @@ recover() ->
ok = mnesia:write(rabbit_exchange, Exchange, write),
[Exchange | Acc]
end, [], rabbit_durable_exchange),
- Bs = rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- [B | Acc]
- end, [], rabbit_durable_route),
- recover_with_bindings(Bs, Exs),
- ok.
-
-recover_with_bindings(Bs, Exs) ->
+ Bs = rabbit_binding:recover(),
recover_with_bindings(
lists:keysort(#binding.exchange_name, Bs),
lists:keysort(#exchange.name, Exs), []).
@@ -164,11 +113,11 @@ recover_with_bindings([], [], []) ->
ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = ExchangeName,
- type = Type,
- durable = Durable,
+ Exchange = #exchange{name = ExchangeName,
+ type = Type,
+ durable = Durable,
auto_delete = AutoDelete,
- arguments = Args},
+ arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
%% value.
@@ -192,9 +141,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} -> TypeModule:create(X),
- rabbit_event:notify(
- exchange_created,
- [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]),
+ rabbit_event:notify(exchange_created, info(X)),
X;
{existing, X} -> X;
Err -> Err
@@ -220,9 +167,9 @@ check_type(TypeBin) ->
end
end.
-assert_equivalence(X = #exchange{ durable = Durable,
+assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
- type = Type},
+ type = Type},
Type, Durable, AutoDelete, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
@@ -232,8 +179,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-assert_args_equivalence(#exchange{ name = Name,
- arguments = Args },
+assert_args_equivalence(#exchange{ name = Name, arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
%% equivalence". The only arg we care about is
@@ -311,92 +257,6 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
-%% TODO: Should all of the route and binding management not be
-%% refactored to its own module, especially seeing as unbind will have
-%% to be implemented for 0.91 ?
-
-delete_exchange_bindings(ExchangeName) ->
- [begin
- ok = mnesia:delete_object(rabbit_reverse_route,
- reverse_route(Route), write),
- ok = delete_forward_routes(Route),
- Route#route.binding
- end || Route <- mnesia:match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- write)].
-
-delete_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_forward_routes/1).
-
-delete_transient_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
-
-delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- Route = reverse_route(ReverseRoute),
- ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route,
- ReverseRoute, write),
- Route#route.binding
- end || ReverseRoute
- <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
- write)],
- Cleanup = cleanup_deleted_queue_bindings(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
- fun () ->
- lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
- end, Cleanup)
- end.
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], Acc) ->
- Acc;
-cleanup_deleted_queue_bindings(
- [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
-
-cleanup_deleted_queue_bindings(
- ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
- Bindings, Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
-cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
- %% either Deleted is [], or its head has a non-matching ExchangeName
- NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
- cleanup_deleted_queue_bindings(Deleted, NewAcc).
-
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- {maybe_auto_delete(X), Bindings}.
-
-delete_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write),
- ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-
-delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write).
-
-contains(Table, MatchHead) ->
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
-
-continue('$end_of_table') -> false;
-continue({[_|_], _}) -> true;
-continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
@@ -405,156 +265,6 @@ call_with_exchange(Exchange, Fun) ->
end
end).
-call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
- mnesia:read({rabbit_queue, Queue})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
- end).
-
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(X, Q) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- ok = sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- rabbit_event:notify(
- binding_created,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName},
- {routing_key, RoutingKey},
- {arguments, Arguments}]),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end;
- {error, _} = E ->
- E
- end
- end) of
- {new, Exchange = #exchange{ type = Type }, Binding} ->
- (type_to_module(Type)):add_binding(Exchange, Binding);
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
-
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- _ ->
- case InnerFun(X, Q) of
- ok ->
- ok =
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- rabbit_event:notify(
- binding_deleted,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName}]),
- {maybe_auto_delete(X), B};
- {error, _} = E ->
- E
- end
- end
- end) of
- {error, _} = Err ->
- Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, [B]);
- not_deleted -> Module:remove_bindings(X, [B])
- end
- end.
-
-binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
- Fun(X, Q, #binding{
- exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
- end).
-
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
- false -> ok
- end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
- ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write),
- ok.
-
-list_bindings(VHostPath) ->
- [{ExchangeName, QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName,
- args = Arguments}}
- <- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
-
-route_with_reverse(#route{binding = Binding}) ->
- route_with_reverse(Binding);
-route_with_reverse(Binding = #binding{}) ->
- Route = #route{binding = Binding},
- {Route, reverse_route(Route)}.
-
-reverse_route(#route{binding = Binding}) ->
- #reverse_route{reverse_binding = reverse_binding(Binding)};
-
-reverse_route(#reverse_route{reverse_binding = Binding}) ->
- #route{binding = reverse_binding(Binding)}.
-
-reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}.
-
delete(ExchangeName, IfUnused) ->
Fun = case IfUnused of
true -> fun conditional_delete/1;
@@ -568,54 +278,23 @@ delete(ExchangeName, IfUnused) ->
Error
end.
-maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {not_deleted, Exchange};
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ not_deleted;
+maybe_auto_delete(#exchange{auto_delete = true} = Exchange) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {not_deleted, Exchange};
- {deleted, Exchange, []} -> {auto_deleted, Exchange}
+ {error, in_use} -> not_deleted;
+ {deleted, Exchange, []} -> auto_deleted
end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
- %% we need to check for durable routes here too in case a bunch of
- %% routes to durable queues have been removed temporarily as a
- %% result of a node failure
- case contains(rabbit_route, Match) orelse
- contains(rabbit_durable_route, Match) of
+ case rabbit_binding:has_for_exchange(ExchangeName) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Bindings = delete_exchange_bindings(ExchangeName),
+ Bindings = rabbit_binding:remove_for_exchange(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
{deleted, Exchange, Bindings}.
-
-%%----------------------------------------------------------------------------
-%% EXTENDED API
-%% These are API calls that are not used by the server internally,
-%% they are exported for embedded clients to use
-
-%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
-%% return {QueueName, RoutingKey, Arguments} tuples
-list_exchange_bindings(ExchangeName) ->
- Route = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- [{QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{queue_name = QueueName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
-
-% Refactoring is left as an exercise for the reader
-list_queue_bindings(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName,
- _ = '_'}},
- [{ExchangeName, RoutingKey, Arguments} ||
- #route{binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 44607398cb..0a59a175cd 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -79,8 +79,8 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
-%% (rabbit_misc:sort_field_table) that route/3 and
-%% rabbit_exchange:{add,delete}_binding/4 do.
+%% (rabbit_misc:sort_field_table) that publish/1 and
+%% rabbit_binding:{add,remove}/2 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
deleted file mode 100644
index e0457b1e43..0000000000
--- a/src/rabbit_load.erl
+++ /dev/null
@@ -1,78 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_load).
-
--export([local_load/0, remote_loads/0, pick/0]).
-
--define(FUDGE_FACTOR, 0.98).
--define(TIMEOUT, 100).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}).
--spec(local_load/0 :: () -> load()).
--spec(remote_loads/0 :: () -> [load()]).
--spec(pick/0 :: () -> node()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-local_load() ->
- LoadAvg = case whereis(cpu_sup) of
- undefined -> unknown;
- _ -> case cpu_sup:avg1() of
- L when is_integer(L) -> L;
- {error, timeout} -> unknown
- end
- end,
- {{statistics(run_queue), LoadAvg}, node()}.
-
-remote_loads() ->
- {ResL, _BadNodes} =
- rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT),
- ResL.
-
-pick() ->
- RemoteLoads = remote_loads(),
- {{RunQ, LoadAvg}, Node} = local_load(),
- %% add bias towards current node; we rely on Erlang's term order
- %% of SomeFloat < local_unknown < unknown.
- AdjustedLoadAvg = case LoadAvg of
- unknown -> local_unknown;
- _ -> LoadAvg * ?FUDGE_FACTOR
- end,
- Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads],
- {_, SelectedNode} = lists:min(Loads),
- SelectedNode.
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 145153c155..b9ca674066 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -75,47 +75,39 @@
%%---------------------------------------------------------------------------
+-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
-async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
+async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
spawn(fun () -> Pid ! {inet_async, Sock, Ref,
- ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
- end),
+ ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
+ end),
{ok, Ref};
-
async_recv(Sock, Length, infinity) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, -1);
-
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).
-close(Sock) when is_record(Sock, ssl_socket) ->
+close(Sock) when ?IS_SSL(Sock) ->
ssl:close(Sock#ssl_socket.ssl);
-
close(Sock) when is_port(Sock) ->
gen_tcp:close(Sock).
-
-controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) ->
+controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
-
controlling_process(Sock, Pid) when is_port(Sock) ->
gen_tcp:controlling_process(Sock, Pid).
-
-getstat(Sock, Stats) when is_record(Sock, ssl_socket) ->
+getstat(Sock, Stats) when ?IS_SSL(Sock) ->
inet:getstat(Sock#ssl_socket.tcp, Stats);
-
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
-
-peername(Sock) when is_record(Sock, ssl_socket) ->
+peername(Sock) when ?IS_SSL(Sock) ->
ssl:peername(Sock#ssl_socket.ssl);
-
peername(Sock) when is_port(Sock) ->
inet:peername(Sock).
@@ -128,28 +120,22 @@ peercert(Sock) when is_record(Sock, ssl_socket) ->
peercert(_) ->
nossl.
-
-port_command(Sock, Data) when is_record(Sock, ssl_socket) ->
+port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
- ok ->
- self() ! {inet_reply, Sock, ok},
- true;
- {error, Reason} ->
- erlang:error(Reason)
+ ok -> self() ! {inet_reply, Sock, ok},
+ true;
+ {error, Reason} -> erlang:error(Reason)
end;
-
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
-send(Sock, Data) when is_record(Sock, ssl_socket) ->
+send(Sock, Data) when ?IS_SSL(Sock) ->
ssl:send(Sock#ssl_socket.ssl, Data);
-
send(Sock, Data) when is_port(Sock) ->
gen_tcp:send(Sock, Data).
-sockname(Sock) when is_record(Sock, ssl_socket) ->
+sockname(Sock) when ?IS_SSL(Sock) ->
ssl:sockname(Sock#ssl_socket.ssl);
-
sockname(Sock) when is_port(Sock) ->
inet:sockname(Sock).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 8b9dd3830e..4dfc6338d7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -780,9 +780,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State#v1{connection_state = running,
connection = NewConnection}),
- rabbit_event:notify(
- connection_created,
- [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(connection_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -1067,5 +1066,4 @@ amqp_exception_explanation(Text, Expl) ->
end.
internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ec049a1a2c..bfccb0daa5 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,9 +33,7 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2,
- match_bindings/2,
- match_routing_key/2]).
+-export([deliver/2, match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
@@ -45,9 +43,15 @@
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(qpids() :: [pid()]).
-spec(deliver/2 ::
- ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
+ (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
+-spec(match_bindings/2 :: (rabbit_exchange:name(),
+ fun ((rabbit_types:binding()) -> boolean())) ->
+ qpids()).
+-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
+ qpids()).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bdd3cdcd64..b541f0f70f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1037,7 +1037,15 @@ test_server_status() ->
ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true),
%% list bindings
- ok = control_action(list_bindings, []),
+ ok = info_action(list_bindings, rabbit_binding:info_keys(), true),
+ %% misc binding listing APIs
+ [_|_] = rabbit_binding:list_for_exchange(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>)),
+ [_] = rabbit_binding:list_for_queue(
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
+ [_] = rabbit_binding:list_for_exchange_and_queue(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>),
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
%% list connections
[#listener{host = H, port = P} | _] =
diff --git a/src/rabbit_tracer.erl b/src/rabbit_tracer.erl
deleted file mode 100644
index 484249b1df..0000000000
--- a/src/rabbit_tracer.erl
+++ /dev/null
@@ -1,50 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_tracer).
--export([start/0]).
-
--import(erlang).
-
-start() ->
- spawn(fun mainloop/0),
- ok.
-
-mainloop() ->
- erlang:trace(new, true, [all]),
- mainloop1().
-
-mainloop1() ->
- receive
- Msg ->
- rabbit_log:info("TRACE: ~p~n", [Msg])
- end,
- mainloop1().
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 47e8bb0161..9dfd33bd87 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -118,7 +118,8 @@
-type(binding() ::
#binding{exchange_name :: rabbit_exchange:name(),
queue_name :: rabbit_amqqueue:name(),
- key :: rabbit_exchange:binding_key()}).
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: rabbit_amqqueue:name(),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index feb214c275..aa986e542b 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -171,8 +171,8 @@ call(Pid, Msg) ->
assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
- Protocol).
+ rabbit_binary_generator:build_simple_method_frame(
+ Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),