summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_binding.erl204
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_control.erl12
-rw-r--r--src/rabbit_exchange_type_headers.erl2
-rw-r--r--src/rabbit_tests.erl10
6 files changed, 136 insertions, 110 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8a9237805d..7116653c2a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -249,11 +249,12 @@ start_queue_process(Q) ->
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
- Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
+ ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_binding:add(Exchange, QueueName, RoutingKey, [],
- fun (_X, _Q) -> ok end),
- ok.
+ rabbit_binding:add(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 3569ba93b7..6caf7302b8 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -32,8 +32,9 @@
-module(rabbit_binding).
-include("rabbit.hrl").
--export([recover/0, add/5, remove/5, list/1]).
--export([list_for_exchange/1, list_for_queue/1]).
+-export([recover/0, add/1, remove/1, add/2, remove/2, list/1]).
+-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_exchange/1, remove_for_exchange/1,
remove_for_queue/1, remove_transient_for_queue/1]).
@@ -52,31 +53,29 @@
-type(inner_fun() ::
fun((rabbit_types:exchange(), queue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
+-type(bindings() :: [rabbit_types:binding()]).
-spec(recover/0 :: () -> [rabbit_types:binding()]).
--spec(add/5 ::
- (rabbit_exchange:name(), rabbit_amqqueue:name(),
- rabbit_router:routing_key(), rabbit_framing:amqp_table(),
- inner_fun()) -> bind_res()).
--spec(remove/5 ::
- (rabbit_exchange:name(), rabbit_amqqueue:name(),
- rabbit_router:routing_key(), rabbit_framing:amqp_table(),
- inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')).
--spec(list/1 :: (rabbit_types:vhost()) ->
- [{rabbit_exchange:name(), rabbit_amqqueue:name(),
- rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(list_for_exchange/1 ::
- (rabbit_exchange:name()) -> [{rabbit_amqqueue:name(),
- rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(list_for_queue/1 ::
- (rabbit_amqqueue:name()) -> [{rabbit_exchange:name(),
- rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
+-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(remove/1 :: (rabbit_types:binding()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
+-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
+-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_exchange_and_queue/2 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
+-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
+ [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
--spec(remove_for_exchange/1 ::
- (rabbit_exchange:name()) -> [rabbit_types:binding()]).
+-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
-spec(remove_for_queue/1 ::
(rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(remove_transient_for_queue/1 ::
@@ -86,6 +85,8 @@
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+
recover() ->
rabbit_misc:table_fold(
fun (Route = #route{binding = B}, Acc) ->
@@ -95,9 +96,13 @@ recover() ->
[B | Acc]
end, [], rabbit_durable_route).
-add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
+add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+
+remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+
+add(Binding, InnerFun) ->
case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
+ Binding,
fun (X, Q, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
@@ -105,58 +110,47 @@ add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case InnerFun(X, Q) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] ->
- ok = sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- rabbit_event:notify(
- binding_created,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName},
- {routing_key, RoutingKey},
- {arguments, Arguments}]),
- {new, X, B};
- [_R] ->
- {existing, X, B}
+ [] -> Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_] -> {existing, X, B}
end;
{error, _} = E ->
E
end
end) of
- {new, Exchange = #exchange{ type = Type }, Binding} ->
- (type_to_module(Type)):add_binding(Exchange, Binding);
+ {new, Exchange = #exchange{ type = Type }, B} ->
+ ok = (type_to_module(Type)):add_binding(Exchange, B),
+ rabbit_event:notify(binding_created, info(B));
{existing, _, _} ->
ok;
{error, _} = Err ->
Err
end.
-remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
+remove(Binding, InnerFun) ->
case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
+ Binding,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] ->
- {error, binding_not_found};
- _ ->
- case InnerFun(X, Q) of
- ok ->
- ok =
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- rabbit_event:notify(
- binding_deleted,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName}]),
- Del = rabbit_exchange:maybe_auto_delete(X),
- {{Del, X}, B};
- {error, _} = E ->
- E
- end
+ [] -> {error, binding_not_found};
+ [_] -> case InnerFun(X, Q) of
+ ok ->
+ Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:delete_object/3),
+ Deleted =
+ rabbit_exchange:maybe_auto_delete(X),
+ {{Deleted, X}, B};
+ {error, _} = E ->
+ E
+ end
end
end) of
{error, _} = Err ->
@@ -164,41 +158,62 @@ remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
{{IsDeleted, X = #exchange{ type = Type }}, B} ->
Module = type_to_module(Type),
case IsDeleted of
- auto_deleted -> Module:delete(X, [B]);
- not_deleted -> Module:remove_bindings(X, [B])
- end
+ auto_deleted -> ok = Module:delete(X, [B]);
+ not_deleted -> ok = Module:remove_bindings(X, [B])
+ end,
+ rabbit_event:notify(binding_deleted, info(B)),
+ ok
end.
list(VHostPath) ->
- [{ExchangeName, QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName,
- args = Arguments}}
- <- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
+ Route = #route{binding = #binding{
+ exchange_name = rabbit_misc:r(VHostPath, exchange),
+ queue_name = rabbit_misc:r(VHostPath, queue),
+ _ = '_'},
+ _ = '_'},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
list_for_exchange(ExchangeName) ->
Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
- [{QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{queue_name = QueueName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
-% Refactoring is left as an exercise for the reader
list_for_queue(QueueName) ->
Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
- [{ExchangeName, RoutingKey, Arguments} ||
- #route{binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
+ [reverse_binding(B) || #reverse_route{reverse_binding = B} <-
+ mnesia:dirty_match_object(rabbit_reverse_route,
+ reverse_route(Route))].
+
+list_for_exchange_and_queue(ExchangeName, QueueName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ _ = '_'}},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+info_keys() -> ?INFO_KEYS.
+
+map(VHostPath, F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ lists:map(F, list(VHostPath)).
+
+infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
+
+i(exchange_name, #binding{exchange_name = XName}) -> XName;
+i(queue_name, #binding{queue_name = QName}) -> QName;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
+i(Item, _) -> throw({bad_argument, Item}).
+
+info(B = #binding{}) -> infos(?INFO_KEYS, B).
+
+info(B = #binding{}, Items) -> infos(Items, B).
+
+info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
+
+info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
has_for_exchange(ExchangeName) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
@@ -227,15 +242,14 @@ remove_transient_for_queue(QueueName) ->
%%----------------------------------------------------------------------------
-binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
+binding_action(Binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ args = Arguments}, Fun) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
- Fun(X, Q, #binding{
- exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
+ SortedArgs = rabbit_misc:sort_field_table(Arguments),
+ Fun(X, Q, Binding#binding{args = SortedArgs})
end).
sync_binding(Binding, Durable, Fun) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 60b807faa5..174eab4002 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -805,7 +805,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:add/5,
+ binding_action(fun rabbit_binding:add/2,
ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
#'queue.bind_ok'{}, NoWait, State);
@@ -813,7 +813,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:remove/5,
+ binding_action(fun rabbit_binding:remove/2,
ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
#'queue.unbind_ok'{}, false, State);
@@ -893,7 +893,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ case Fun(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = ActualRoutingKey,
+ args = Arguments},
fun (_X, Q) ->
try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
catch exit:Reason -> {error, Reason}
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index cca2e3d199..06826b8e7f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -246,14 +246,14 @@ action(list_exchanges, Node, Args, Opts, Inform) ->
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, _Args, Opts, Inform) ->
+action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- InfoKeys = [exchange_name, queue_name, routing_key, args],
- display_info_list(
- [lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_binding, list, [VHostArg])],
- InfoKeys);
+ ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ routing_key, arguments]),
+ display_info_list(rpc_call(Node, rabbit_binding, info_all,
+ [VHostArg, ArgAtoms]),
+ ArgAtoms);
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 02e829ecce..0a59a175cd 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -80,7 +80,7 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
%% (rabbit_misc:sort_field_table) that publish/1 and
-%% rabbit_binding:{add,remove}/5 do.
+%% rabbit_binding:{add,remove}/2 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bdd3cdcd64..b541f0f70f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1037,7 +1037,15 @@ test_server_status() ->
ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true),
%% list bindings
- ok = control_action(list_bindings, []),
+ ok = info_action(list_bindings, rabbit_binding:info_keys(), true),
+ %% misc binding listing APIs
+ [_|_] = rabbit_binding:list_for_exchange(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>)),
+ [_] = rabbit_binding:list_for_queue(
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
+ [_] = rabbit_binding:list_for_exchange_and_queue(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>),
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
%% list connections
[#listener{host = H, port = P} | _] =