diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-03-22 12:47:21 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-03-22 12:47:21 +0000 |
| commit | 6fa8df03f5de1bfcdef7ec9beeebdbe0b910aee2 (patch) | |
| tree | d334f402c4a44cfe999dbc17285daacffc45c10a | |
| parent | 9b8ca7c15dd1ec0402dc4f9f989a4ec60907c4cc (diff) | |
| download | rabbitmq-server-git-6fa8df03f5de1bfcdef7ec9beeebdbe0b910aee2.tar.gz | |
Move decorators to exchange record
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_exchange_decorator.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 5 |
5 files changed, 73 insertions, 49 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index eeee799ecf..4282755df5 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -40,7 +40,7 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches, policy}). + scratches, policy, decorators}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9e98448d63..16cfa8e36f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -115,23 +115,23 @@ recover() -> rabbit_durable_exchange), [XName || #exchange{name = XName} <- Xs]. -callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> +callback(X = #exchange{type = XType, + decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || - M <- registry_lookup(exchange_decorator)], + M <- rabbit_exchange_decorator:select(all, Decorators)], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). policy_changed(X = #exchange{type = XType}, X1) -> - [ok = M:policy_changed(X, X1) || - M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]], - ok. + ok = (type_to_module(XType)):policy_changed(X, X1). -serialise_events(X = #exchange{type = Type}) -> - lists:any(fun (M) -> M:serialise_events(X) end, - registry_lookup(exchange_decorator)) +serialise_events(X = #exchange{type = Type, decorators = Decorators}) -> + lists:any(fun (M) -> + M:serialise_events(X) + end, rabbit_exchange_decorator:select(all, Decorators)) orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> @@ -143,23 +143,14 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. -registry_lookup(exchange_decorator_route = Class) -> - case get(exchange_decorator_route_modules) of - undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)], - put(exchange_decorator_route_modules, Mods), - Mods; - Mods -> Mods - end; -registry_lookup(Class) -> - [M || {_, M} <- rabbit_registry:lookup_all(Class)]. - declare(XName, Type, Durable, AutoDelete, Internal, Args) -> - X = rabbit_policy:set(#exchange{name = XName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - arguments = Args}), + X0 = rabbit_policy:set(#exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args}), + X = rabbit_exchange_decorator:record(X0, rabbit_exchange_decorator:list()), XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), @@ -318,25 +309,25 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -route(#exchange{name = #resource{virtual_host = VHost, - name = RName} = XName} = X, +route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, + decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> - case {registry_lookup(exchange_decorator_route), RName == <<"">>} of - {[], true} -> + case {RName, rabbit_exchange_decorator:select(route, Decorators)} of + {<<"">>, []} -> %% Optimisation [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; - {Decorators, _} -> - lists:usort(route1(Delivery, Decorators, {[X], XName, []})) + {_, RDecorators} -> + lists:usort(route1(Delivery, RDecorators, {[X], XName, []})) end. route1(_, _, {[], _, QNames}) -> QNames; -route1(Delivery, Decorators, +route1(Delivery, RDecorators, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> ExchangeDests = (type_to_module(Type)):route(X, Delivery), - DecorateDests = process_decorators(X, Decorators, Delivery), + DecorateDests = process_decorators(X, RDecorators, Delivery), AlternateDests = process_alternate(X, ExchangeDests), - route1(Delivery, Decorators, + route1(Delivery, RDecorators, lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames}, AlternateDests ++ DecorateDests ++ ExchangeDests)). diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 040b55dbb0..240ddd9afc 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -16,6 +16,10 @@ -module(rabbit_exchange_decorator). +-include("rabbit.hrl"). + +-export([list/0, select/2, record/2]). + %% This is like an exchange type except that: %% %% 1) It applies to all exchanges as soon as it is installed, therefore @@ -45,10 +49,6 @@ -callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% called when the policy attached to this exchange changes. --callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> - 'ok'. - %% called after a binding has been added or recovered -callback add_binding(serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'. @@ -59,8 +59,12 @@ %% Decorators can optionally implement route/2 which allows additional %% destinations to be added to the routing decision. -%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> -%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. +-callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> + [rabbit_amqqueue:name() | rabbit_exchange:name()] | ok. + +%% Whether the decorator wishes to receive callbacks for the exchange +%% none:no callbacks, noroute:all callbacks except route, all:all callbacks +-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'. -else. @@ -68,8 +72,29 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, + {active_for, 1}]; behaviour_info(_Other) -> undefined. -endif. + +%%---------------------------------------------------------------------------- + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. + +%% select a subset of active decorators +select(all, {Route, NoRoute}) -> Route ++ NoRoute; +select(route, {Route, _NoRoute}) -> Route. + +%% record active decorators in an exchange +record(X, Decorators) -> + X#exchange{decorators = + lists:foldl(fun (D, {Route, NoRoute}) -> + Callbacks = D:active_for(X), + {cons_if_eq(all, Callbacks, D, Route), + cons_if_eq(noroute, Callbacks, D, NoRoute)} + end, {[], []}, Decorators)}. + +cons_if_eq(Select, Select, Item, List) -> [Item | List]; +cons_if_eq(_Select, _Other, _Item, List) -> List. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7398cd2d13..22e9bdacff 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -156,9 +156,10 @@ notify_clear(VHost, <<"policy">>, _Name) -> update_policies(VHost) -> Policies = list(VHost), + Decorators = rabbit_exchange_decorator:list(), {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( fun() -> - {[update_exchange(X, Policies) || + {[update_exchange(X, Policies, Decorators) || X <- rabbit_exchange:list(VHost)], [update_queue(Q, Policies) || Q <- rabbit_amqqueue:list(VHost)]} @@ -167,12 +168,18 @@ update_policies(VHost) -> [catch notify(Q) || Q <- Qs], ok. -update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> +update_exchange(X = #exchange{name = XName, policy = OldPolicy}, + Policies, Decorators) -> case match(XName, Policies) of - OldPolicy -> no_change; - NewPolicy -> rabbit_exchange:update( - XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), - {X, X#exchange{policy = NewPolicy}} + OldPolicy -> + no_change; + NewPolicy -> + rabbit_exchange:update( + XName, fun(X1) -> + rabbit_exchange_decorator:record( + X1#exchange{policy = NewPolicy}, Decorators) + end), + {X, X#exchange{policy = NewPolicy}} end. update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b2c8036427..91f560cb85 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -563,8 +563,9 @@ test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, name = <<"test_exchange">>}, - X = #exchange{name = XName, type = topic, durable = false, - auto_delete = false, arguments = []}, + X0 = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + X = rabbit_exchange_decorator:record(X0, []), %% create rabbit_exchange_type_topic:validate(X), exchange_op_callback(X, create, []), |
