diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-07 05:45:46 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-07 05:45:46 +0100 |
| commit | ad963f9ed7e3094d7fdbbaa5a7d960340b481706 (patch) | |
| tree | c8a4c49e50939e60348a05610b667f9786eab712 /src | |
| parent | a297b77d0ff5a60e90948fec85b4d4091cd5ea46 (diff) | |
| parent | 30431e239437805d7b7b3e3cc183902d813b178d (diff) | |
| download | rabbitmq-server-git-ad963f9ed7e3094d7fdbbaa5a7d960340b481706.tar.gz | |
merge default into bug22902
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 371 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 367 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_load.erl | 78 | ||||
| -rw-r--r-- | src/rabbit_net.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tracer.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 4 |
16 files changed, 464 insertions, 546 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6b9ac56059..7116653c2a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -249,11 +249,12 @@ start_queue_process(Q) -> Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> - Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], - fun (_X, _Q) -> ok end), - ok. + rabbit_binding:add(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -433,7 +434,7 @@ internal_delete1(QueueName) -> %% we want to execute some things, as %% decided by rabbit_exchange, after the %% transaction. - rabbit_exchange:delete_queue_bindings(QueueName). + rabbit_binding:remove_for_queue(QueueName). internal_delete(QueueName) -> case @@ -478,7 +479,7 @@ on_node_down(Node) -> ok. delete_queue(QueueName) -> - Post = rabbit_exchange:delete_transient_queue_bindings(QueueName), + Post = rabbit_binding:remove_transient_for_queue(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), Post. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 90a0503b00..0849586294 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -163,10 +163,8 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - rabbit_event:notify( - queue_created, - [{Item, i(Item, State)} || - Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State)), noreply(init_expires(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -587,8 +585,7 @@ i(Item, _) -> throw({bad_argument, Item}). emit_stats(State) -> - rabbit_event:notify(queue_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)). %--------------------------------------------------------------------------- diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl new file mode 100644 index 0000000000..6caf7302b8 --- /dev/null +++ b/src/rabbit_binding.erl @@ -0,0 +1,371 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_binding). +-include("rabbit.hrl"). + +-export([recover/0, add/1, remove/1, add/2, remove/2, list/1]). +-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). +%% these must all be run inside a mnesia tx +-export([has_for_exchange/1, remove_for_exchange/1, + remove_for_queue/1, remove_transient_for_queue/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([key/0]). + +-type(key() :: binary()). + +-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found')). +-type(inner_fun() :: + fun((rabbit_types:exchange(), queue()) -> + rabbit_types:ok_or_error(rabbit_types:amqp_error()))). +-type(bindings() :: [rabbit_types:binding()]). + +-spec(recover/0 :: () -> [rabbit_types:binding()]). +-spec(add/1 :: (rabbit_types:binding()) -> bind_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(list/1 :: (rabbit_types:vhost()) -> bindings()). +-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_exchange_and_queue/2 :: + (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). +-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> + [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). +-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). +-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(remove_for_queue/1 :: + (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(remove_transient_for_queue/1 :: + (rabbit_amqqueue:name()) -> fun (() -> any())). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). + +recover() -> + rabbit_misc:table_fold( + fun (Route = #route{binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, Route, write), + ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route). + +add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). + +remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). + +add(Binding, InnerFun) -> + case binding_action( + Binding, + fun (X, Q, B) -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + case InnerFun(X, Q) of + ok -> + case mnesia:read({rabbit_route, B}) of + [] -> Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:write/3), + {new, X, B}; + [_] -> {existing, X, B} + end; + {error, _} = E -> + E + end + end) of + {new, Exchange = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Exchange, B), + rabbit_event:notify(binding_created, info(B)); + {existing, _, _} -> + ok; + {error, _} = Err -> + Err + end. + +remove(Binding, InnerFun) -> + case binding_action( + Binding, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + [_] -> case InnerFun(X, Q) of + ok -> + Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(X), + {{Deleted, X}, B}; + {error, _} = E -> + E + end + end + end) of + {error, _} = Err -> + Err; + {{IsDeleted, X = #exchange{ type = Type }}, B} -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> ok = Module:delete(X, [B]); + not_deleted -> ok = Module:remove_bindings(X, [B]) + end, + rabbit_event:notify(binding_deleted, info(B)), + ok + end. + +list(VHostPath) -> + Route = #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + queue_name = rabbit_misc:r(VHostPath, queue), + _ = '_'}, + _ = '_'}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +list_for_exchange(ExchangeName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +list_for_queue(QueueName) -> + Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, + [reverse_binding(B) || #reverse_route{reverse_binding = B} <- + mnesia:dirty_match_object(rabbit_reverse_route, + reverse_route(Route))]. + +list_for_exchange_and_queue(ExchangeName, QueueName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +info_keys() -> ?INFO_KEYS. + +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. + +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(queue_name, #binding{queue_name = QName}) -> QName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(B = #binding{}) -> infos(?INFO_KEYS, B). + +info(B = #binding{}, Items) -> infos(Items, B). + +info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). + +has_for_exchange(ExchangeName) -> + Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, + %% we need to check for durable routes here too in case a bunch of + %% routes to durable queues have been removed temporarily as a + %% result of a node failure + contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). + +remove_for_exchange(ExchangeName) -> + [begin + ok = mnesia:delete_object(rabbit_reverse_route, + reverse_route(Route), write), + ok = delete_forward_routes(Route), + Route#route.binding + end || Route <- mnesia:match_object( + rabbit_route, + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + write)]. + +remove_for_queue(QueueName) -> + remove_for_queue(QueueName, fun delete_forward_routes/1). + +remove_transient_for_queue(QueueName) -> + remove_for_queue(QueueName, fun delete_transient_forward_routes/1). + +%%---------------------------------------------------------------------------- + +binding_action(Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + args = Arguments}, Fun) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + SortedArgs = rabbit_misc:sort_field_table(Arguments), + Fun(X, Q, Binding#binding{args = SortedArgs}) + end). + +sync_binding(Binding, Durable, Fun) -> + ok = case Durable of + true -> Fun(rabbit_durable_route, + #route{binding = Binding}, write); + false -> ok + end, + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = Fun(rabbit_route, Route, write), + ok = Fun(rabbit_reverse_route, ReverseRoute, write), + ok. + +call_with_exchange_and_queue(Exchange, Queue, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> case {mnesia:read({rabbit_exchange, Exchange}), + mnesia:read({rabbit_queue, Queue})} of + {[X], [Q]} -> Fun(X, Q); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, queue_not_found}; + {[ ], [ ]} -> {error, exchange_and_queue_not_found} + end + end). + +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + Module. + +contains(Table, MatchHead) -> + continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). + +continue('$end_of_table') -> false; +continue({[_|_], _}) -> true; +continue({[], Continuation}) -> continue(mnesia:select(Continuation)). + +remove_for_queue(QueueName, FwdDeleteFun) -> + DeletedBindings = + [begin + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, + ReverseRoute, write), + Route#route.binding + end || ReverseRoute + <- mnesia:match_object( + rabbit_reverse_route, + reverse_route(#route{binding = #binding{ + queue_name = QueueName, + _ = '_'}}), + write)], + Grouped = group_bindings_and_auto_delete( + lists:keysort(#binding.exchange_name, DeletedBindings), []), + fun () -> + lists:foreach( + fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> Module:delete(X, Bs); + not_deleted -> Module:remove_bindings(X, Bs) + end + end, Grouped) + end. + +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% group_bindings_and_auto_delete1) works properly. +group_bindings_and_auto_delete([], Acc) -> + Acc; +group_bindings_and_auto_delete( + [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> + group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc). + +group_bindings_and_auto_delete( + ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], + Bindings, Acc) -> + group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching ExchangeName + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], + group_bindings_and_auto_delete(Removed, NewAcc). + +delete_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_durable_route, Route, write). + +delete_transient_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write). + +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> + Route = #route{binding = Binding}, + {Route, reverse_route(Route)}. + +reverse_route(#route{binding = Binding}) -> + #reverse_route{reverse_binding = reverse_binding(Binding)}; + +reverse_route(#reverse_route{reverse_binding = Binding}) -> + #route{binding = reverse_binding(Binding)}. + +reverse_binding(#reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}; + +reverse_binding(#binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 835d3f0da7..174eab4002 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -175,9 +175,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, blocking = dict:new(), queue_collector_pid = CollectorPid, stats_timer = rabbit_event:init_stats_timer()}, - rabbit_event:notify( - channel_created, - [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -807,17 +805,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, - QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, - NoWait, State); + binding_action(fun rabbit_binding:add/2, + ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + #'queue.bind_ok'{}, NoWait, State); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, - QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, - false, State); + binding_action(fun rabbit_binding:remove/2, + ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -895,7 +893,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + case Fun(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} @@ -1148,7 +1149,7 @@ update_measures(Type, QX, Inc, Measure) -> orddict:store(Measure, Cur + Inc, Measures)). internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> - CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], + CoarseStats = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(StatsTimer) of coarse -> rabbit_event:notify(channel_stats, CoarseStats); diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 69e21d73cc..b3821d3b8b 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -88,12 +88,12 @@ start_heartbeat_fun(SupPid) -> SupPid, {heartbeat_sender, {rabbit_heartbeat, start_heartbeat_sender, [Parent, Sock, TimeoutSec]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {ok, Receiver} = supervisor2:start_child( SupPid, {heartbeat_receiver, {rabbit_heartbeat, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {Sender, Receiver} end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 59e7aadf8f..c95d3aa904 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -246,14 +246,14 @@ action(list_exchanges, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, _Args, Opts, Inform) -> +action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - InfoKeys = [exchange_name, queue_name, routing_key, args], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], - InfoKeys); + ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index af4eb1bd79..40bee25f8b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -34,38 +34,19 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, - info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/5, delete_binding/5, list_bindings/1]). --export([delete/2]). --export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([assert_equivalence/5]). --export([assert_args_equivalence/2]). --export([check_type/1]). - -%% EXTENDED API --export([list_exchange_bindings/1]). --export([list_queue_bindings/1]). - --import(mnesia). --import(sets). --import(lists). + info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). +%% this must be run inside a mnesia tx +-export([maybe_auto_delete/1]). +-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([name/0, type/0, binding_key/0]). +-export_type([name/0, type/0]). -type(name() :: rabbit_types:r('exchange')). -type(type() :: atom()). --type(binding_key() :: binary()). - --type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). --type(inner_fun() :: - fun((rabbit_types:exchange(), queue()) -> - rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: @@ -97,32 +78,12 @@ -> [[rabbit_types:info()]]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> {rabbit_router:routing_result(), [pid()]}). --spec(add_binding/5 :: - (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) -> bind_res()). --spec(delete_binding/5 :: - (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) - -> bind_res() | rabbit_types:error('binding_not_found')). --spec(list_bindings/1 :: - (rabbit_types:vhost()) - -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(delete_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(delete_transient_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). --spec(list_queue_bindings/1 :: - (rabbit_amqqueue:name()) - -> [{name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_exchange_bindings/1 :: - (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). +-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> + 'not_deleted' | 'auto_deleted'). -endif. @@ -136,19 +97,7 @@ recover() -> ok = mnesia:write(rabbit_exchange, Exchange, write), [Exchange | Acc] end, [], rabbit_durable_exchange), - Bs = rabbit_misc:table_fold( - fun (Route = #route{binding = B}, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - [B | Acc] - end, [], rabbit_durable_route), - recover_with_bindings(Bs, Exs), - ok. - -recover_with_bindings(Bs, Exs) -> + Bs = rabbit_binding:recover(), recover_with_bindings( lists:keysort(#binding.exchange_name, Bs), lists:keysort(#exchange.name, Exs), []). @@ -164,11 +113,11 @@ recover_with_bindings([], [], []) -> ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, + Exchange = #exchange{name = ExchangeName, + type = Type, + durable = Durable, auto_delete = AutoDelete, - arguments = Args}, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. @@ -192,9 +141,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> end end) of {new, X} -> TypeModule:create(X), - rabbit_event:notify( - exchange_created, - [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]), + rabbit_event:notify(exchange_created, info(X)), X; {existing, X} -> X; Err -> Err @@ -220,9 +167,9 @@ check_type(TypeBin) -> end end. -assert_equivalence(X = #exchange{ durable = Durable, +assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, - type = Type}, + type = Type}, Type, Durable, AutoDelete, RequiredArgs) -> (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, @@ -232,8 +179,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, "cannot redeclare ~s with different type, durable or autodelete value", [rabbit_misc:rs(Name)]). -assert_args_equivalence(#exchange{ name = Name, - arguments = Args }, +assert_args_equivalence(#exchange{ name = Name, arguments = Args }, RequiredArgs) -> %% The spec says "Arguments are compared for semantic %% equivalence". The only arg we care about is @@ -311,92 +257,6 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. -%% TODO: Should all of the route and binding management not be -%% refactored to its own module, especially seeing as unbind will have -%% to be implemented for 0.91 ? - -delete_exchange_bindings(ExchangeName) -> - [begin - ok = mnesia:delete_object(rabbit_reverse_route, - reverse_route(Route), write), - ok = delete_forward_routes(Route), - Route#route.binding - end || Route <- mnesia:match_object( - rabbit_route, - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - write)]. - -delete_queue_bindings(QueueName) -> - delete_queue_bindings(QueueName, fun delete_forward_routes/1). - -delete_transient_queue_bindings(QueueName) -> - delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). - -delete_queue_bindings(QueueName, FwdDeleteFun) -> - DeletedBindings = - [begin - Route = reverse_route(ReverseRoute), - ok = FwdDeleteFun(Route), - ok = mnesia:delete_object(rabbit_reverse_route, - ReverseRoute, write), - Route#route.binding - end || ReverseRoute - <- mnesia:match_object( - rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), - write)], - Cleanup = cleanup_deleted_queue_bindings( - lists:keysort(#binding.exchange_name, DeletedBindings), []), - fun () -> - lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) - end - end, Cleanup) - end. - -%% Requires that its input binding list is sorted in exchange-name -%% order, so that the grouping of bindings (for passing to -%% cleanup_deleted_queue_bindings1) works properly. -cleanup_deleted_queue_bindings([], Acc) -> - Acc; -cleanup_deleted_queue_bindings( - [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> - cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc). - -cleanup_deleted_queue_bindings( - ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], - Bindings, Acc) -> - cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc); -cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) -> - %% either Deleted is [], or its head has a non-matching ExchangeName - NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc], - cleanup_deleted_queue_bindings(Deleted, NewAcc). - -cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - {maybe_auto_delete(X), Bindings}. - -delete_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write), - ok = mnesia:delete_object(rabbit_durable_route, Route, write). - -delete_transient_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write). - -contains(Table, MatchHead) -> - continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). - -continue('$end_of_table') -> false; -continue({[_|_], _}) -> true; -continue({[], Continuation}) -> continue(mnesia:select(Continuation)). - call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, Exchange}) of @@ -405,156 +265,6 @@ call_with_exchange(Exchange, Fun) -> end end). -call_with_exchange_and_queue(Exchange, Queue, Fun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end - end). - -add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> - case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - %% this argument is used to check queue exclusivity; - %% in general, we want to fail on that in preference to - %% anything else - case InnerFun(X, Q) of - ok -> - case mnesia:read({rabbit_route, B}) of - [] -> - ok = sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - rabbit_event:notify( - binding_created, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}, - {routing_key, RoutingKey}, - {arguments, Arguments}]), - {new, X, B}; - [_R] -> - {existing, X, B} - end; - {error, _} = E -> - E - end - end) of - {new, Exchange = #exchange{ type = Type }, Binding} -> - (type_to_module(Type)):add_binding(Exchange, Binding); - {existing, _, _} -> - ok; - {error, _} = Err -> - Err - end. - -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> - case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> - {error, binding_not_found}; - _ -> - case InnerFun(X, Q) of - ok -> - ok = - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:delete_object/3), - rabbit_event:notify( - binding_deleted, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}]), - {maybe_auto_delete(X), B}; - {error, _} = E -> - E - end - end - end) of - {error, _} = Err -> - Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, [B]); - not_deleted -> Module:remove_bindings(X, [B]) - end - end. - -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> - call_with_exchange_and_queue( - ExchangeName, QueueName, - fun (X, Q) -> - Fun(X, Q, #binding{ - exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) - end). - -sync_binding(Binding, Durable, Fun) -> - ok = case Durable of - true -> Fun(rabbit_durable_route, - #route{binding = Binding}, write); - false -> ok - end, - {Route, ReverseRoute} = route_with_reverse(Binding), - ok = Fun(rabbit_route, Route, write), - ok = Fun(rabbit_reverse_route, ReverseRoute, write), - ok. - -list_bindings(VHostPath) -> - [{ExchangeName, QueueName, RoutingKey, Arguments} || - #route{binding = #binding{ - exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName, - args = Arguments}} - <- mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. - -route_with_reverse(#route{binding = Binding}) -> - route_with_reverse(Binding); -route_with_reverse(Binding = #binding{}) -> - Route = #route{binding = Binding}, - {Route, reverse_route(Route)}. - -reverse_route(#route{binding = Binding}) -> - #reverse_route{reverse_binding = reverse_binding(Binding)}; - -reverse_route(#reverse_route{reverse_binding = Binding}) -> - #route{binding = reverse_binding(Binding)}. - -reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> - #binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}. - delete(ExchangeName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; @@ -568,54 +278,23 @@ delete(ExchangeName, IfUnused) -> Error end. -maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {not_deleted, Exchange}; -maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> +maybe_auto_delete(#exchange{auto_delete = false}) -> + not_deleted; +maybe_auto_delete(#exchange{auto_delete = true} = Exchange) -> case conditional_delete(Exchange) of - {error, in_use} -> {not_deleted, Exchange}; - {deleted, Exchange, []} -> {auto_deleted, Exchange} + {error, in_use} -> not_deleted; + {deleted, Exchange, []} -> auto_deleted end. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - %% we need to check for durable routes here too in case a bunch of - %% routes to durable queues have been removed temporarily as a - %% result of a node failure - case contains(rabbit_route, Match) orelse - contains(rabbit_durable_route, Match) of + case rabbit_binding:has_for_exchange(ExchangeName) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = delete_exchange_bindings(ExchangeName), + Bindings = rabbit_binding:remove_for_exchange(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}), rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), {deleted, Exchange, Bindings}. - -%%---------------------------------------------------------------------------- -%% EXTENDED API -%% These are API calls that are not used by the server internally, -%% they are exported for embedded clients to use - -%% This is currently used in mod_rabbit.erl (XMPP) and expects this to -%% return {QueueName, RoutingKey, Arguments} tuples -list_exchange_bindings(ExchangeName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - [{QueueName, RoutingKey, Arguments} || - #route{binding = #binding{queue_name = QueueName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. - -% Refactoring is left as an exercise for the reader -list_queue_bindings(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, - _ = '_'}}, - [{ExchangeName, RoutingKey, Arguments} || - #route{binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 44607398cb..0a59a175cd 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -79,8 +79,8 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that route/3 and -%% rabbit_exchange:{add,delete}_binding/4 do. +%% (rabbit_misc:sort_field_table) that publish/1 and +%% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl deleted file mode 100644 index e0457b1e43..0000000000 --- a/src/rabbit_load.erl +++ /dev/null @@ -1,78 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_load). - --export([local_load/0, remote_loads/0, pick/0]). - --define(FUDGE_FACTOR, 0.98). --define(TIMEOUT, 100). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}). --spec(local_load/0 :: () -> load()). --spec(remote_loads/0 :: () -> [load()]). --spec(pick/0 :: () -> node()). - --endif. - -%%---------------------------------------------------------------------------- - -local_load() -> - LoadAvg = case whereis(cpu_sup) of - undefined -> unknown; - _ -> case cpu_sup:avg1() of - L when is_integer(L) -> L; - {error, timeout} -> unknown - end - end, - {{statistics(run_queue), LoadAvg}, node()}. - -remote_loads() -> - {ResL, _BadNodes} = - rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT), - ResL. - -pick() -> - RemoteLoads = remote_loads(), - {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node; we rely on Erlang's term order - %% of SomeFloat < local_unknown < unknown. - AdjustedLoadAvg = case LoadAvg of - unknown -> local_unknown; - _ -> LoadAvg * ?FUDGE_FACTOR - end, - Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], - {_, SelectedNode} = lists:min(Loads), - SelectedNode. diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 145153c155..b9ca674066 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -75,47 +75,39 @@ %%--------------------------------------------------------------------------- +-define(IS_SSL(Sock), is_record(Sock, ssl_socket)). -async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> +async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), spawn(fun () -> Pid ! {inet_async, Sock, Ref, - ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} - end), + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), {ok, Ref}; - async_recv(Sock, Length, infinity) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, -1); - async_recv(Sock, Length, Timeout) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, Timeout). -close(Sock) when is_record(Sock, ssl_socket) -> +close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); - close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). - -controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> +controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); - controlling_process(Sock, Pid) when is_port(Sock) -> gen_tcp:controlling_process(Sock, Pid). - -getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> +getstat(Sock, Stats) when ?IS_SSL(Sock) -> inet:getstat(Sock#ssl_socket.tcp, Stats); - getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). - -peername(Sock) when is_record(Sock, ssl_socket) -> +peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl); - peername(Sock) when is_port(Sock) -> inet:peername(Sock). @@ -128,28 +120,22 @@ peercert(Sock) when is_record(Sock, ssl_socket) -> peercert(_) -> nossl. - -port_command(Sock, Data) when is_record(Sock, ssl_socket) -> +port_command(Sock, Data) when ?IS_SSL(Sock) -> case ssl:send(Sock#ssl_socket.ssl, Data) of - ok -> - self() ! {inet_reply, Sock, ok}, - true; - {error, Reason} -> - erlang:error(Reason) + ok -> self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> erlang:error(Reason) end; - port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). -send(Sock, Data) when is_record(Sock, ssl_socket) -> +send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); - send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). -sockname(Sock) when is_record(Sock, ssl_socket) -> +sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); - sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 8b9dd3830e..4dfc6338d7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -780,9 +780,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State#v1{connection_state = running, connection = NewConnection}), - rabbit_event:notify( - connection_created, - [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(connection_created, + infos(?CREATION_EVENT_KEYS, State1)), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -1067,5 +1066,4 @@ amqp_exception_explanation(Text, Expl) -> end. internal_emit_stats(State) -> - rabbit_event:notify(connection_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ec049a1a2c..bfccb0daa5 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,9 +33,7 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([deliver/2, - match_bindings/2, - match_routing_key/2]). +-export([deliver/2, match_bindings/2, match_routing_key/2]). %%---------------------------------------------------------------------------- @@ -45,9 +43,15 @@ -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(qpids() :: [pid()]). -spec(deliver/2 :: - ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). + (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_exchange:name(), + fun ((rabbit_types:binding()) -> boolean())) -> + qpids()). +-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> + qpids()). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bdd3cdcd64..b541f0f70f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1037,7 +1037,15 @@ test_server_status() -> ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true), %% list bindings - ok = control_action(list_bindings, []), + ok = info_action(list_bindings, rabbit_binding:info_keys(), true), + %% misc binding listing APIs + [_|_] = rabbit_binding:list_for_exchange( + rabbit_misc:r(<<"/">>, exchange, <<"">>)), + [_] = rabbit_binding:list_for_queue( + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), + [_] = rabbit_binding:list_for_exchange_and_queue( + rabbit_misc:r(<<"/">>, exchange, <<"">>), + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), %% list connections [#listener{host = H, port = P} | _] = diff --git a/src/rabbit_tracer.erl b/src/rabbit_tracer.erl deleted file mode 100644 index 484249b1df..0000000000 --- a/src/rabbit_tracer.erl +++ /dev/null @@ -1,50 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_tracer). --export([start/0]). - --import(erlang). - -start() -> - spawn(fun mainloop/0), - ok. - -mainloop() -> - erlang:trace(new, true, [all]), - mainloop1(). - -mainloop1() -> - receive - Msg -> - rabbit_log:info("TRACE: ~p~n", [Msg]) - end, - mainloop1(). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 47e8bb0161..9dfd33bd87 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,7 +118,8 @@ -type(binding() :: #binding{exchange_name :: rabbit_exchange:name(), queue_name :: rabbit_amqqueue:name(), - key :: rabbit_exchange:binding_key()}). + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: #amqqueue{name :: rabbit_amqqueue:name(), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index feb214c275..aa986e542b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -171,8 +171,8 @@ call(Pid, Msg) -> assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, - Protocol). + rabbit_binary_generator:build_simple_method_frame( + Channel, MethodRecord, Protocol). assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), |
