summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda Dube <ayanda.dube@erlang-solutions.com>2015-07-08 19:45:01 +0100
committerAyanda Dube <ayanda.dube@erlang-solutions.com>2015-07-08 19:45:01 +0100
commit2815985b1177ddfc939bd7e24ac63f42510650e5 (patch)
tree22ae3f4f6332d3bc5dc6aba05cc2a34d7db34bd7
parent9b291479d133272013a85270dd44a3001c6850d6 (diff)
downloadrabbitmq-server-git-2815985b1177ddfc939bd7e24ac63f42510650e5.tar.gz
Change min-masters calculation
Remove validate_policy callback from queue location behaviour Create seperate queue location policy validation module Delete function, delay_ms/1. Unused Remove functions not used by other modules from export Update coding style and header to GoPivotal Inc standards References #121.
-rw-r--r--src/rabbit_queue_location_client_local.erl36
-rw-r--r--src/rabbit_queue_location_min_masters.erl88
-rw-r--r--src/rabbit_queue_location_random.erl38
-rw-r--r--src/rabbit_queue_location_validator.erl65
-rw-r--r--src/rabbit_queue_master_location_misc.erl170
-rw-r--r--src/rabbit_queue_master_locator.erl20
6 files changed, 219 insertions, 198 deletions
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl
index c3fe2e8713..56b867da64 100644
--- a/src/rabbit_queue_location_client_local.erl
+++ b/src/rabbit_queue_location_client_local.erl
@@ -10,27 +10,25 @@
%%%
%%% The Original Code is RabbitMQ.
%%%
-%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
-%%% @doc
-%%% - Queue Master Location 'client local' selection callback
-%%%
-%%% @end
-%%% Created : 19. Jun 2015
-%%%-------------------------------------------------------------------
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_queue_location_client_local).
+-behaviour(rabbit_queue_master_locator).
-include("rabbit.hrl").
--behaviour(rabbit_queue_master_locator).
-
--export([description/0, queue_master_location/1, validate_policy/1]).
+-export([description/0, queue_master_location/1]).
-rabbit_boot_step({?MODULE,
- [{description, "Set queue master node as the client local node"},
- {mfa, {rabbit_registry, register,
- [queue_master_locator, <<"client-local">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, kernel_ready}]}).
+ [{description, "locate queue master client local"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator,
+ <<"client-local">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
%%---------------------------------------------------------------------------
@@ -38,10 +36,8 @@
%%---------------------------------------------------------------------------
description() ->
- [{description, <<"Set queue master node as the client local node">>}].
+ [{description, <<"Locate queue master node as the client local node">>}].
queue_master_location(#amqqueue{}) ->
- MasterNode = node(),
- {ok, MasterNode}.
-
-validate_policy(_Args) -> ok. \ No newline at end of file
+ MasterNode = node(),
+ {ok, MasterNode}.
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index 6a2e378afd..1fef6b8c02 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -10,60 +10,78 @@
%%%
%%% The Original Code is RabbitMQ.
%%%
-%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
-%%% @doc
-%%% - Queue Master Location 'minimum bound queues' selection callback
-%%%
-%%% @end
-%%% Created : 19. Jun 2015
-%%%-------------------------------------------------------------------
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_queue_location_min_masters).
-behaviour(rabbit_queue_master_locator).
-include("rabbit.hrl").
--export([ description/0, queue_master_location/1, validate_policy/1 ]).
+-export([description/0, queue_master_location/1]).
-rabbit_boot_step({?MODULE,
- [{description, "Locate queue master node from cluster node with least bound queues"},
- {mfa, {rabbit_registry, register,
- [queue_master_locator, <<"min-masters">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, kernel_ready}]}).
+ [{description, "locate queue master min bound queues"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator,
+ <<"min-masters">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
%%---------------------------------------------------------------------------
%% Queue Master Location Callbacks
%%---------------------------------------------------------------------------
description() ->
- [{description, <<"Locate queue master node from cluster node with least bound queues">>}].
+ [{description,
+ <<"Locate queue master node from cluster node with least bound queues">>}].
queue_master_location(#amqqueue{}) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(),
- {_Count, MinNode}= get_bound_queue_counts(Cluster, {undefined, undefined}),
- {ok, MinNode}.
-
-validate_policy(_Args) -> ok.
+ Cluster = rabbit_queue_master_location_misc:all_nodes(),
+ VHosts = rabbit_vhost:list(),
+ BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
+ {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters,
+ {undefined, undefined}),
+ {ok, MinMaster}.
%%---------------------------------------------------------------------------
%% Private helper functions
%%---------------------------------------------------------------------------
-get_bound_queue_counts([], MinNode) -> MinNode;
-get_bound_queue_counts([Node|Rem], {undefined, undefined}) ->
- VHosts = rpc:call(Node, rabbit_vhost, list, []),
- Count = get_total_vhost_bound_queues(Node, VHosts, 0),
- get_bound_queue_counts(Rem, {Count, Node});
-get_bound_queue_counts([Node0|Rem], MinNode={Count, _Node}) ->
- VHosts = rpc:call(Node0, rabbit_vhost, list, []),
- Count0 = get_total_vhost_bound_queues(Node0, VHosts, 0),
- MinNode0 = if Count0 < Count -> {Count0, Node0};
- true -> MinNode
- end,
- get_bound_queue_counts(Rem, MinNode0).
+get_min_master([], _BoundQueueMasters, MinNode) -> MinNode;
+get_min_master([Node|Rem], BoundQueueMasters, {undefined, undefined}) ->
+ Count = count_masters(Node, BoundQueueMasters, 0),
+ get_min_master(Rem, BoundQueueMasters, {Count, Node});
+get_min_master([Node0|Rem], BoundQueueMasters, MinNode={Count, _Node}) ->
+ Count0 = count_masters(Node0, BoundQueueMasters, 0),
+ MinNode0 = if Count0 < Count -> {Count0, Node0};
+ true -> MinNode
+ end,
+ get_min_master(Rem, BoundQueueMasters, MinNode0).
+
+
+get_bound_queue_masters_per_vhost([], Acc) ->
+ lists:flatten(Acc);
+get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
+ Bindings = rabbit_binding:list(VHost),
+ BoundQueueMasters = get_queue_master_per_binding(VHost, Bindings, []),
+ get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
+
+
+get_queue_master_per_binding(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
+get_queue_master_per_binding(VHost, [#binding{key=QueueName}|RemBindings],
+ QueueMastersAcc) ->
+ QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
+ QueueName, VHost) of
+ {ok, Master} when is_atom(Master) ->
+ [Master|QueueMastersAcc];
+ _ -> QueueMastersAcc
+ end,
+ get_queue_master_per_binding(VHost, RemBindings, QueueMastersAcc0).
-get_total_vhost_bound_queues(_Node, [], Count) -> Count;
-get_total_vhost_bound_queues(Node, [VHostPath|Rem], Count) ->
- Count0 = length(rpc:call(Node, rabbit_binding, list, [VHostPath])),
- get_total_vhost_bound_queues(Node, Rem, Count+Count0).
+count_masters(_Node, [], Count) -> Count;
+count_masters(Node, [Node|T], Count) -> count_masters(Node, T, Count+1);
+count_masters(Node, [_|T], Count) -> count_masters(Node, T, Count).
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index fbf7d7e2db..85536c7309 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -10,38 +10,36 @@
%%%
%%% The Original Code is RabbitMQ.
%%%
-%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
-%%% @doc
-%%% - Queue Master Location 'random' selection callback implementation
-%%%
-%%% @end
-%%% Created : 19. Jun 2015
-%%%-------------------------------------------------------------------
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_queue_location_random).
-behaviour(rabbit_queue_master_locator).
-include("rabbit.hrl").
--export([ description/0, queue_master_location/1, validate_policy/1 ]).
+-export([description/0, queue_master_location/1]).
-rabbit_boot_step({?MODULE,
- [{description, "Locate queue master node from cluster in a random manner"},
- {mfa, {rabbit_registry, register,
- [queue_master_locator, <<"random">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, kernel_ready}]}).
+ [{description, "locate queue master random"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator,
+ <<"random">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
%%---------------------------------------------------------------------------
%% Queue Master Location Callbacks
%%---------------------------------------------------------------------------
description() ->
- [{description, <<"Locate queue master node from cluster in a random manner">>}].
+ [{description,
+ <<"Locate queue master node from cluster in a random manner">>}].
queue_master_location(#amqqueue{}) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(),
- RandomPos = erlang:phash(now(), length(Cluster)),
- MasterNode = lists:nth(RandomPos, Cluster),
- {ok, MasterNode}.
-
-validate_policy(_Args) -> ok. \ No newline at end of file
+ Cluster = rabbit_queue_master_location_misc:all_nodes(),
+ RandomPos = erlang:phash(now(), length(Cluster)),
+ MasterNode = lists:nth(RandomPos, Cluster),
+ {ok, MasterNode}.
diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl
new file mode 100644
index 0000000000..6f4b8a6a51
--- /dev/null
+++ b/src/rabbit_queue_location_validator.erl
@@ -0,0 +1,65 @@
+%%% The contents of this file are subject to the Mozilla Public License
+%%% Version 1.1 (the "License"); you may not use this file except in
+%%% compliance with the License. You may obtain a copy of the License at
+%%% http://www.mozilla.org/MPL/
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%% License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Original Code is RabbitMQ.
+%%%
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_location_validator).
+-behaviour(rabbit_policy_validator).
+
+-include("rabbit.hrl").
+
+-export([validate_policy/1, validate_strategy/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "Queue location policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator,
+ <<"queue-master-location">>,
+ ?MODULE]}}]}).
+
+validate_policy(KeyList) ->
+ case proplists:lookup(<<"queue-master-location">> , KeyList) of
+ {_, Strategy} -> validate_strategy(Strategy);
+ _ -> {error, "queue-master-location undefined"}
+ end.
+
+validate_strategy(Strategy) ->
+ case module(Strategy) of
+ R={ok, _M} -> R;
+ _ ->
+ {error, "~p invalid queue-master-location value", [Strategy]}
+ end.
+
+policy(Policy, Q) ->
+ case rabbit_policy:get(Policy, Q) of
+ undefined -> none;
+ P -> P
+ end.
+
+module(#amqqueue{} = Q) ->
+ case policy(<<"queue-master-location">>, Q) of
+ undefined -> no_location_strategy;
+ Mode -> module(Mode)
+ end;
+
+module(Strategy) when is_binary(Strategy) ->
+ case rabbit_registry:binary_to_type(Strategy) of
+ {error, not_found} -> no_location_strategy;
+ T ->
+ case rabbit_registry:lookup_module(queue_master_locator, T) of
+ {ok, Module} -> {ok, Module};
+ _ -> no_location_strategy
+ end
+ end.
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
index b7fe72c63b..6d5639eace 100644
--- a/src/rabbit_queue_master_location_misc.erl
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -10,135 +10,83 @@
%%%
%%% The Original Code is RabbitMQ.
%%%
-%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
-%%% @doc
-%%% - Queue Master Location miscellaneous functions
-%%%
-%%% @end
-%%% Created : 19. Jun 2015
-%%%-------------------------------------------------------------------
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_queue_master_location_misc).
--behaviour(rabbit_policy_validator).
-include("rabbit.hrl").
-%% API
--export([ lookup_master/2,
- lookup_queue/2,
- get_location/1,
- get_location_by_config/1,
- get_location_by_args/1,
- get_location_by_policy/1,
- all_nodes/0,
- delay_ms/1,
- policy/2,
- module/1 ]).
-
--export([ validate_policy/1 ]).
-
--rabbit_boot_step(
-{?MODULE,
- [{description, "Queue location policy validation"},
- {mfa, {rabbit_registry, register,
- [policy_validator, << "queue-master-location" >>, ?MODULE]}}]}).
+-export([lookup_master/2,
+ lookup_queue/2,
+ get_location/1,
+ get_location_by_config/1,
+ get_location_by_args/1,
+ get_location_by_policy/1,
+ all_nodes/0]).
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
- Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- case rabbit_amqqueue:lookup(Queue) of
- {ok, #amqqueue{pid=Pid}} when is_pid(Pid) ->
- {ok, node(Pid)};
- Error -> Error
- end.
+ Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(Queue) of
+ {ok, #amqqueue{pid=Pid}} when is_pid(Pid) ->
+ {ok, node(Pid)};
+ Error -> Error
+ end.
lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
- is_binary(VHostPath) ->
- Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- case rabbit_amqqueue:lookup(Queue) of
- Reply = {ok, #amqqueue{}} ->
- Reply;
- Error -> Error
- end.
+ is_binary(VHostPath) ->
+ Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(Queue) of
+ Reply = {ok, #amqqueue{}} -> Reply;
+ Error -> Error
+ end.
get_location(Queue=#amqqueue{})->
- case get_location_by_args(Queue) of
- _Err1={error, _} ->
- case get_location_by_policy(Queue) of
- _Err2={error, _} ->
- case get_location_by_config(Queue) of
- Err3={error, _} -> Err3;
- Reply={ok, _Node} -> Reply
- end;
+ case get_location_by_args(Queue) of
+ _Err1={error, _} ->
+ case get_location_by_policy(Queue) of
+ _Err2={error, _} ->
+ case get_location_by_config(Queue) of
+ Err3={error, _} -> Err3;
+ Reply={ok, _Node} -> Reply
+ end;
+ Reply={ok, _Node} -> Reply
+ end;
Reply={ok, _Node} -> Reply
- end;
- Reply={ok, _Node} -> Reply
- end.
+ end.
get_location_by_args(Queue=#amqqueue{arguments=Args}) ->
- case proplists:lookup( << "queue-master-location" >> , Args) of
- { << "queue-master-location" >> , Strategy} ->
- case validate_strategy(Strategy) of
- {ok, CB} -> CB:queue_master_location(Queue);
- Error -> Error
- end;
- _ -> {error, "queue-master-location undefined"}
- end.
+ case proplists:lookup(<<"queue-master-location">> , Args) of
+ {<<"queue-master-location">> , Strategy} ->
+ case rabbit_queue_location_validator:validate_strategy(Strategy) of
+ {ok, CB} -> CB:queue_master_location(Queue);
+ Error -> Error
+ end;
+ _ -> {error, "queue-master-location undefined"}
+ end.
get_location_by_policy(Queue=#amqqueue{}) ->
- case rabbit_policy:get( << "queue-master-location" >> , Queue) of
- undefined -> {error, "queue-master-location policy undefined"};
- Strategy ->
- case validate_strategy(Strategy) of
- {ok, CB} -> CB:queue_master_location(Queue);
- Error -> Error
- end
- end.
+ case rabbit_policy:get( <<"queue-master-location">> , Queue) of
+ undefined -> {error, "queue-master-location policy undefined"};
+ Strategy ->
+ case rabbit_queue_location_validator:validate_strategy(Strategy) of
+ {ok, CB} -> CB:queue_master_location(Queue);
+ Error -> Error
+ end
+ end.
get_location_by_config(Queue=#amqqueue{}) ->
- case application:get_env(rabbit, queue_master_location) of
- {ok, Strategy} ->
- case validate_strategy(Strategy) of
- {ok, CB} -> CB:queue_master_location(Queue);
- Error -> Error
- end;
- _ -> {error, "queue-master-location undefined"}
- end.
-
-
-validate_policy(KeyList) ->
- case proplists:lookup( << "queue-master-location" >> , KeyList) of
- {_, Strategy} -> validate_strategy(Strategy);
- _ -> {error, "queue-master-location undefined"}
- end.
-
-validate_strategy(Strategy) ->
- case module(Strategy) of
- R={ok, _M} -> R;
- _ -> {error, "~p is not a valid queue-master-location value", [Strategy]}
- end.
-
+ case application:get_env(rabbit, queue_master_location) of
+ {ok, Strategy} ->
+ case rabbit_queue_location_validator:validate_strategy(Strategy) of
+ {ok, CB} -> CB:queue_master_location(Queue);
+ Error -> Error
+ end;
+ _ -> {error, "queue-master-location undefined"}
+ end.
all_nodes() -> rabbit_mnesia:cluster_nodes(running).
-delay_ms(Ms) -> receive after Ms -> void end.
-
-policy(Policy, Q) ->
- case rabbit_policy:get(Policy, Q) of
- undefined -> none;
- P -> P
- end.
-
-module(#amqqueue{} = Q) ->
- case rabbit_policy:get( << "queue-master-location" >> , Q) of
- undefined -> no_location_strategy;
- Mode -> module(Mode)
- end;
-
-module(Strategy) when is_binary(Strategy) ->
- case rabbit_registry:binary_to_type(Strategy) of
- {error, not_found} -> no_location_strategy;
- T -> case rabbit_registry:lookup_module(queue_master_locator, T) of
- {ok, Module} -> {ok, Module};
- _ -> no_location_strategy
- end
- end. \ No newline at end of file
diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl
index 096b9db713..b6ec6d77ad 100644
--- a/src/rabbit_queue_master_locator.erl
+++ b/src/rabbit_queue_master_locator.erl
@@ -10,29 +10,25 @@
%%%
%%% The Original Code is RabbitMQ.
%%%
-%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
-%%% @doc
-%%% - Queue Master Location behaviour implementation
-%%%
-%%% @end
-%%% Created : 19. Jun 2015
-%%%-------------------------------------------------------------------
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_queue_master_locator).
-ifdef(use_specs).
-callback description() -> [proplists:property()].
-callback queue_master_location(pid()) -> {'ok', node()} | {'error', term()}.
--callback validate_policy(pid()) -> {'ok', node()} | {'error', term()}.
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [ {description, 0},
- {queue_master_location, 1},
- {validate_policy, 1}];
+ [{description, 0},
+ {queue_master_location, 1}];
behaviour_info(_Other) ->
- undefined.
+ undefined.
-endif.