diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 204 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 10 |
6 files changed, 136 insertions, 110 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8a9237805d..7116653c2a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -249,11 +249,12 @@ start_queue_process(Q) -> Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> - Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(Exchange, QueueName, RoutingKey, [], - fun (_X, _Q) -> ok end), - ok. + rabbit_binding:add(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 3569ba93b7..6caf7302b8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -32,8 +32,9 @@ -module(rabbit_binding). -include("rabbit.hrl"). --export([recover/0, add/5, remove/5, list/1]). --export([list_for_exchange/1, list_for_queue/1]). +-export([recover/0, add/1, remove/1, add/2, remove/2, list/1]). +-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, remove_for_queue/1, remove_transient_for_queue/1]). @@ -52,31 +53,29 @@ -type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). +-type(bindings() :: [rabbit_types:binding()]). -spec(recover/0 :: () -> [rabbit_types:binding()]). --spec(add/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res()). --spec(remove/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). --spec(list/1 :: (rabbit_types:vhost()) -> - [{rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_exchange/1 :: - (rabbit_exchange:name()) -> [{rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_queue/1 :: - (rabbit_amqqueue:name()) -> [{rabbit_exchange:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). +-spec(add/1 :: (rabbit_types:binding()) -> bind_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(list/1 :: (rabbit_types:vhost()) -> bindings()). +-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_exchange_and_queue/2 :: + (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). +-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> + [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: - (rabbit_exchange:name()) -> [rabbit_types:binding()]). +-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). -spec(remove_for_queue/1 :: (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(remove_transient_for_queue/1 :: @@ -86,6 +85,8 @@ %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). + recover() -> rabbit_misc:table_fold( fun (Route = #route{binding = B}, Acc) -> @@ -95,9 +96,13 @@ recover() -> [B | Acc] end, [], rabbit_durable_route). -add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). + +remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). + +add(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to @@ -105,58 +110,47 @@ add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case InnerFun(X, Q) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> - ok = sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - rabbit_event:notify( - binding_created, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}, - {routing_key, RoutingKey}, - {arguments, Arguments}]), - {new, X, B}; - [_R] -> - {existing, X, B} + [] -> Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:write/3), + {new, X, B}; + [_] -> {existing, X, B} end; {error, _} = E -> E end end) of - {new, Exchange = #exchange{ type = Type }, Binding} -> - (type_to_module(Type)):add_binding(Exchange, Binding); + {new, Exchange = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Exchange, B), + rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; {error, _} = Err -> Err end. -remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +remove(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> - {error, binding_not_found}; - _ -> - case InnerFun(X, Q) of - ok -> - ok = - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:delete_object/3), - rabbit_event:notify( - binding_deleted, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}]), - Del = rabbit_exchange:maybe_auto_delete(X), - {{Del, X}, B}; - {error, _} = E -> - E - end + [] -> {error, binding_not_found}; + [_] -> case InnerFun(X, Q) of + ok -> + Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(X), + {{Deleted, X}, B}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> @@ -164,41 +158,62 @@ remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> Module:delete(X, [B]); - not_deleted -> Module:remove_bindings(X, [B]) - end + auto_deleted -> ok = Module:delete(X, [B]); + not_deleted -> ok = Module:remove_bindings(X, [B]) + end, + rabbit_event:notify(binding_deleted, info(B)), + ok end. list(VHostPath) -> - [{ExchangeName, QueueName, RoutingKey, Arguments} || - #route{binding = #binding{ - exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName, - args = Arguments}} - <- mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. + Route = #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + queue_name = rabbit_misc:r(VHostPath, queue), + _ = '_'}, + _ = '_'}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. list_for_exchange(ExchangeName) -> Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - [{QueueName, RoutingKey, Arguments} || - #route{binding = #binding{queue_name = QueueName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. -% Refactoring is left as an exercise for the reader list_for_queue(QueueName) -> Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, - [{ExchangeName, RoutingKey, Arguments} || - #route{binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + [reverse_binding(B) || #reverse_route{reverse_binding = B} <- + mnesia:dirty_match_object(rabbit_reverse_route, + reverse_route(Route))]. + +list_for_exchange_and_queue(ExchangeName, QueueName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +info_keys() -> ?INFO_KEYS. + +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. + +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(queue_name, #binding{queue_name = QName}) -> QName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(B = #binding{}) -> infos(?INFO_KEYS, B). + +info(B = #binding{}, Items) -> infos(Items, B). + +info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). has_for_exchange(ExchangeName) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, @@ -227,15 +242,14 @@ remove_transient_for_queue(QueueName) -> %%---------------------------------------------------------------------------- -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> +binding_action(Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + args = Arguments}, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{ - exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) + SortedArgs = rabbit_misc:sort_field_table(Arguments), + Fun(X, Q, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 60b807faa5..174eab4002 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -805,7 +805,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:add/5, + binding_action(fun rabbit_binding:add/2, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -813,7 +813,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:remove/5, + binding_action(fun rabbit_binding:remove/2, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -893,7 +893,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + case Fun(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index cca2e3d199..06826b8e7f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -246,14 +246,14 @@ action(list_exchanges, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, _Args, Opts, Inform) -> +action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - InfoKeys = [exchange_name, queue_name, routing_key, args], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_binding, list, [VHostArg])], - InfoKeys); + ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 02e829ecce..0a59a175cd 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -80,7 +80,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort %% (rabbit_misc:sort_field_table) that publish/1 and -%% rabbit_binding:{add,remove}/5 do. +%% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bdd3cdcd64..b541f0f70f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1037,7 +1037,15 @@ test_server_status() -> ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true), %% list bindings - ok = control_action(list_bindings, []), + ok = info_action(list_bindings, rabbit_binding:info_keys(), true), + %% misc binding listing APIs + [_|_] = rabbit_binding:list_for_exchange( + rabbit_misc:r(<<"/">>, exchange, <<"">>)), + [_] = rabbit_binding:list_for_queue( + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), + [_] = rabbit_binding:list_for_exchange_and_queue( + rabbit_misc:r(<<"/">>, exchange, <<"">>), + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), %% list connections [#listener{host = H, port = P} | _] = |
