summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_queue_location_client_local.erl47
-rw-r--r--src/rabbit_queue_location_min_masters.erl69
-rw-r--r--src/rabbit_queue_location_random.erl47
-rw-r--r--src/rabbit_queue_location_round_robin.erl61
-rw-r--r--src/rabbit_queue_master_location_misc.erl144
-rw-r--r--src/rabbit_queue_master_locator.erl38
7 files changed, 414 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9ce800023f..f75636d248 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -274,9 +274,15 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
recoverable_slaves = [],
gm_pids = [],
state = live})),
- Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
+
+ Node1 = case rabbit_queue_master_location_misc:get_location(Q) of
+ {ok, Node0} -> Node0;
+ {error, _} -> Node
+ end,
+
+ Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
gen_server2:call(
- rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
{init, new}, infinity).
internal_declare(Q, true) ->
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl
new file mode 100644
index 0000000000..c3fe2e8713
--- /dev/null
+++ b/src/rabbit_queue_location_client_local.erl
@@ -0,0 +1,47 @@
+%%% The contents of this file are subject to the Mozilla Public License
+%%% Version 1.1 (the "License"); you may not use this file except in
+%%% compliance with the License. You may obtain a copy of the License at
+%%% http://www.mozilla.org/MPL/
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%% License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Original Code is RabbitMQ.
+%%%
+%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
+%%% @doc
+%%% - Queue Master Location 'client local' selection callback
+%%%
+%%% @end
+%%% Created : 19. Jun 2015
+%%%-------------------------------------------------------------------
+-module(rabbit_queue_location_client_local).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_queue_master_locator).
+
+-export([description/0, queue_master_location/1, validate_policy/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "Set queue master node as the client local node"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator, <<"client-local">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+
+%%---------------------------------------------------------------------------
+%% Queue Master Location Callbacks
+%%---------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"Set queue master node as the client local node">>}].
+
+queue_master_location(#amqqueue{}) ->
+ MasterNode = node(),
+ {ok, MasterNode}.
+
+validate_policy(_Args) -> ok. \ No newline at end of file
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
new file mode 100644
index 0000000000..6a2e378afd
--- /dev/null
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -0,0 +1,69 @@
+%%% The contents of this file are subject to the Mozilla Public License
+%%% Version 1.1 (the "License"); you may not use this file except in
+%%% compliance with the License. You may obtain a copy of the License at
+%%% http://www.mozilla.org/MPL/
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%% License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Original Code is RabbitMQ.
+%%%
+%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
+%%% @doc
+%%% - Queue Master Location 'minimum bound queues' selection callback
+%%%
+%%% @end
+%%% Created : 19. Jun 2015
+%%%-------------------------------------------------------------------
+-module(rabbit_queue_location_min_masters).
+-behaviour(rabbit_queue_master_locator).
+
+-include("rabbit.hrl").
+
+-export([ description/0, queue_master_location/1, validate_policy/1 ]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "Locate queue master node from cluster node with least bound queues"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator, <<"min-masters">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%%---------------------------------------------------------------------------
+%% Queue Master Location Callbacks
+%%---------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"Locate queue master node from cluster node with least bound queues">>}].
+
+queue_master_location(#amqqueue{}) ->
+ Cluster = rabbit_queue_master_location_misc:all_nodes(),
+ {_Count, MinNode}= get_bound_queue_counts(Cluster, {undefined, undefined}),
+ {ok, MinNode}.
+
+validate_policy(_Args) -> ok.
+
+%%---------------------------------------------------------------------------
+%% Private helper functions
+%%---------------------------------------------------------------------------
+
+get_bound_queue_counts([], MinNode) -> MinNode;
+get_bound_queue_counts([Node|Rem], {undefined, undefined}) ->
+ VHosts = rpc:call(Node, rabbit_vhost, list, []),
+ Count = get_total_vhost_bound_queues(Node, VHosts, 0),
+ get_bound_queue_counts(Rem, {Count, Node});
+get_bound_queue_counts([Node0|Rem], MinNode={Count, _Node}) ->
+ VHosts = rpc:call(Node0, rabbit_vhost, list, []),
+ Count0 = get_total_vhost_bound_queues(Node0, VHosts, 0),
+ MinNode0 = if Count0 < Count -> {Count0, Node0};
+ true -> MinNode
+ end,
+ get_bound_queue_counts(Rem, MinNode0).
+
+get_total_vhost_bound_queues(_Node, [], Count) -> Count;
+get_total_vhost_bound_queues(Node, [VHostPath|Rem], Count) ->
+ Count0 = length(rpc:call(Node, rabbit_binding, list, [VHostPath])),
+ get_total_vhost_bound_queues(Node, Rem, Count+Count0).
+
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
new file mode 100644
index 0000000000..fbf7d7e2db
--- /dev/null
+++ b/src/rabbit_queue_location_random.erl
@@ -0,0 +1,47 @@
+%%% The contents of this file are subject to the Mozilla Public License
+%%% Version 1.1 (the "License"); you may not use this file except in
+%%% compliance with the License. You may obtain a copy of the License at
+%%% http://www.mozilla.org/MPL/
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%% License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Original Code is RabbitMQ.
+%%%
+%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
+%%% @doc
+%%% - Queue Master Location 'random' selection callback implementation
+%%%
+%%% @end
+%%% Created : 19. Jun 2015
+%%%-------------------------------------------------------------------
+-module(rabbit_queue_location_random).
+-behaviour(rabbit_queue_master_locator).
+
+-include("rabbit.hrl").
+
+-export([ description/0, queue_master_location/1, validate_policy/1 ]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "Locate queue master node from cluster in a random manner"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator, <<"random">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%%---------------------------------------------------------------------------
+%% Queue Master Location Callbacks
+%%---------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"Locate queue master node from cluster in a random manner">>}].
+
+queue_master_location(#amqqueue{}) ->
+ Cluster = rabbit_queue_master_location_misc:all_nodes(),
+ RandomPos = erlang:phash(now(), length(Cluster)),
+ MasterNode = lists:nth(RandomPos, Cluster),
+ {ok, MasterNode}.
+
+validate_policy(_Args) -> ok. \ No newline at end of file
diff --git a/src/rabbit_queue_location_round_robin.erl b/src/rabbit_queue_location_round_robin.erl
new file mode 100644
index 0000000000..d45190aacf
--- /dev/null
+++ b/src/rabbit_queue_location_round_robin.erl
@@ -0,0 +1,61 @@
+%%% The contents of this file are subject to the Mozilla Public License
+%%% Version 1.1 (the "License"); you may not use this file except in
+%%% compliance with the License. You may obtain a copy of the License at
+%%% http://www.mozilla.org/MPL/
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%% License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Original Code is RabbitMQ.
+%%%
+%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
+%%% @doc
+%%% - Queue Master Location callback implementation
+%%%
+%%% @end
+%%% Created : 19. Jun 2015 15:34
+%%%-------------------------------------------------------------------
+-module(rabbit_queue_location_round_robin).
+-behaviour(rabbit_queue_master_location).
+
+-include("rabbit.hrl").
+
+-export([ init/1,
+ description/0,
+ queue_master_location/3,
+ validate_policy/1 ]).
+
+-record(state, {rr_pointer=1, cluster_size=2}). % cluster must start at any value > pointer
+
+-rabbit_boot_step({?MODULE,
+ [{description, "Locate queue master node in a round robin manner"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_location_strategy, <<"round-robin">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+
+%%---------------------------------------------------------------------------
+%% Queue Master Location Callbacks
+%%---------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"Locate queue master node in a round robin manner">>}].
+
+init(_Args) ->
+ {ok, #state{}}.
+
+queue_master_location(#amqqueue{}, Cluster, CbState=#state{rr_pointer = RR_Pointer,
+ cluster_size = RR_Pointer}) ->
+ MasterNode = lists:nth(RR_Pointer, Cluster),
+ {reply, MasterNode, CbState#state{rr_pointer=1, cluster_size=length(Cluster)}};
+
+queue_master_location(#amqqueue{}, Cluster, CbState=#state{rr_pointer = RR_Pointer,
+ cluster_size= ClusterSize}) when RR_Pointer =< ClusterSize,
+ length(Cluster) > 0 ->
+ MasterNode = lists:nth(RR_Pointer,Cluster),
+ {reply, MasterNode, CbState#state{rr_pointer=RR_Pointer+1,cluster_size=length(Cluster)}}.
+
+validate_policy(_Args) -> ok. \ No newline at end of file
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
new file mode 100644
index 0000000000..b7fe72c63b
--- /dev/null
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -0,0 +1,144 @@
+%%% The contents of this file are subject to the Mozilla Public License
+%%% Version 1.1 (the "License"); you may not use this file except in
+%%% compliance with the License. You may obtain a copy of the License at
+%%% http://www.mozilla.org/MPL/
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%% License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Original Code is RabbitMQ.
+%%%
+%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
+%%% @doc
+%%% - Queue Master Location miscellaneous functions
+%%%
+%%% @end
+%%% Created : 19. Jun 2015
+%%%-------------------------------------------------------------------
+-module(rabbit_queue_master_location_misc).
+-behaviour(rabbit_policy_validator).
+
+-include("rabbit.hrl").
+
+%% API
+-export([ lookup_master/2,
+ lookup_queue/2,
+ get_location/1,
+ get_location_by_config/1,
+ get_location_by_args/1,
+ get_location_by_policy/1,
+ all_nodes/0,
+ delay_ms/1,
+ policy/2,
+ module/1 ]).
+
+-export([ validate_policy/1 ]).
+
+-rabbit_boot_step(
+{?MODULE,
+ [{description, "Queue location policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, << "queue-master-location" >>, ?MODULE]}}]}).
+
+lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
+ is_binary(VHostPath) ->
+ Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(Queue) of
+ {ok, #amqqueue{pid=Pid}} when is_pid(Pid) ->
+ {ok, node(Pid)};
+ Error -> Error
+ end.
+
+lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
+ is_binary(VHostPath) ->
+ Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(Queue) of
+ Reply = {ok, #amqqueue{}} ->
+ Reply;
+ Error -> Error
+ end.
+
+get_location(Queue=#amqqueue{})->
+ case get_location_by_args(Queue) of
+ _Err1={error, _} ->
+ case get_location_by_policy(Queue) of
+ _Err2={error, _} ->
+ case get_location_by_config(Queue) of
+ Err3={error, _} -> Err3;
+ Reply={ok, _Node} -> Reply
+ end;
+ Reply={ok, _Node} -> Reply
+ end;
+ Reply={ok, _Node} -> Reply
+ end.
+
+get_location_by_args(Queue=#amqqueue{arguments=Args}) ->
+ case proplists:lookup( << "queue-master-location" >> , Args) of
+ { << "queue-master-location" >> , Strategy} ->
+ case validate_strategy(Strategy) of
+ {ok, CB} -> CB:queue_master_location(Queue);
+ Error -> Error
+ end;
+ _ -> {error, "queue-master-location undefined"}
+ end.
+
+get_location_by_policy(Queue=#amqqueue{}) ->
+ case rabbit_policy:get( << "queue-master-location" >> , Queue) of
+ undefined -> {error, "queue-master-location policy undefined"};
+ Strategy ->
+ case validate_strategy(Strategy) of
+ {ok, CB} -> CB:queue_master_location(Queue);
+ Error -> Error
+ end
+ end.
+
+get_location_by_config(Queue=#amqqueue{}) ->
+ case application:get_env(rabbit, queue_master_location) of
+ {ok, Strategy} ->
+ case validate_strategy(Strategy) of
+ {ok, CB} -> CB:queue_master_location(Queue);
+ Error -> Error
+ end;
+ _ -> {error, "queue-master-location undefined"}
+ end.
+
+
+validate_policy(KeyList) ->
+ case proplists:lookup( << "queue-master-location" >> , KeyList) of
+ {_, Strategy} -> validate_strategy(Strategy);
+ _ -> {error, "queue-master-location undefined"}
+ end.
+
+validate_strategy(Strategy) ->
+ case module(Strategy) of
+ R={ok, _M} -> R;
+ _ -> {error, "~p is not a valid queue-master-location value", [Strategy]}
+ end.
+
+
+all_nodes() -> rabbit_mnesia:cluster_nodes(running).
+
+delay_ms(Ms) -> receive after Ms -> void end.
+
+policy(Policy, Q) ->
+ case rabbit_policy:get(Policy, Q) of
+ undefined -> none;
+ P -> P
+ end.
+
+module(#amqqueue{} = Q) ->
+ case rabbit_policy:get( << "queue-master-location" >> , Q) of
+ undefined -> no_location_strategy;
+ Mode -> module(Mode)
+ end;
+
+module(Strategy) when is_binary(Strategy) ->
+ case rabbit_registry:binary_to_type(Strategy) of
+ {error, not_found} -> no_location_strategy;
+ T -> case rabbit_registry:lookup_module(queue_master_locator, T) of
+ {ok, Module} -> {ok, Module};
+ _ -> no_location_strategy
+ end
+ end. \ No newline at end of file
diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl
new file mode 100644
index 0000000000..096b9db713
--- /dev/null
+++ b/src/rabbit_queue_master_locator.erl
@@ -0,0 +1,38 @@
+%%% The contents of this file are subject to the Mozilla Public License
+%%% Version 1.1 (the "License"); you may not use this file except in
+%%% compliance with the License. You may obtain a copy of the License at
+%%% http://www.mozilla.org/MPL/
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%%% License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Original Code is RabbitMQ.
+%%%
+%%% @author Ayanda Dube <ayanda.dube@erlang-solutions.com>
+%%% @doc
+%%% - Queue Master Location behaviour implementation
+%%%
+%%% @end
+%%% Created : 19. Jun 2015
+%%%-------------------------------------------------------------------
+-module(rabbit_queue_master_locator).
+
+-ifdef(use_specs).
+
+-callback description() -> [proplists:property()].
+-callback queue_master_location(pid()) -> {'ok', node()} | {'error', term()}.
+-callback validate_policy(pid()) -> {'ok', node()} | {'error', term()}.
+
+-else.
+
+-export([behaviour_info/1]).
+behaviour_info(callbacks) ->
+ [ {description, 0},
+ {queue_master_location, 1},
+ {validate_policy, 1}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.