diff options
| author | Tim Watson <tim@rabbitmq.com> | 2013-03-21 12:44:35 +0000 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2013-03-21 12:44:35 +0000 |
| commit | 5633e033cbccac337486d617953051aa2ac5dfee (patch) | |
| tree | 017c8276064685c9cc6c95ede4ec2f8c44c997af /src | |
| parent | 06c3c514203278557cc8a9f356a3a443e49dfef1 (diff) | |
| parent | 34978ae2d48a94f62c129ca0096c529955486802 (diff) | |
| download | rabbitmq-server-git-5633e033cbccac337486d617953051aa2ac5dfee.tar.gz | |
merge bug23378 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_exchange.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_exchange_decorator.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_registry.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 |
5 files changed, 58 insertions, 36 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 5f4fb9ec87..9e98448d63 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -326,34 +326,34 @@ route(#exchange{name = #resource{virtual_host = VHost, %% Optimisation [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; {Decorators, _} -> - QNames = route1(Delivery, {[X], XName, []}), - lists:usort(decorate_route(Decorators, X, Delivery, QNames)) + lists:usort(route1(Delivery, Decorators, {[X], XName, []})) end. -decorate_route([], _X, _Delivery, QNames) -> +route1(_, _, {[], _, QNames}) -> QNames; -decorate_route(Decorators, X, Delivery, QNames) -> - QNames ++ - lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]). - -route1(_, {[], _, QNames}) -> - QNames; -route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> - DstNames = process_alternate( - X, ((type_to_module(Type)):route(X, Delivery))), - route1(Delivery, +route1(Delivery, Decorators, + {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> + ExchangeDests = (type_to_module(Type)):route(X, Delivery), + DecorateDests = process_decorators(X, Decorators, Delivery), + AlternateDests = process_alternate(X, ExchangeDests), + route1(Delivery, Decorators, lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames}, - DstNames)). + AlternateDests ++ DecorateDests ++ ExchangeDests)). -process_alternate(#exchange{arguments = []}, Results) -> %% optimisation - Results; +process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation + []; process_alternate(#exchange{name = XName, arguments = Args}, []) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of undefined -> []; AName -> [AName] end; -process_alternate(_X, Results) -> - Results. +process_alternate(_X, _Results) -> + []. + +process_decorators(_, [], _) -> %% optimisation + []; +process_decorators(X, Decorators, Delivery) -> + lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]). process_route(#resource{kind = exchange} = XName, {_WorkList, XName, _QNames} = Acc) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 8f17adfc23..040b55dbb0 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -57,12 +57,10 @@ -callback remove_bindings(serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% called after exchange routing -%% return value is a list of queues to be added to the list of -%% destination queues. decorators must register separately for -%% this callback using exchange_decorator_route. --callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> - [rabbit_amqqueue:name()]. +%% Decorators can optionally implement route/2 which allows additional +%% destinations to be added to the routing decision. +%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> +%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. -else. @@ -70,7 +68,7 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, {route, 2}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index de53b7f0b3..3872f3dfa2 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -318,6 +318,9 @@ alive_nodes() -> Nodes = rabbit_mnesia:cluster_nodes(all), [N || N <- Nodes, pong =:= net_adm:ping(N)]. +alive_rabbit_nodes() -> + [N || N <- alive_nodes(), rabbit_nodes:is_running(N, rabbit)]. + await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), @@ -346,7 +349,7 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> %% that we do not attempt to deal with individual (other) partitions %% going away. It's only safe to forget anything about partitions when %% there are no partitions. - Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of + Partitions1 = case Partitions -- (Partitions -- alive_rabbit_nodes()) of [] -> []; _ -> Partitions end, diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 3514e7806c..acdc2cffae 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -84,12 +84,34 @@ internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> internal_register(Class, TypeName, ModuleName) when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) -> ok = sanity_check_module(class_module(Class), ModuleName), - true = ets:insert(?ETS_NAME, - {{Class, internal_binary_to_type(TypeName)}, ModuleName}), + RegArg = {{Class, internal_binary_to_type(TypeName)}, ModuleName}, + true = ets:insert(?ETS_NAME, RegArg), + conditional_register(RegArg), ok. internal_unregister(Class, TypeName) -> - true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}), + UnregArg = {Class, internal_binary_to_type(TypeName)}, + conditional_unregister(UnregArg), + true = ets:delete(?ETS_NAME, UnregArg), + ok. + +%% register exchange decorator route callback only when implemented, +%% in order to avoid unnecessary decorator calls on the fast +%% publishing path +conditional_register({{exchange_decorator, Type}, ModuleName}) -> + case erlang:function_exported(ModuleName, route, 2) of + true -> true = ets:insert(?ETS_NAME, + {{exchange_decorator_route, Type}, + ModuleName}); + false -> ok + end; +conditional_register(_) -> + ok. + +conditional_unregister({exchange_decorator, Type}) -> + true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}), + ok; +conditional_unregister(_) -> ok. sanity_check_module(ClassModule, Module) -> @@ -104,12 +126,11 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(exchange_decorator_route) -> rabbit_exchange_decorator; -class_module(policy_validator) -> rabbit_policy_validator. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(policy_validator) -> rabbit_policy_validator. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1188c5549a..cd8fa72052 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1580,7 +1580,7 @@ control_action(Command, Node, Args, Opts) -> info_action(Command, Args, CheckVHost) -> ok = control_action(Command, []), - if CheckVHost -> ok = control_action(Command, []); + if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]); true -> ok end, ok = control_action(Command, lists:map(fun atom_to_list/1, Args)), |
