diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_queue_location_client_local.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 77 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_queue_location_validator.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_queue_master_location_misc.erl | 96 | ||||
| -rw-r--r-- | src/rabbit_queue_master_locator.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_registry.erl | 3 | ||||
| -rw-r--r-- | test/src/rabbit_tests.erl | 16 |
9 files changed, 384 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5bfa006e09..52b8ed6e06 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -274,9 +274,15 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> recoverable_slaves = [], gm_pids = [], state = live})), - Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), + + Node1 = case rabbit_queue_master_location_misc:get_location(Q) of + {ok, Node0} -> Node0; + {error, _} -> Node + end, + + Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1), gen_server2:call( - rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), + rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), {init, new}, infinity). internal_declare(Q, true) -> diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl new file mode 100644 index 0000000000..4cf91abc0a --- /dev/null +++ b/src/rabbit_queue_location_client_local.erl @@ -0,0 +1,40 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_location_client_local). +-behaviour(rabbit_queue_master_locator). + +-include("rabbit.hrl"). + +-export([description/0, queue_master_location/1]). + +-rabbit_boot_step({?MODULE, + [{description, "locate queue master client local"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, + <<"client-local">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + + +%%--------------------------------------------------------------------------- +%% Queue Master Location Callbacks +%%--------------------------------------------------------------------------- + +description() -> + [{description, <<"Locate queue master node as the client local node">>}]. + +queue_master_location(#amqqueue{}) -> {ok, node()}. diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl new file mode 100644 index 0000000000..21c3bdb045 --- /dev/null +++ b/src/rabbit_queue_location_min_masters.erl @@ -0,0 +1,77 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_location_min_masters). +-behaviour(rabbit_queue_master_locator). + +-include("rabbit.hrl"). + +-export([description/0, queue_master_location/1]). + +-rabbit_boot_step({?MODULE, + [{description, "locate queue master min bound queues"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, + <<"min-masters">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%%--------------------------------------------------------------------------- +%% Queue Master Location Callbacks +%%--------------------------------------------------------------------------- + +description() -> + [{description, + <<"Locate queue master node from cluster node with least bound queues">>}]. + +queue_master_location(#amqqueue{}) -> + Cluster = rabbit_queue_master_location_misc:all_nodes(), + VHosts = rabbit_vhost:list(), + BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []), + {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters), + {ok, MinMaster}. + +%%--------------------------------------------------------------------------- +%% Private helper functions +%%--------------------------------------------------------------------------- +get_min_master(Cluster, BoundQueueMasters) -> + lists:min([ {count_masters(Node, BoundQueueMasters), Node} || + Node <- Cluster ]). + +count_masters(Node, Masters) -> + length([ X || X <- Masters, X == Node ]). + +get_bound_queue_masters_per_vhost([], Acc) -> + lists:flatten(Acc); +get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) -> + Bindings = rabbit_binding:list(VHost), + BoundQueueMasters = get_queue_master_per_binding(VHost, Bindings, []), + get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]). + + +get_queue_master_per_binding(_VHost, [], BoundQueueNodes) -> BoundQueueNodes; +get_queue_master_per_binding(VHost, [#binding{destination= + #resource{kind=queue, + name=QueueName}}| + RemBindings], + QueueMastersAcc) -> + QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master( + QueueName, VHost) of + {ok, Master} when is_atom(Master) -> + [Master|QueueMastersAcc]; + _ -> QueueMastersAcc + end, + get_queue_master_per_binding(VHost, RemBindings, QueueMastersAcc0). diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl new file mode 100644 index 0000000000..b708077cf8 --- /dev/null +++ b/src/rabbit_queue_location_random.erl @@ -0,0 +1,44 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_location_random). +-behaviour(rabbit_queue_master_locator). + +-include("rabbit.hrl"). + +-export([description/0, queue_master_location/1]). + +-rabbit_boot_step({?MODULE, + [{description, "locate queue master random"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, + <<"random">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%%--------------------------------------------------------------------------- +%% Queue Master Location Callbacks +%%--------------------------------------------------------------------------- + +description() -> + [{description, + <<"Locate queue master node from cluster in a random manner">>}]. + +queue_master_location(#amqqueue{}) -> + Cluster = rabbit_queue_master_location_misc:all_nodes(), + RandomPos = erlang:phash(now(), length(Cluster)), + MasterNode = lists:nth(RandomPos, Cluster), + {ok, MasterNode}. diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl new file mode 100644 index 0000000000..4eab52855a --- /dev/null +++ b/src/rabbit_queue_location_validator.erl @@ -0,0 +1,68 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_location_validator). +-behaviour(rabbit_policy_validator). + +-include("rabbit.hrl"). + +-export([validate_policy/1, validate_strategy/1]). + +-rabbit_boot_step({?MODULE, + [{description, "Queue location policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, + <<"x-queue-master-locator">>, + ?MODULE]}}]}). + +validate_policy(KeyList) -> + case proplists:lookup(<<"x-queue-master-locator">> , KeyList) of + {_, Strategy} -> validate_strategy(Strategy); + _ -> {error, "x-queue-master-locator undefined"} + end. + +validate_strategy(Strategy) -> + case module(Strategy) of + R={ok, _M} -> R; + _ -> + {error, "~p invalid x-queue-master-locator value", [Strategy]} + end. + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + undefined -> none; + P -> P + end. + +module(#amqqueue{} = Q) -> + case policy(<<"x-queue-master-locator">>, Q) of + undefined -> no_location_strategy; + Mode -> module(Mode) + end; + +module(Strategy) when is_binary(Strategy) -> + case rabbit_registry:binary_to_type(Strategy) of + {error, not_found} -> no_location_strategy; + T -> + case rabbit_registry:lookup_module(queue_master_locator, T) of + {ok, Module} -> + case code:which(Module) of + non_existing -> no_location_strategy; + _ -> {ok, Module} + end; + _ -> no_location_strategy + end + end. diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl new file mode 100644 index 0000000000..279869d054 --- /dev/null +++ b/src/rabbit_queue_master_location_misc.erl @@ -0,0 +1,96 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_master_location_misc). + +-include("rabbit.hrl"). + +-export([lookup_master/2, + lookup_queue/2, + get_location/1, + get_location_mod_by_config/1, + get_location_mod_by_args/1, + get_location_mod_by_policy/1, + all_nodes/0]). + +lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), + is_binary(VHostPath) -> + Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(Queue) of + {ok, #amqqueue{pid=Pid}} when is_pid(Pid) -> + {ok, node(Pid)}; + Error -> Error + end. + +lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin), + is_binary(VHostPath) -> + Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(Queue) of + Reply = {ok, #amqqueue{}} -> Reply; + Error -> Error + end. + +get_location(Queue=#amqqueue{})-> + Reply1 = case get_location_mod_by_args(Queue) of + _Err1={error, _} -> + case get_location_mod_by_policy(Queue) of + _Err2={error, _} -> + case get_location_mod_by_config(Queue) of + Err3={error, _} -> Err3; + Reply0={ok, _Module} -> Reply0 + end; + Reply0={ok, _Module} -> Reply0 + end; + Reply0={ok, _Module} -> Reply0 + end, + + case Reply1 of + {ok, CB} -> CB:queue_master_location(Queue); + Error -> Error + end. + +get_location_mod_by_args(#amqqueue{arguments=Args}) -> + case proplists:lookup(<<"x-queue-master-locator">> , Args) of + {<<"x-queue-master-locator">> , Strategy} -> + case rabbit_queue_location_validator:validate_strategy(Strategy) of + Reply={ok, _CB} -> Reply; + Error -> Error + end; + _ -> {error, "x-queue-master-locator undefined"} + end. + +get_location_mod_by_policy(Queue=#amqqueue{}) -> + case rabbit_policy:get(<<"x-queue-master-locator">> , Queue) of + undefined -> {error, "x-queue-master-locator policy undefined"}; + Strategy -> + case rabbit_queue_location_validator:validate_strategy(Strategy) of + Reply={ok, _CB} -> Reply; + Error -> Error + end + end. + +get_location_mod_by_config(#amqqueue{}) -> + case application:get_env(rabbit, queue_master_locator) of + {ok, Strategy} -> + case rabbit_queue_location_validator:validate_strategy(Strategy) of + Reply={ok, _CB} -> Reply; + Error -> Error + end; + _ -> {error, "queue_master_locator undefined"} + end. + +all_nodes() -> rabbit_mnesia:cluster_nodes(running). + diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl new file mode 100644 index 0000000000..caaa7bbbde --- /dev/null +++ b/src/rabbit_queue_master_locator.erl @@ -0,0 +1,33 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_master_locator). + +-ifdef(use_specs). + +-callback description() -> [proplists:property()]. +-callback queue_master_location(pid()) -> {'ok', node()} | {'error', term()}. + +-else. + +-export([behaviour_info/1]). +behaviour_info(callbacks) -> + [{description, 0}, + {queue_master_location, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index fc016e718e..f75d839bbf 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -133,7 +133,8 @@ class_module(exchange_decorator) -> rabbit_exchange_decorator; class_module(queue_decorator) -> rabbit_queue_decorator; class_module(policy_validator) -> rabbit_policy_validator; class_module(ha_mode) -> rabbit_mirror_queue_mode; -class_module(channel_interceptor) -> rabbit_channel_interceptor. +class_module(channel_interceptor) -> rabbit_channel_interceptor; +class_module(queue_master_locator)-> rabbit_queue_master_locator. %%--------------------------------------------------------------------------- diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index 95a8c45ffa..4c1489f6aa 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -72,6 +72,7 @@ all_tests0() -> passed = test_policy_validation(), passed = test_policy_opts_validation(), passed = test_ha_policy_validation(), + passed = test_queue_master_location_policy_validation(), passed = test_server_status(), passed = test_amqp_connection_refusal(), passed = test_confirms(), @@ -1155,6 +1156,21 @@ test_ha_policy_validation() -> ok = control_action(clear_policy, ["name"]), passed. +test_queue_master_location_policy_validation() -> + Set = fun (JSON) -> + control_action_opts( ["set_policy", "name", ".*", JSON] ) + end, + OK = fun (JSON) -> ok = Set(JSON) end, + Fail = fun (JSON) -> error = Set(JSON) end, + + OK ("{\"x-queue-master-locator\":\"min-masters\"}"), + OK ("{\"x-queue-master-locator\":\"client-local\"}"), + OK ("{\"x-queue-master-locator\":\"random\"}"), + Fail("{\"x-queue-master-locator\":\"made_up\"}"), + + ok = control_action(clear_policy, ["name"]), + passed. + test_server_status() -> %% create a few things so there is some useful information to list {_Writer, Limiter, Ch} = test_channel(), |
