diff options
| author | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2015-07-07 13:59:42 +0100 |
|---|---|---|
| committer | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2015-07-07 13:59:42 +0100 |
| commit | 7f81963ae0e37d40f291035cbe96b15b73db0c1c (patch) | |
| tree | 07977efb911b4a65927df545449dd6b414821217 | |
| parent | f0931b7dcca40e366e8272dfa403cc52493ec9b3 (diff) | |
| download | rabbitmq-server-git-7f81963ae0e37d40f291035cbe96b15b73db0c1c.tar.gz | |
Initial commit #121
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_queue_location_client_local.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 69 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_queue_location_round_robin.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_queue_master_location_misc.erl | 144 | ||||
| -rw-r--r-- | src/rabbit_queue_master_locator.erl | 38 |
7 files changed, 414 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ce800023f..f75636d248 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -274,9 +274,15 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> recoverable_slaves = [], gm_pids = [], state = live})), - Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), + + Node1 = case rabbit_queue_master_location_misc:get_location(Q) of + {ok, Node0} -> Node0; + {error, _} -> Node + end, + + Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1), gen_server2:call( - rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), + rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), {init, new}, infinity). internal_declare(Q, true) -> diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl new file mode 100644 index 0000000000..c3fe2e8713 --- /dev/null +++ b/src/rabbit_queue_location_client_local.erl @@ -0,0 +1,47 @@ +%%% The contents of this file are subject to the Mozilla Public License +%%% Version 1.1 (the "License"); you may not use this file except in +%%% compliance with the License. You may obtain a copy of the License at +%%% http://www.mozilla.org/MPL/ +%%% +%%% Software distributed under the License is distributed on an "AS IS" +%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%%% License for the specific language governing rights and limitations +%%% under the License. +%%% +%%% The Original Code is RabbitMQ. +%%% +%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> +%%% @doc +%%% - Queue Master Location 'client local' selection callback +%%% +%%% @end +%%% Created : 19. Jun 2015 +%%%------------------------------------------------------------------- +-module(rabbit_queue_location_client_local). + +-include("rabbit.hrl"). + +-behaviour(rabbit_queue_master_locator). + +-export([description/0, queue_master_location/1, validate_policy/1]). + +-rabbit_boot_step({?MODULE, + [{description, "Set queue master node as the client local node"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, <<"client-local">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + + +%%--------------------------------------------------------------------------- +%% Queue Master Location Callbacks +%%--------------------------------------------------------------------------- + +description() -> + [{description, <<"Set queue master node as the client local node">>}]. + +queue_master_location(#amqqueue{}) -> + MasterNode = node(), + {ok, MasterNode}. + +validate_policy(_Args) -> ok.
\ No newline at end of file diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl new file mode 100644 index 0000000000..6a2e378afd --- /dev/null +++ b/src/rabbit_queue_location_min_masters.erl @@ -0,0 +1,69 @@ +%%% The contents of this file are subject to the Mozilla Public License +%%% Version 1.1 (the "License"); you may not use this file except in +%%% compliance with the License. You may obtain a copy of the License at +%%% http://www.mozilla.org/MPL/ +%%% +%%% Software distributed under the License is distributed on an "AS IS" +%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%%% License for the specific language governing rights and limitations +%%% under the License. +%%% +%%% The Original Code is RabbitMQ. +%%% +%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> +%%% @doc +%%% - Queue Master Location 'minimum bound queues' selection callback +%%% +%%% @end +%%% Created : 19. Jun 2015 +%%%------------------------------------------------------------------- +-module(rabbit_queue_location_min_masters). +-behaviour(rabbit_queue_master_locator). + +-include("rabbit.hrl"). + +-export([ description/0, queue_master_location/1, validate_policy/1 ]). + +-rabbit_boot_step({?MODULE, + [{description, "Locate queue master node from cluster node with least bound queues"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, <<"min-masters">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%%--------------------------------------------------------------------------- +%% Queue Master Location Callbacks +%%--------------------------------------------------------------------------- + +description() -> + [{description, <<"Locate queue master node from cluster node with least bound queues">>}]. + +queue_master_location(#amqqueue{}) -> + Cluster = rabbit_queue_master_location_misc:all_nodes(), + {_Count, MinNode}= get_bound_queue_counts(Cluster, {undefined, undefined}), + {ok, MinNode}. + +validate_policy(_Args) -> ok. + +%%--------------------------------------------------------------------------- +%% Private helper functions +%%--------------------------------------------------------------------------- + +get_bound_queue_counts([], MinNode) -> MinNode; +get_bound_queue_counts([Node|Rem], {undefined, undefined}) -> + VHosts = rpc:call(Node, rabbit_vhost, list, []), + Count = get_total_vhost_bound_queues(Node, VHosts, 0), + get_bound_queue_counts(Rem, {Count, Node}); +get_bound_queue_counts([Node0|Rem], MinNode={Count, _Node}) -> + VHosts = rpc:call(Node0, rabbit_vhost, list, []), + Count0 = get_total_vhost_bound_queues(Node0, VHosts, 0), + MinNode0 = if Count0 < Count -> {Count0, Node0}; + true -> MinNode + end, + get_bound_queue_counts(Rem, MinNode0). + +get_total_vhost_bound_queues(_Node, [], Count) -> Count; +get_total_vhost_bound_queues(Node, [VHostPath|Rem], Count) -> + Count0 = length(rpc:call(Node, rabbit_binding, list, [VHostPath])), + get_total_vhost_bound_queues(Node, Rem, Count+Count0). + diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl new file mode 100644 index 0000000000..fbf7d7e2db --- /dev/null +++ b/src/rabbit_queue_location_random.erl @@ -0,0 +1,47 @@ +%%% The contents of this file are subject to the Mozilla Public License +%%% Version 1.1 (the "License"); you may not use this file except in +%%% compliance with the License. You may obtain a copy of the License at +%%% http://www.mozilla.org/MPL/ +%%% +%%% Software distributed under the License is distributed on an "AS IS" +%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%%% License for the specific language governing rights and limitations +%%% under the License. +%%% +%%% The Original Code is RabbitMQ. +%%% +%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> +%%% @doc +%%% - Queue Master Location 'random' selection callback implementation +%%% +%%% @end +%%% Created : 19. Jun 2015 +%%%------------------------------------------------------------------- +-module(rabbit_queue_location_random). +-behaviour(rabbit_queue_master_locator). + +-include("rabbit.hrl"). + +-export([ description/0, queue_master_location/1, validate_policy/1 ]). + +-rabbit_boot_step({?MODULE, + [{description, "Locate queue master node from cluster in a random manner"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, <<"random">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%%--------------------------------------------------------------------------- +%% Queue Master Location Callbacks +%%--------------------------------------------------------------------------- + +description() -> + [{description, <<"Locate queue master node from cluster in a random manner">>}]. + +queue_master_location(#amqqueue{}) -> + Cluster = rabbit_queue_master_location_misc:all_nodes(), + RandomPos = erlang:phash(now(), length(Cluster)), + MasterNode = lists:nth(RandomPos, Cluster), + {ok, MasterNode}. + +validate_policy(_Args) -> ok.
\ No newline at end of file diff --git a/src/rabbit_queue_location_round_robin.erl b/src/rabbit_queue_location_round_robin.erl new file mode 100644 index 0000000000..d45190aacf --- /dev/null +++ b/src/rabbit_queue_location_round_robin.erl @@ -0,0 +1,61 @@ +%%% The contents of this file are subject to the Mozilla Public License +%%% Version 1.1 (the "License"); you may not use this file except in +%%% compliance with the License. You may obtain a copy of the License at +%%% http://www.mozilla.org/MPL/ +%%% +%%% Software distributed under the License is distributed on an "AS IS" +%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%%% License for the specific language governing rights and limitations +%%% under the License. +%%% +%%% The Original Code is RabbitMQ. +%%% +%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> +%%% @doc +%%% - Queue Master Location callback implementation +%%% +%%% @end +%%% Created : 19. Jun 2015 15:34 +%%%------------------------------------------------------------------- +-module(rabbit_queue_location_round_robin). +-behaviour(rabbit_queue_master_location). + +-include("rabbit.hrl"). + +-export([ init/1, + description/0, + queue_master_location/3, + validate_policy/1 ]). + +-record(state, {rr_pointer=1, cluster_size=2}). % cluster must start at any value > pointer + +-rabbit_boot_step({?MODULE, + [{description, "Locate queue master node in a round robin manner"}, + {mfa, {rabbit_registry, register, + [queue_master_location_strategy, <<"round-robin">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + + +%%--------------------------------------------------------------------------- +%% Queue Master Location Callbacks +%%--------------------------------------------------------------------------- + +description() -> + [{description, <<"Locate queue master node in a round robin manner">>}]. + +init(_Args) -> + {ok, #state{}}. + +queue_master_location(#amqqueue{}, Cluster, CbState=#state{rr_pointer = RR_Pointer, + cluster_size = RR_Pointer}) -> + MasterNode = lists:nth(RR_Pointer, Cluster), + {reply, MasterNode, CbState#state{rr_pointer=1, cluster_size=length(Cluster)}}; + +queue_master_location(#amqqueue{}, Cluster, CbState=#state{rr_pointer = RR_Pointer, + cluster_size= ClusterSize}) when RR_Pointer =< ClusterSize, + length(Cluster) > 0 -> + MasterNode = lists:nth(RR_Pointer,Cluster), + {reply, MasterNode, CbState#state{rr_pointer=RR_Pointer+1,cluster_size=length(Cluster)}}. + +validate_policy(_Args) -> ok.
\ No newline at end of file diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl new file mode 100644 index 0000000000..b7fe72c63b --- /dev/null +++ b/src/rabbit_queue_master_location_misc.erl @@ -0,0 +1,144 @@ +%%% The contents of this file are subject to the Mozilla Public License +%%% Version 1.1 (the "License"); you may not use this file except in +%%% compliance with the License. You may obtain a copy of the License at +%%% http://www.mozilla.org/MPL/ +%%% +%%% Software distributed under the License is distributed on an "AS IS" +%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%%% License for the specific language governing rights and limitations +%%% under the License. +%%% +%%% The Original Code is RabbitMQ. +%%% +%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> +%%% @doc +%%% - Queue Master Location miscellaneous functions +%%% +%%% @end +%%% Created : 19. Jun 2015 +%%%------------------------------------------------------------------- +-module(rabbit_queue_master_location_misc). +-behaviour(rabbit_policy_validator). + +-include("rabbit.hrl"). + +%% API +-export([ lookup_master/2, + lookup_queue/2, + get_location/1, + get_location_by_config/1, + get_location_by_args/1, + get_location_by_policy/1, + all_nodes/0, + delay_ms/1, + policy/2, + module/1 ]). + +-export([ validate_policy/1 ]). + +-rabbit_boot_step( +{?MODULE, + [{description, "Queue location policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, << "queue-master-location" >>, ?MODULE]}}]}). + +lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), + is_binary(VHostPath) -> + Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(Queue) of + {ok, #amqqueue{pid=Pid}} when is_pid(Pid) -> + {ok, node(Pid)}; + Error -> Error + end. + +lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin), + is_binary(VHostPath) -> + Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(Queue) of + Reply = {ok, #amqqueue{}} -> + Reply; + Error -> Error + end. + +get_location(Queue=#amqqueue{})-> + case get_location_by_args(Queue) of + _Err1={error, _} -> + case get_location_by_policy(Queue) of + _Err2={error, _} -> + case get_location_by_config(Queue) of + Err3={error, _} -> Err3; + Reply={ok, _Node} -> Reply + end; + Reply={ok, _Node} -> Reply + end; + Reply={ok, _Node} -> Reply + end. + +get_location_by_args(Queue=#amqqueue{arguments=Args}) -> + case proplists:lookup( << "queue-master-location" >> , Args) of + { << "queue-master-location" >> , Strategy} -> + case validate_strategy(Strategy) of + {ok, CB} -> CB:queue_master_location(Queue); + Error -> Error + end; + _ -> {error, "queue-master-location undefined"} + end. + +get_location_by_policy(Queue=#amqqueue{}) -> + case rabbit_policy:get( << "queue-master-location" >> , Queue) of + undefined -> {error, "queue-master-location policy undefined"}; + Strategy -> + case validate_strategy(Strategy) of + {ok, CB} -> CB:queue_master_location(Queue); + Error -> Error + end + end. + +get_location_by_config(Queue=#amqqueue{}) -> + case application:get_env(rabbit, queue_master_location) of + {ok, Strategy} -> + case validate_strategy(Strategy) of + {ok, CB} -> CB:queue_master_location(Queue); + Error -> Error + end; + _ -> {error, "queue-master-location undefined"} + end. + + +validate_policy(KeyList) -> + case proplists:lookup( << "queue-master-location" >> , KeyList) of + {_, Strategy} -> validate_strategy(Strategy); + _ -> {error, "queue-master-location undefined"} + end. + +validate_strategy(Strategy) -> + case module(Strategy) of + R={ok, _M} -> R; + _ -> {error, "~p is not a valid queue-master-location value", [Strategy]} + end. + + +all_nodes() -> rabbit_mnesia:cluster_nodes(running). + +delay_ms(Ms) -> receive after Ms -> void end. + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + undefined -> none; + P -> P + end. + +module(#amqqueue{} = Q) -> + case rabbit_policy:get( << "queue-master-location" >> , Q) of + undefined -> no_location_strategy; + Mode -> module(Mode) + end; + +module(Strategy) when is_binary(Strategy) -> + case rabbit_registry:binary_to_type(Strategy) of + {error, not_found} -> no_location_strategy; + T -> case rabbit_registry:lookup_module(queue_master_locator, T) of + {ok, Module} -> {ok, Module}; + _ -> no_location_strategy + end + end.
\ No newline at end of file diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl new file mode 100644 index 0000000000..096b9db713 --- /dev/null +++ b/src/rabbit_queue_master_locator.erl @@ -0,0 +1,38 @@ +%%% The contents of this file are subject to the Mozilla Public License +%%% Version 1.1 (the "License"); you may not use this file except in +%%% compliance with the License. You may obtain a copy of the License at +%%% http://www.mozilla.org/MPL/ +%%% +%%% Software distributed under the License is distributed on an "AS IS" +%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%%% License for the specific language governing rights and limitations +%%% under the License. +%%% +%%% The Original Code is RabbitMQ. +%%% +%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> +%%% @doc +%%% - Queue Master Location behaviour implementation +%%% +%%% @end +%%% Created : 19. Jun 2015 +%%%------------------------------------------------------------------- +-module(rabbit_queue_master_locator). + +-ifdef(use_specs). + +-callback description() -> [proplists:property()]. +-callback queue_master_location(pid()) -> {'ok', node()} | {'error', term()}. +-callback validate_policy(pid()) -> {'ok', node()} | {'error', term()}. + +-else. + +-export([behaviour_info/1]). +behaviour_info(callbacks) -> + [ {description, 0}, + {queue_master_location, 1}, + {validate_policy, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. |
