summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_exchange.erl39
-rw-r--r--src/rabbit_exchange_decorator.erl6
-rw-r--r--src/rabbit_registry.erl11
3 files changed, 39 insertions, 17 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 88033f7762..0a3849ef55 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -117,14 +117,15 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
- M <- decorators()],
+ M <- registry_lookup(exchange_decorator)],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
serialise_events(X = #exchange{type = Type}) ->
- lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ lists:any(fun (M) -> M:serialise_events(X) end,
+ registry_lookup(exchange_decorator))
orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
@@ -136,8 +137,15 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-decorators() ->
- [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+registry_lookup(exchange_decorator_route = Class) ->
+ case get(exchange_decorator_route_modules) of
+ undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
+ put(exchange_decorator_route_modules, Mods),
+ Mods;
+ Mods -> Mods
+ end;
+registry_lookup(Class) ->
+ [M || {_, M} <- rabbit_registry:lookup_all(Class)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = rabbit_policy:set(#exchange{name = XName,
@@ -304,16 +312,25 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-%% Optimisation
-route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
- #delivery{message = #basic_message{routing_keys = RKs}}) ->
- [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+route(#exchange{name = #resource{name = RName, virtual_host = VHost} = XName} = X,
+ #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
+ case registry_lookup(exchange_decorator_route) == [] andalso
+ RName == <<"">> of
+ true -> [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+ false -> QNames = route1(Delivery, {[X], XName, []}),
+ lists:usort(decorate_route(X, Delivery, QNames))
+ end.
-route(X = #exchange{name = XName}, Delivery) ->
- route1(Delivery, {[X], XName, []}).
+decorate_route(X, Delivery, QNames) ->
+ {Add, Remove} =
+ lists:foldl(fun (Decorator, {Add, Remove}) ->
+ {A1, R1} = Decorator:route(X, Delivery, QNames),
+ {A1 ++ Add, R1 ++ Remove}
+ end, {[], []}, registry_lookup(exchange_decorator_route)),
+ QNames ++ Add -- Remove.
route1(_, {[], _, QNames}) ->
- lists:usort(QNames);
+ QNames;
route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
DstNames = process_alternate(
X, ((type_to_module(Type)):route(X, Delivery))),
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index befbc46219..4e395cbe0a 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -58,13 +58,17 @@
-callback policy_changed (
serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+-callback route ( rabbit_types:exchange(), rabbit_types:delivery(),
+ [rabbit_amqqueue:name()]) ->
+ {[rabbit_amqqueue:name()], [rabbit_amqqueue:name()]}.
+
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}];
+ {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}, {route, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 60419856c9..3514e7806c 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -104,11 +104,12 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(policy_validator) -> rabbit_policy_validator.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(exchange_decorator_route) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------