summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_exchange.erl79
1 files changed, 31 insertions, 48 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c83dd38b08..2b5d9763f2 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -233,65 +233,48 @@ sort_arguments(Arguments) ->
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-%%
-%% TODO: Maybe this should be handled by a cursor instead.
route(#exchange{name = Name, type = topic}, RoutingKey, _Content) ->
- Query = qlc:q([QName ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName,
- key = BindingKey}} <- mnesia:table(route),
- ExchangeName == Name,
- %% TODO: This causes a full scan for each entry
- %% with the same exchange (see bug 19336)
- topic_matches(BindingKey, RoutingKey)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = #binding{queue_name = QName,
- key = BindingKey}} <-
- mnesia:dirty_match_object(
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- topic_matches(BindingKey, RoutingKey)]
- end);
+ match_bindings(Name, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
-route(#exchange{name = Name, type = headers},
- _RoutingKey,
- #content{properties = #'P_basic'{headers = Headers0}}) ->
- Headers = case Headers0 of
+route(#exchange{name = Name, type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
undefined -> [];
- _ -> sort_arguments(Headers0)
+ H -> sort_arguments(H)
end,
- Query = qlc:q([QName ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName,
- args = Spec}} <- mnesia:table(route),
- ExchangeName == Name,
- headers_match(Spec, Headers)]),
+ match_bindings(Name, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ route_internal(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ route_internal(X, RoutingKey).
+
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(XName, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(route),
+ ExchangeName == XName,
+ Match(Binding)]),
lookup_qpids(
try
mnesia:async_dirty(fun qlc:e/1, [Query])
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- [QName || #route{binding = #binding{queue_name = QName,
- args = Spec}} <-
- mnesia:dirty_match_object(
- #route{binding = #binding{exchange_name = Name,
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ #route{binding = #binding{exchange_name = XName,
_ = '_'}}),
- headers_match(Spec, Headers)]
- end);
-
-route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
- route_internal(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey, _Content) ->
- route_internal(X, RoutingKey).
+ Match(Binding)]
+ end).
route_internal(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,