diff options
| author | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2015-07-08 19:45:01 +0100 |
|---|---|---|
| committer | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2015-07-08 19:45:01 +0100 |
| commit | 2815985b1177ddfc939bd7e24ac63f42510650e5 (patch) | |
| tree | 22ae3f4f6332d3bc5dc6aba05cc2a34d7db34bd7 | |
| parent | 9b291479d133272013a85270dd44a3001c6850d6 (diff) | |
| download | rabbitmq-server-git-2815985b1177ddfc939bd7e24ac63f42510650e5.tar.gz | |
Change min-masters calculation
Remove validate_policy callback from queue location behaviour
Create seperate queue location policy validation module
Delete function, delay_ms/1. Unused
Remove functions not used by other modules from export
Update coding style and header to GoPivotal Inc standards
References #121.
| -rw-r--r-- | src/rabbit_queue_location_client_local.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 88 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_queue_location_validator.erl | 65 | ||||
| -rw-r--r-- | src/rabbit_queue_master_location_misc.erl | 170 | ||||
| -rw-r--r-- | src/rabbit_queue_master_locator.erl | 20 |
6 files changed, 219 insertions, 198 deletions
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl index c3fe2e8713..56b867da64 100644 --- a/src/rabbit_queue_location_client_local.erl +++ b/src/rabbit_queue_location_client_local.erl @@ -10,27 +10,25 @@ %%% %%% The Original Code is RabbitMQ. %%% -%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> -%%% @doc -%%% - Queue Master Location 'client local' selection callback -%%% -%%% @end -%%% Created : 19. Jun 2015 -%%%------------------------------------------------------------------- +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_queue_location_client_local). +-behaviour(rabbit_queue_master_locator). -include("rabbit.hrl"). --behaviour(rabbit_queue_master_locator). - --export([description/0, queue_master_location/1, validate_policy/1]). +-export([description/0, queue_master_location/1]). -rabbit_boot_step({?MODULE, - [{description, "Set queue master node as the client local node"}, - {mfa, {rabbit_registry, register, - [queue_master_locator, <<"client-local">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, kernel_ready}]}). + [{description, "locate queue master client local"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, + <<"client-local">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). %%--------------------------------------------------------------------------- @@ -38,10 +36,8 @@ %%--------------------------------------------------------------------------- description() -> - [{description, <<"Set queue master node as the client local node">>}]. + [{description, <<"Locate queue master node as the client local node">>}]. queue_master_location(#amqqueue{}) -> - MasterNode = node(), - {ok, MasterNode}. - -validate_policy(_Args) -> ok.
\ No newline at end of file + MasterNode = node(), + {ok, MasterNode}. diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index 6a2e378afd..1fef6b8c02 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -10,60 +10,78 @@ %%% %%% The Original Code is RabbitMQ. %%% -%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> -%%% @doc -%%% - Queue Master Location 'minimum bound queues' selection callback -%%% -%%% @end -%%% Created : 19. Jun 2015 -%%%------------------------------------------------------------------- +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_queue_location_min_masters). -behaviour(rabbit_queue_master_locator). -include("rabbit.hrl"). --export([ description/0, queue_master_location/1, validate_policy/1 ]). +-export([description/0, queue_master_location/1]). -rabbit_boot_step({?MODULE, - [{description, "Locate queue master node from cluster node with least bound queues"}, - {mfa, {rabbit_registry, register, - [queue_master_locator, <<"min-masters">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, kernel_ready}]}). + [{description, "locate queue master min bound queues"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, + <<"min-masters">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). %%--------------------------------------------------------------------------- %% Queue Master Location Callbacks %%--------------------------------------------------------------------------- description() -> - [{description, <<"Locate queue master node from cluster node with least bound queues">>}]. + [{description, + <<"Locate queue master node from cluster node with least bound queues">>}]. queue_master_location(#amqqueue{}) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(), - {_Count, MinNode}= get_bound_queue_counts(Cluster, {undefined, undefined}), - {ok, MinNode}. - -validate_policy(_Args) -> ok. + Cluster = rabbit_queue_master_location_misc:all_nodes(), + VHosts = rabbit_vhost:list(), + BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []), + {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters, + {undefined, undefined}), + {ok, MinMaster}. %%--------------------------------------------------------------------------- %% Private helper functions %%--------------------------------------------------------------------------- -get_bound_queue_counts([], MinNode) -> MinNode; -get_bound_queue_counts([Node|Rem], {undefined, undefined}) -> - VHosts = rpc:call(Node, rabbit_vhost, list, []), - Count = get_total_vhost_bound_queues(Node, VHosts, 0), - get_bound_queue_counts(Rem, {Count, Node}); -get_bound_queue_counts([Node0|Rem], MinNode={Count, _Node}) -> - VHosts = rpc:call(Node0, rabbit_vhost, list, []), - Count0 = get_total_vhost_bound_queues(Node0, VHosts, 0), - MinNode0 = if Count0 < Count -> {Count0, Node0}; - true -> MinNode - end, - get_bound_queue_counts(Rem, MinNode0). +get_min_master([], _BoundQueueMasters, MinNode) -> MinNode; +get_min_master([Node|Rem], BoundQueueMasters, {undefined, undefined}) -> + Count = count_masters(Node, BoundQueueMasters, 0), + get_min_master(Rem, BoundQueueMasters, {Count, Node}); +get_min_master([Node0|Rem], BoundQueueMasters, MinNode={Count, _Node}) -> + Count0 = count_masters(Node0, BoundQueueMasters, 0), + MinNode0 = if Count0 < Count -> {Count0, Node0}; + true -> MinNode + end, + get_min_master(Rem, BoundQueueMasters, MinNode0). + + +get_bound_queue_masters_per_vhost([], Acc) -> + lists:flatten(Acc); +get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) -> + Bindings = rabbit_binding:list(VHost), + BoundQueueMasters = get_queue_master_per_binding(VHost, Bindings, []), + get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]). + + +get_queue_master_per_binding(_VHost, [], BoundQueueNodes) -> BoundQueueNodes; +get_queue_master_per_binding(VHost, [#binding{key=QueueName}|RemBindings], + QueueMastersAcc) -> + QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master( + QueueName, VHost) of + {ok, Master} when is_atom(Master) -> + [Master|QueueMastersAcc]; + _ -> QueueMastersAcc + end, + get_queue_master_per_binding(VHost, RemBindings, QueueMastersAcc0). -get_total_vhost_bound_queues(_Node, [], Count) -> Count; -get_total_vhost_bound_queues(Node, [VHostPath|Rem], Count) -> - Count0 = length(rpc:call(Node, rabbit_binding, list, [VHostPath])), - get_total_vhost_bound_queues(Node, Rem, Count+Count0). +count_masters(_Node, [], Count) -> Count; +count_masters(Node, [Node|T], Count) -> count_masters(Node, T, Count+1); +count_masters(Node, [_|T], Count) -> count_masters(Node, T, Count). diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index fbf7d7e2db..85536c7309 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -10,38 +10,36 @@ %%% %%% The Original Code is RabbitMQ. %%% -%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> -%%% @doc -%%% - Queue Master Location 'random' selection callback implementation -%%% -%%% @end -%%% Created : 19. Jun 2015 -%%%------------------------------------------------------------------- +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_queue_location_random). -behaviour(rabbit_queue_master_locator). -include("rabbit.hrl"). --export([ description/0, queue_master_location/1, validate_policy/1 ]). +-export([description/0, queue_master_location/1]). -rabbit_boot_step({?MODULE, - [{description, "Locate queue master node from cluster in a random manner"}, - {mfa, {rabbit_registry, register, - [queue_master_locator, <<"random">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, kernel_ready}]}). + [{description, "locate queue master random"}, + {mfa, {rabbit_registry, register, + [queue_master_locator, + <<"random">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). %%--------------------------------------------------------------------------- %% Queue Master Location Callbacks %%--------------------------------------------------------------------------- description() -> - [{description, <<"Locate queue master node from cluster in a random manner">>}]. + [{description, + <<"Locate queue master node from cluster in a random manner">>}]. queue_master_location(#amqqueue{}) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(), - RandomPos = erlang:phash(now(), length(Cluster)), - MasterNode = lists:nth(RandomPos, Cluster), - {ok, MasterNode}. - -validate_policy(_Args) -> ok.
\ No newline at end of file + Cluster = rabbit_queue_master_location_misc:all_nodes(), + RandomPos = erlang:phash(now(), length(Cluster)), + MasterNode = lists:nth(RandomPos, Cluster), + {ok, MasterNode}. diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl new file mode 100644 index 0000000000..6f4b8a6a51 --- /dev/null +++ b/src/rabbit_queue_location_validator.erl @@ -0,0 +1,65 @@ +%%% The contents of this file are subject to the Mozilla Public License +%%% Version 1.1 (the "License"); you may not use this file except in +%%% compliance with the License. You may obtain a copy of the License at +%%% http://www.mozilla.org/MPL/ +%%% +%%% Software distributed under the License is distributed on an "AS IS" +%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%%% License for the specific language governing rights and limitations +%%% under the License. +%%% +%%% The Original Code is RabbitMQ. +%%% +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_location_validator). +-behaviour(rabbit_policy_validator). + +-include("rabbit.hrl"). + +-export([validate_policy/1, validate_strategy/1]). + +-rabbit_boot_step({?MODULE, + [{description, "Queue location policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, + <<"queue-master-location">>, + ?MODULE]}}]}). + +validate_policy(KeyList) -> + case proplists:lookup(<<"queue-master-location">> , KeyList) of + {_, Strategy} -> validate_strategy(Strategy); + _ -> {error, "queue-master-location undefined"} + end. + +validate_strategy(Strategy) -> + case module(Strategy) of + R={ok, _M} -> R; + _ -> + {error, "~p invalid queue-master-location value", [Strategy]} + end. + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + undefined -> none; + P -> P + end. + +module(#amqqueue{} = Q) -> + case policy(<<"queue-master-location">>, Q) of + undefined -> no_location_strategy; + Mode -> module(Mode) + end; + +module(Strategy) when is_binary(Strategy) -> + case rabbit_registry:binary_to_type(Strategy) of + {error, not_found} -> no_location_strategy; + T -> + case rabbit_registry:lookup_module(queue_master_locator, T) of + {ok, Module} -> {ok, Module}; + _ -> no_location_strategy + end + end. diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl index b7fe72c63b..6d5639eace 100644 --- a/src/rabbit_queue_master_location_misc.erl +++ b/src/rabbit_queue_master_location_misc.erl @@ -10,135 +10,83 @@ %%% %%% The Original Code is RabbitMQ. %%% -%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> -%%% @doc -%%% - Queue Master Location miscellaneous functions -%%% -%%% @end -%%% Created : 19. Jun 2015 -%%%------------------------------------------------------------------- +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_queue_master_location_misc). --behaviour(rabbit_policy_validator). -include("rabbit.hrl"). -%% API --export([ lookup_master/2, - lookup_queue/2, - get_location/1, - get_location_by_config/1, - get_location_by_args/1, - get_location_by_policy/1, - all_nodes/0, - delay_ms/1, - policy/2, - module/1 ]). - --export([ validate_policy/1 ]). - --rabbit_boot_step( -{?MODULE, - [{description, "Queue location policy validation"}, - {mfa, {rabbit_registry, register, - [policy_validator, << "queue-master-location" >>, ?MODULE]}}]}). +-export([lookup_master/2, + lookup_queue/2, + get_location/1, + get_location_by_config/1, + get_location_by_args/1, + get_location_by_policy/1, + all_nodes/0]). lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), is_binary(VHostPath) -> - Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), - case rabbit_amqqueue:lookup(Queue) of - {ok, #amqqueue{pid=Pid}} when is_pid(Pid) -> - {ok, node(Pid)}; - Error -> Error - end. + Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(Queue) of + {ok, #amqqueue{pid=Pid}} when is_pid(Pid) -> + {ok, node(Pid)}; + Error -> Error + end. lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin), - is_binary(VHostPath) -> - Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), - case rabbit_amqqueue:lookup(Queue) of - Reply = {ok, #amqqueue{}} -> - Reply; - Error -> Error - end. + is_binary(VHostPath) -> + Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(Queue) of + Reply = {ok, #amqqueue{}} -> Reply; + Error -> Error + end. get_location(Queue=#amqqueue{})-> - case get_location_by_args(Queue) of - _Err1={error, _} -> - case get_location_by_policy(Queue) of - _Err2={error, _} -> - case get_location_by_config(Queue) of - Err3={error, _} -> Err3; - Reply={ok, _Node} -> Reply - end; + case get_location_by_args(Queue) of + _Err1={error, _} -> + case get_location_by_policy(Queue) of + _Err2={error, _} -> + case get_location_by_config(Queue) of + Err3={error, _} -> Err3; + Reply={ok, _Node} -> Reply + end; + Reply={ok, _Node} -> Reply + end; Reply={ok, _Node} -> Reply - end; - Reply={ok, _Node} -> Reply - end. + end. get_location_by_args(Queue=#amqqueue{arguments=Args}) -> - case proplists:lookup( << "queue-master-location" >> , Args) of - { << "queue-master-location" >> , Strategy} -> - case validate_strategy(Strategy) of - {ok, CB} -> CB:queue_master_location(Queue); - Error -> Error - end; - _ -> {error, "queue-master-location undefined"} - end. + case proplists:lookup(<<"queue-master-location">> , Args) of + {<<"queue-master-location">> , Strategy} -> + case rabbit_queue_location_validator:validate_strategy(Strategy) of + {ok, CB} -> CB:queue_master_location(Queue); + Error -> Error + end; + _ -> {error, "queue-master-location undefined"} + end. get_location_by_policy(Queue=#amqqueue{}) -> - case rabbit_policy:get( << "queue-master-location" >> , Queue) of - undefined -> {error, "queue-master-location policy undefined"}; - Strategy -> - case validate_strategy(Strategy) of - {ok, CB} -> CB:queue_master_location(Queue); - Error -> Error - end - end. + case rabbit_policy:get( <<"queue-master-location">> , Queue) of + undefined -> {error, "queue-master-location policy undefined"}; + Strategy -> + case rabbit_queue_location_validator:validate_strategy(Strategy) of + {ok, CB} -> CB:queue_master_location(Queue); + Error -> Error + end + end. get_location_by_config(Queue=#amqqueue{}) -> - case application:get_env(rabbit, queue_master_location) of - {ok, Strategy} -> - case validate_strategy(Strategy) of - {ok, CB} -> CB:queue_master_location(Queue); - Error -> Error - end; - _ -> {error, "queue-master-location undefined"} - end. - - -validate_policy(KeyList) -> - case proplists:lookup( << "queue-master-location" >> , KeyList) of - {_, Strategy} -> validate_strategy(Strategy); - _ -> {error, "queue-master-location undefined"} - end. - -validate_strategy(Strategy) -> - case module(Strategy) of - R={ok, _M} -> R; - _ -> {error, "~p is not a valid queue-master-location value", [Strategy]} - end. - + case application:get_env(rabbit, queue_master_location) of + {ok, Strategy} -> + case rabbit_queue_location_validator:validate_strategy(Strategy) of + {ok, CB} -> CB:queue_master_location(Queue); + Error -> Error + end; + _ -> {error, "queue-master-location undefined"} + end. all_nodes() -> rabbit_mnesia:cluster_nodes(running). -delay_ms(Ms) -> receive after Ms -> void end. - -policy(Policy, Q) -> - case rabbit_policy:get(Policy, Q) of - undefined -> none; - P -> P - end. - -module(#amqqueue{} = Q) -> - case rabbit_policy:get( << "queue-master-location" >> , Q) of - undefined -> no_location_strategy; - Mode -> module(Mode) - end; - -module(Strategy) when is_binary(Strategy) -> - case rabbit_registry:binary_to_type(Strategy) of - {error, not_found} -> no_location_strategy; - T -> case rabbit_registry:lookup_module(queue_master_locator, T) of - {ok, Module} -> {ok, Module}; - _ -> no_location_strategy - end - end.
\ No newline at end of file diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl index 096b9db713..b6ec6d77ad 100644 --- a/src/rabbit_queue_master_locator.erl +++ b/src/rabbit_queue_master_locator.erl @@ -10,29 +10,25 @@ %%% %%% The Original Code is RabbitMQ. %%% -%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com> -%%% @doc -%%% - Queue Master Location behaviour implementation -%%% -%%% @end -%%% Created : 19. Jun 2015 -%%%------------------------------------------------------------------- +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_queue_master_locator). -ifdef(use_specs). -callback description() -> [proplists:property()]. -callback queue_master_location(pid()) -> {'ok', node()} | {'error', term()}. --callback validate_policy(pid()) -> {'ok', node()} | {'error', term()}. -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> - [ {description, 0}, - {queue_master_location, 1}, - {validate_policy, 1}]; + [{description, 0}, + {queue_master_location, 1}]; behaviour_info(_Other) -> - undefined. + undefined. -endif. |
