summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit_exchange.erl50
2 files changed, 29 insertions, 25 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b089014476..706a92af7a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -49,8 +49,8 @@
-record(route, {binding, value = const}).
-record(reverse_route, {reverse_binding, value = const}).
--record(binding, {exchange_name, key, queue_name}).
--record(reverse_binding, {queue_name, key, exchange_name}).
+-record(binding, {exchange_name, key, queue_name, args = []}).
+-record(reverse_binding, {queue_name, key, exchange_name, args = []}).
-record(listener, {node, protocol, host, port}).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2db7758d7a..fc26915fa0 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -208,7 +208,8 @@ route(X = #exchange{type = direct}, RoutingKey) ->
route_internal(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
queue_name = '$1',
- key = RoutingKey}},
+ key = RoutingKey,
+ _ = '_'}},
lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
@@ -225,16 +226,14 @@ lookup_qpids(Queues) ->
delete_bindings_for_exchange(ExchangeName) ->
indexed_delete(
#route{binding = #binding{exchange_name = ExchangeName,
- queue_name = '_',
- key = '_'}},
+ _ = '_'}},
fun delete_forward_routes/1, fun mnesia:delete_object/1).
delete_bindings_for_queue(QueueName) ->
Exchanges = exchanges_for_queue(QueueName),
indexed_delete(
- reverse_route(#route{binding = #binding{exchange_name = '_',
- queue_name = QueueName,
- key = '_'}}),
+ reverse_route(#route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
fun mnesia:delete_object/1, fun delete_forward_routes/1),
[begin
[X] = mnesia:read({exchange, ExchangeName}),
@@ -257,7 +256,7 @@ exchanges_for_queue(QueueName) ->
MatchHead = reverse_route(
#route{binding = #binding{exchange_name = '$1',
queue_name = QueueName,
- key = '_'}}),
+ _ = '_'}}),
sets:to_list(
sets:from_list(
mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
@@ -265,7 +264,7 @@ exchanges_for_queue(QueueName) ->
has_bindings(ExchangeName) ->
MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
queue_name = '$1',
- key = '_'}},
+ _ = '_'}},
continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)).
continue('$end_of_table') -> false;
@@ -289,32 +288,33 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true -> ok = sync_binding(
- ExchangeName, QueueName, RoutingKey,
- Q#amqqueue.durable, fun mnesia:write/3)
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ Q#amqqueue.durable, fun mnesia:write/3)
end
end).
-delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
ok = sync_binding(
- ExchangeName, QueueName, RoutingKey,
+ ExchangeName, QueueName, RoutingKey, Arguments,
Q#amqqueue.durable, fun mnesia:delete_object/3),
maybe_auto_delete(X)
end).
-sync_binding(ExchangeName, QueueName, RoutingKey, Durable, Fun) ->
+sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
Binding = #binding{exchange_name = ExchangeName,
queue_name = QueueName,
- key = RoutingKey},
+ key = RoutingKey,
+ args = Arguments},
ok = case Durable of
true -> Fun(durable_routes, #route{binding = Binding}, write);
false -> ok
@@ -337,17 +337,21 @@ reverse_route(#reverse_route{reverse_binding = Binding}) ->
reverse_binding(#reverse_binding{exchange_name = Exchange,
queue_name = Queue,
- key = Key}) ->
+ key = Key,
+ args = Args}) ->
#binding{exchange_name = Exchange,
queue_name = Queue,
- key = Key};
+ key = Key,
+ args = Args};
reverse_binding(#binding{exchange_name = Exchange,
queue_name = Queue,
- key = Key}) ->
+ key = Key,
+ args = Args}) ->
#reverse_binding{exchange_name = Exchange,
queue_name = Queue,
- key = Key}.
+ key = Key,
+ args = Args}.
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
@@ -407,9 +411,9 @@ unconditional_delete(#exchange{name = ExchangeName}) ->
%% return {QueueName, RoutingKey, Arguments} tuples
list_exchange_bindings(ExchangeName) ->
Route = #route{binding = #binding{exchange_name = ExchangeName,
- queue_name = '_',
- key = '_'}},
- [{QueueName, RoutingKey, []} ||
+ _ = '_'}},
+ [{QueueName, RoutingKey, Arguments} ||
#route{binding = #binding{queue_name = QueueName,
- key = RoutingKey}}
+ key = RoutingKey,
+ args = Arguments}}
<- mnesia:dirty_match_object(Route)].