diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-01-15 14:04:03 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-01-15 14:04:03 +0000 |
| commit | 5e75164e8f6cc1df4913537de949cf0fdd7220b5 (patch) | |
| tree | ae02e1344807945ca827311000b42e1176c80ecf | |
| parent | 4088c214dfabafdc6a07231ac95c7400b7763885 (diff) | |
| download | rabbitmq-server-git-5e75164e8f6cc1df4913537de949cf0fdd7220b5.tar.gz | |
refactoring to eliminate code duplication
...between topic and headers exchange routing
| -rw-r--r-- | src/rabbit_exchange.erl | 79 |
1 files changed, 31 insertions, 48 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c83dd38b08..2b5d9763f2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -233,65 +233,48 @@ sort_arguments(Arguments) -> %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% -%% TODO: Maybe this should be handled by a cursor instead. route(#exchange{name = Name, type = topic}, RoutingKey, _Content) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- - mnesia:dirty_match_object( - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); + match_bindings(Name, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); -route(#exchange{name = Name, type = headers}, - _RoutingKey, - #content{properties = #'P_basic'{headers = Headers0}}) -> - Headers = case Headers0 of +route(#exchange{name = Name, type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; - _ -> sort_arguments(Headers0) + H -> sort_arguments(H) end, - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - args = Spec}} <- mnesia:table(route), - ExchangeName == Name, - headers_match(Spec, Headers)]), + match_bindings(Name, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + route_internal(X, RoutingKey). + +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(XName, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(route), + ExchangeName == XName, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - args = Spec}} <- - mnesia:dirty_match_object( - #route{binding = #binding{exchange_name = Name, + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + #route{binding = #binding{exchange_name = XName, _ = '_'}}), - headers_match(Spec, Headers)] - end); - -route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey, _Content) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). route_internal(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, |
