summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_exchange.erl57
-rw-r--r--src/rabbit_exchange_decorator.erl39
-rw-r--r--src/rabbit_policy.erl19
-rw-r--r--src/rabbit_tests.erl5
5 files changed, 73 insertions, 49 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index eeee799ecf..4282755df5 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -40,7 +40,7 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy}).
+ scratches, policy, decorators}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9e98448d63..16cfa8e36f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -115,23 +115,23 @@ recover() ->
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
-callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
+callback(X = #exchange{type = XType,
+ decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
- M <- registry_lookup(exchange_decorator)],
+ M <- rabbit_exchange_decorator:select(all, Decorators)],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X = #exchange{type = XType}, X1) ->
- [ok = M:policy_changed(X, X1) ||
- M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
- ok.
+ ok = (type_to_module(XType)):policy_changed(X, X1).
-serialise_events(X = #exchange{type = Type}) ->
- lists:any(fun (M) -> M:serialise_events(X) end,
- registry_lookup(exchange_decorator))
+serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
+ lists:any(fun (M) ->
+ M:serialise_events(X)
+ end, rabbit_exchange_decorator:select(all, Decorators))
orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
@@ -143,23 +143,14 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-registry_lookup(exchange_decorator_route = Class) ->
- case get(exchange_decorator_route_modules) of
- undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
- put(exchange_decorator_route_modules, Mods),
- Mods;
- Mods -> Mods
- end;
-registry_lookup(Class) ->
- [M || {_, M} <- rabbit_registry:lookup_all(Class)].
-
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = rabbit_policy:set(#exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args}),
+ X0 = rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args}),
+ X = rabbit_exchange_decorator:record(X0, rabbit_exchange_decorator:list()),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -318,25 +309,25 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-route(#exchange{name = #resource{virtual_host = VHost,
- name = RName} = XName} = X,
+route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
+ decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
- case {registry_lookup(exchange_decorator_route), RName == <<"">>} of
- {[], true} ->
+ case {RName, rabbit_exchange_decorator:select(route, Decorators)} of
+ {<<"">>, []} ->
%% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
- {Decorators, _} ->
- lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
+ {_, RDecorators} ->
+ lists:usort(route1(Delivery, RDecorators, {[X], XName, []}))
end.
route1(_, _, {[], _, QNames}) ->
QNames;
-route1(Delivery, Decorators,
+route1(Delivery, RDecorators,
{[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
ExchangeDests = (type_to_module(Type)):route(X, Delivery),
- DecorateDests = process_decorators(X, Decorators, Delivery),
+ DecorateDests = process_decorators(X, RDecorators, Delivery),
AlternateDests = process_alternate(X, ExchangeDests),
- route1(Delivery, Decorators,
+ route1(Delivery, RDecorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
AlternateDests ++ DecorateDests ++ ExchangeDests)).
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 040b55dbb0..240ddd9afc 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -16,6 +16,10 @@
-module(rabbit_exchange_decorator).
+-include("rabbit.hrl").
+
+-export([list/0, select/2, record/2]).
+
%% This is like an exchange type except that:
%%
%% 1) It applies to all exchanges as soon as it is installed, therefore
@@ -45,10 +49,6 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
-%% called when the policy attached to this exchange changes.
--callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
- 'ok'.
-
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
@@ -59,8 +59,12 @@
%% Decorators can optionally implement route/2 which allows additional
%% destinations to be added to the routing decision.
-%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
-%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ [rabbit_amqqueue:name() | rabbit_exchange:name()] | ok.
+
+%% Whether the decorator wishes to receive callbacks for the exchange
+%% none:no callbacks, noroute:all callbacks except route, all:all callbacks
+-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'.
-else.
@@ -68,8 +72,29 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3},
+ {active_for, 1}];
behaviour_info(_Other) ->
undefined.
-endif.
+
+%%----------------------------------------------------------------------------
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+
+%% select a subset of active decorators
+select(all, {Route, NoRoute}) -> Route ++ NoRoute;
+select(route, {Route, _NoRoute}) -> Route.
+
+%% record active decorators in an exchange
+record(X, Decorators) ->
+ X#exchange{decorators =
+ lists:foldl(fun (D, {Route, NoRoute}) ->
+ Callbacks = D:active_for(X),
+ {cons_if_eq(all, Callbacks, D, Route),
+ cons_if_eq(noroute, Callbacks, D, NoRoute)}
+ end, {[], []}, Decorators)}.
+
+cons_if_eq(Select, Select, Item, List) -> [Item | List];
+cons_if_eq(_Select, _Other, _Item, List) -> List.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7398cd2d13..22e9bdacff 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -156,9 +156,10 @@ notify_clear(VHost, <<"policy">>, _Name) ->
update_policies(VHost) ->
Policies = list(VHost),
+ Decorators = rabbit_exchange_decorator:list(),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() ->
- {[update_exchange(X, Policies) ||
+ {[update_exchange(X, Policies, Decorators) ||
X <- rabbit_exchange:list(VHost)],
[update_queue(Q, Policies) ||
Q <- rabbit_amqqueue:list(VHost)]}
@@ -167,12 +168,18 @@ update_policies(VHost) ->
[catch notify(Q) || Q <- Qs],
ok.
-update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
+update_exchange(X = #exchange{name = XName, policy = OldPolicy},
+ Policies, Decorators) ->
case match(XName, Policies) of
- OldPolicy -> no_change;
- NewPolicy -> rabbit_exchange:update(
- XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
- {X, X#exchange{policy = NewPolicy}}
+ OldPolicy ->
+ no_change;
+ NewPolicy ->
+ rabbit_exchange:update(
+ XName, fun(X1) ->
+ rabbit_exchange_decorator:record(
+ X1#exchange{policy = NewPolicy}, Decorators)
+ end),
+ {X, X#exchange{policy = NewPolicy}}
end.
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b2c8036427..91f560cb85 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -563,8 +563,9 @@ test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>,
kind = exchange,
name = <<"test_exchange">>},
- X = #exchange{name = XName, type = topic, durable = false,
- auto_delete = false, arguments = []},
+ X0 = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ X = rabbit_exchange_decorator:record(X0, []),
%% create
rabbit_exchange_type_topic:validate(X),
exchange_op_callback(X, create, []),