diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-04 06:31:01 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-04 06:31:01 +0100 |
| commit | bf969f71ac27b0fa0ca0c19835680491db233ea0 (patch) | |
| tree | 16a6d54aabd8be9ec04a885d25f0b610d100a2ff | |
| parent | b4560ce8e40df9dbb7f1826cc08e2e1b905e03e9 (diff) | |
| download | rabbitmq-server-git-bf969f71ac27b0fa0ca0c19835680491db233ea0.tar.gz | |
tweak and extend rabbit_binding API
- 'add', 'remove' take binding records instead of losts of args
- 'list*' return #binding records instead of tuples
- add 'list_for_exchange_and_queue'
- add 'info*' functions
Also fix two bugs:
- don't invoke rabbit_event:notify inside mnesia tx
- include complete binding info in binding_deleted event
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 197 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 2 |
5 files changed, 120 insertions, 107 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8a9237805d..5c5b143d2b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -249,9 +249,12 @@ start_queue_process(Q) -> Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> - Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(Exchange, QueueName, RoutingKey, [], + rabbit_binding:add(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = []}, fun (_X, _Q) -> ok end), ok. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 3569ba93b7..a815f544db 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -32,8 +32,9 @@ -module(rabbit_binding). -include("rabbit.hrl"). --export([recover/0, add/5, remove/5, list/1]). --export([list_for_exchange/1, list_for_queue/1]). +-export([recover/0, add/2, remove/2, list/1]). +-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, remove_for_queue/1, remove_transient_for_queue/1]). @@ -52,31 +53,26 @@ -type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). +-type(bindings() :: [rabbit_types:binding()]). -spec(recover/0 :: () -> [rabbit_types:binding()]). --spec(add/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res()). --spec(remove/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). --spec(list/1 :: (rabbit_types:vhost()) -> - [{rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_exchange/1 :: - (rabbit_exchange:name()) -> [{rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_queue/1 :: - (rabbit_amqqueue:name()) -> [{rabbit_exchange:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(list/1 :: (rabbit_types:vhost()) -> bindings()). +-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_exchange_and_queue/2 :: + (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). +-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> + [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: - (rabbit_exchange:name()) -> [rabbit_types:binding()]). +-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). -spec(remove_for_queue/1 :: (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(remove_transient_for_queue/1 :: @@ -86,6 +82,8 @@ %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). + recover() -> rabbit_misc:table_fold( fun (Route = #route{binding = B}, Acc) -> @@ -95,9 +93,9 @@ recover() -> [B | Acc] end, [], rabbit_durable_route). -add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +add(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to @@ -105,58 +103,47 @@ add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case InnerFun(X, Q) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> - ok = sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - rabbit_event:notify( - binding_created, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}, - {routing_key, RoutingKey}, - {arguments, Arguments}]), - {new, X, B}; - [_R] -> - {existing, X, B} + [] -> Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:write/3), + {new, X, B}; + [_] -> {existing, X, B} end; {error, _} = E -> E end end) of - {new, Exchange = #exchange{ type = Type }, Binding} -> - (type_to_module(Type)):add_binding(Exchange, Binding); + {new, Exchange = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Exchange, B), + rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; {error, _} = Err -> Err end. -remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +remove(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> - {error, binding_not_found}; - _ -> - case InnerFun(X, Q) of - ok -> - ok = - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:delete_object/3), - rabbit_event:notify( - binding_deleted, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}]), - Del = rabbit_exchange:maybe_auto_delete(X), - {{Del, X}, B}; - {error, _} = E -> - E - end + [] -> {error, binding_not_found}; + [_] -> case InnerFun(X, Q) of + ok -> + Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(X), + {{Deleted, X}, B}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> @@ -164,41 +151,62 @@ remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> Module:delete(X, [B]); - not_deleted -> Module:remove_bindings(X, [B]) - end + auto_deleted -> ok = Module:delete(X, [B]); + not_deleted -> ok = Module:remove_bindings(X, [B]) + end, + rabbit_event:notify(binding_deleted, info(B)), + ok end. list(VHostPath) -> - [{ExchangeName, QueueName, RoutingKey, Arguments} || - #route{binding = #binding{ - exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName, - args = Arguments}} - <- mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. + Route = #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + queue_name = rabbit_misc:r(VHostPath, queue), + _ = '_'}, + _ = '_'}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. list_for_exchange(ExchangeName) -> Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - [{QueueName, RoutingKey, Arguments} || - #route{binding = #binding{queue_name = QueueName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. -% Refactoring is left as an exercise for the reader list_for_queue(QueueName) -> Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, - [{ExchangeName, RoutingKey, Arguments} || - #route{binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + [reverse_binding(B) || #reverse_route{reverse_binding = B} <- + mnesia:dirty_match_object(rabbit_reverse_route, + reverse_route(Route))]. + +list_for_exchange_and_queue(ExchangeName, QueueName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +info_keys() -> ?INFO_KEYS. + +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. + +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(queue_name, #binding{queue_name = QName}) -> QName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(B = #binding{}) -> infos(?INFO_KEYS, B). + +info(B = #binding{}, Items) -> infos(Items, B). + +info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). has_for_exchange(ExchangeName) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, @@ -227,15 +235,14 @@ remove_transient_for_queue(QueueName) -> %%---------------------------------------------------------------------------- -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> +binding_action(Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + args = Arguments}, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{ - exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) + SortedArgs = rabbit_misc:sort_field_table(Arguments), + Fun(X, Q, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8e5735c5cd..8f0065ccfe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -807,7 +807,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:add/5, + binding_action(fun rabbit_binding:add/2, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -815,7 +815,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:remove/5, + binding_action(fun rabbit_binding:remove/2, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -895,7 +895,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + case Fun(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index cca2e3d199..06826b8e7f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -246,14 +246,14 @@ action(list_exchanges, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, _Args, Opts, Inform) -> +action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - InfoKeys = [exchange_name, queue_name, routing_key, args], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_binding, list, [VHostArg])], - InfoKeys); + ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 02e829ecce..0a59a175cd 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -80,7 +80,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort %% (rabbit_misc:sort_field_table) that publish/1 and -%% rabbit_binding:{add,remove}/5 do. +%% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. |
