diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-03-06 21:07:31 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-03-06 21:07:31 +0000 |
| commit | 7ed42b79ac27358b9f2bdc00aa31f5ce0799853a (patch) | |
| tree | b3357b78eb853504f0204d5d1cc8425423cc4818 | |
| parent | 2e665ace9e3202b0e89fdfb6910b74b9646e88ff (diff) | |
| download | rabbitmq-server-git-7ed42b79ac27358b9f2bdc00aa31f5ce0799853a.tar.gz | |
Permit exchange decorators to modify routing decisions
| -rw-r--r-- | src/rabbit_exchange.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_exchange_decorator.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_registry.erl | 11 |
3 files changed, 39 insertions, 17 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 88033f7762..0a3849ef55 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -117,14 +117,15 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || - M <- decorators()], + M <- registry_lookup(exchange_decorator)], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). serialise_events(X = #exchange{type = Type}) -> - lists:any(fun (M) -> M:serialise_events(X) end, decorators()) + lists:any(fun (M) -> M:serialise_events(X) end, + registry_lookup(exchange_decorator)) orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> @@ -136,8 +137,15 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. -decorators() -> - [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. +registry_lookup(exchange_decorator_route = Class) -> + case get(exchange_decorator_route_modules) of + undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)], + put(exchange_decorator_route_modules, Mods), + Mods; + Mods -> Mods + end; +registry_lookup(Class) -> + [M || {_, M} <- rabbit_registry:lookup_all(Class)]. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = rabbit_policy:set(#exchange{name = XName, @@ -304,16 +312,25 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -%% Optimisation -route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, - #delivery{message = #basic_message{routing_keys = RKs}}) -> - [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; +route(#exchange{name = #resource{name = RName, virtual_host = VHost} = XName} = X, + #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> + case registry_lookup(exchange_decorator_route) == [] andalso + RName == <<"">> of + true -> [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + false -> QNames = route1(Delivery, {[X], XName, []}), + lists:usort(decorate_route(X, Delivery, QNames)) + end. -route(X = #exchange{name = XName}, Delivery) -> - route1(Delivery, {[X], XName, []}). +decorate_route(X, Delivery, QNames) -> + {Add, Remove} = + lists:foldl(fun (Decorator, {Add, Remove}) -> + {A1, R1} = Decorator:route(X, Delivery, QNames), + {A1 ++ Add, R1 ++ Remove} + end, {[], []}, registry_lookup(exchange_decorator_route)), + QNames ++ Add -- Remove. route1(_, {[], _, QNames}) -> - lists:usort(QNames); + QNames; route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> DstNames = process_alternate( X, ((type_to_module(Type)):route(X, Delivery))), diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index befbc46219..4e395cbe0a 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -58,13 +58,17 @@ -callback policy_changed ( serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. +-callback route ( rabbit_types:exchange(), rabbit_types:delivery(), + [rabbit_amqqueue:name()]) -> + {[rabbit_amqqueue:name()], [rabbit_amqqueue:name()]}. + -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}]; + {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}, {route, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 60419856c9..3514e7806c 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -104,11 +104,12 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(policy_validator) -> rabbit_policy_validator. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(exchange_decorator_route) -> rabbit_exchange_decorator; +class_module(policy_validator) -> rabbit_policy_validator. %%--------------------------------------------------------------------------- |
