summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-08-04 00:24:47 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-08-04 00:24:47 +0300
commit3f8f24bbb6f75d095f9dd68bb826afb15bc624e5 (patch)
treee1ed6d6a133931208cb4f2cf79f7c4a458b1abd5
parentb5d2e0090717a5f4559fb9a68dcffa2266347004 (diff)
parent585a9690f078d720db62898d04b6405d802bb10c (diff)
downloadrabbitmq-server-git-3f8f24bbb6f75d095f9dd68bb826afb15bc624e5.tar.gz
Merge branch 'rabbitmq-server-121'
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_queue_location_client_local.erl40
-rw-r--r--src/rabbit_queue_location_min_masters.erl77
-rw-r--r--src/rabbit_queue_location_random.erl44
-rw-r--r--src/rabbit_queue_location_validator.erl68
-rw-r--r--src/rabbit_queue_master_location_misc.erl96
-rw-r--r--src/rabbit_queue_master_locator.erl33
-rw-r--r--src/rabbit_registry.erl3
-rw-r--r--test/src/rabbit_tests.erl16
9 files changed, 384 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5bfa006e09..52b8ed6e06 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -274,9 +274,15 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
recoverable_slaves = [],
gm_pids = [],
state = live})),
- Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
+
+ Node1 = case rabbit_queue_master_location_misc:get_location(Q) of
+ {ok, Node0} -> Node0;
+ {error, _} -> Node
+ end,
+
+ Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
gen_server2:call(
- rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
{init, new}, infinity).
internal_declare(Q, true) ->
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl
new file mode 100644
index 0000000000..4cf91abc0a
--- /dev/null
+++ b/src/rabbit_queue_location_client_local.erl
@@ -0,0 +1,40 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_location_client_local).
+-behaviour(rabbit_queue_master_locator).
+
+-include("rabbit.hrl").
+
+-export([description/0, queue_master_location/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "locate queue master client local"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator,
+ <<"client-local">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+
+%%---------------------------------------------------------------------------
+%% Queue Master Location Callbacks
+%%---------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"Locate queue master node as the client local node">>}].
+
+queue_master_location(#amqqueue{}) -> {ok, node()}.
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
new file mode 100644
index 0000000000..21c3bdb045
--- /dev/null
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -0,0 +1,77 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_location_min_masters).
+-behaviour(rabbit_queue_master_locator).
+
+-include("rabbit.hrl").
+
+-export([description/0, queue_master_location/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "locate queue master min bound queues"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator,
+ <<"min-masters">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%%---------------------------------------------------------------------------
+%% Queue Master Location Callbacks
+%%---------------------------------------------------------------------------
+
+description() ->
+ [{description,
+ <<"Locate queue master node from cluster node with least bound queues">>}].
+
+queue_master_location(#amqqueue{}) ->
+ Cluster = rabbit_queue_master_location_misc:all_nodes(),
+ VHosts = rabbit_vhost:list(),
+ BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
+ {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
+ {ok, MinMaster}.
+
+%%---------------------------------------------------------------------------
+%% Private helper functions
+%%---------------------------------------------------------------------------
+get_min_master(Cluster, BoundQueueMasters) ->
+ lists:min([ {count_masters(Node, BoundQueueMasters), Node} ||
+ Node <- Cluster ]).
+
+count_masters(Node, Masters) ->
+ length([ X || X <- Masters, X == Node ]).
+
+get_bound_queue_masters_per_vhost([], Acc) ->
+ lists:flatten(Acc);
+get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
+ Bindings = rabbit_binding:list(VHost),
+ BoundQueueMasters = get_queue_master_per_binding(VHost, Bindings, []),
+ get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
+
+
+get_queue_master_per_binding(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
+get_queue_master_per_binding(VHost, [#binding{destination=
+ #resource{kind=queue,
+ name=QueueName}}|
+ RemBindings],
+ QueueMastersAcc) ->
+ QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
+ QueueName, VHost) of
+ {ok, Master} when is_atom(Master) ->
+ [Master|QueueMastersAcc];
+ _ -> QueueMastersAcc
+ end,
+ get_queue_master_per_binding(VHost, RemBindings, QueueMastersAcc0).
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
new file mode 100644
index 0000000000..b708077cf8
--- /dev/null
+++ b/src/rabbit_queue_location_random.erl
@@ -0,0 +1,44 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_location_random).
+-behaviour(rabbit_queue_master_locator).
+
+-include("rabbit.hrl").
+
+-export([description/0, queue_master_location/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "locate queue master random"},
+ {mfa, {rabbit_registry, register,
+ [queue_master_locator,
+ <<"random">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%%---------------------------------------------------------------------------
+%% Queue Master Location Callbacks
+%%---------------------------------------------------------------------------
+
+description() ->
+ [{description,
+ <<"Locate queue master node from cluster in a random manner">>}].
+
+queue_master_location(#amqqueue{}) ->
+ Cluster = rabbit_queue_master_location_misc:all_nodes(),
+ RandomPos = erlang:phash(now(), length(Cluster)),
+ MasterNode = lists:nth(RandomPos, Cluster),
+ {ok, MasterNode}.
diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl
new file mode 100644
index 0000000000..4eab52855a
--- /dev/null
+++ b/src/rabbit_queue_location_validator.erl
@@ -0,0 +1,68 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_location_validator).
+-behaviour(rabbit_policy_validator).
+
+-include("rabbit.hrl").
+
+-export([validate_policy/1, validate_strategy/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "Queue location policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator,
+ <<"x-queue-master-locator">>,
+ ?MODULE]}}]}).
+
+validate_policy(KeyList) ->
+ case proplists:lookup(<<"x-queue-master-locator">> , KeyList) of
+ {_, Strategy} -> validate_strategy(Strategy);
+ _ -> {error, "x-queue-master-locator undefined"}
+ end.
+
+validate_strategy(Strategy) ->
+ case module(Strategy) of
+ R={ok, _M} -> R;
+ _ ->
+ {error, "~p invalid x-queue-master-locator value", [Strategy]}
+ end.
+
+policy(Policy, Q) ->
+ case rabbit_policy:get(Policy, Q) of
+ undefined -> none;
+ P -> P
+ end.
+
+module(#amqqueue{} = Q) ->
+ case policy(<<"x-queue-master-locator">>, Q) of
+ undefined -> no_location_strategy;
+ Mode -> module(Mode)
+ end;
+
+module(Strategy) when is_binary(Strategy) ->
+ case rabbit_registry:binary_to_type(Strategy) of
+ {error, not_found} -> no_location_strategy;
+ T ->
+ case rabbit_registry:lookup_module(queue_master_locator, T) of
+ {ok, Module} ->
+ case code:which(Module) of
+ non_existing -> no_location_strategy;
+ _ -> {ok, Module}
+ end;
+ _ -> no_location_strategy
+ end
+ end.
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
new file mode 100644
index 0000000000..279869d054
--- /dev/null
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -0,0 +1,96 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_master_location_misc).
+
+-include("rabbit.hrl").
+
+-export([lookup_master/2,
+ lookup_queue/2,
+ get_location/1,
+ get_location_mod_by_config/1,
+ get_location_mod_by_args/1,
+ get_location_mod_by_policy/1,
+ all_nodes/0]).
+
+lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
+ is_binary(VHostPath) ->
+ Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(Queue) of
+ {ok, #amqqueue{pid=Pid}} when is_pid(Pid) ->
+ {ok, node(Pid)};
+ Error -> Error
+ end.
+
+lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
+ is_binary(VHostPath) ->
+ Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(Queue) of
+ Reply = {ok, #amqqueue{}} -> Reply;
+ Error -> Error
+ end.
+
+get_location(Queue=#amqqueue{})->
+ Reply1 = case get_location_mod_by_args(Queue) of
+ _Err1={error, _} ->
+ case get_location_mod_by_policy(Queue) of
+ _Err2={error, _} ->
+ case get_location_mod_by_config(Queue) of
+ Err3={error, _} -> Err3;
+ Reply0={ok, _Module} -> Reply0
+ end;
+ Reply0={ok, _Module} -> Reply0
+ end;
+ Reply0={ok, _Module} -> Reply0
+ end,
+
+ case Reply1 of
+ {ok, CB} -> CB:queue_master_location(Queue);
+ Error -> Error
+ end.
+
+get_location_mod_by_args(#amqqueue{arguments=Args}) ->
+ case proplists:lookup(<<"x-queue-master-locator">> , Args) of
+ {<<"x-queue-master-locator">> , Strategy} ->
+ case rabbit_queue_location_validator:validate_strategy(Strategy) of
+ Reply={ok, _CB} -> Reply;
+ Error -> Error
+ end;
+ _ -> {error, "x-queue-master-locator undefined"}
+ end.
+
+get_location_mod_by_policy(Queue=#amqqueue{}) ->
+ case rabbit_policy:get(<<"x-queue-master-locator">> , Queue) of
+ undefined -> {error, "x-queue-master-locator policy undefined"};
+ Strategy ->
+ case rabbit_queue_location_validator:validate_strategy(Strategy) of
+ Reply={ok, _CB} -> Reply;
+ Error -> Error
+ end
+ end.
+
+get_location_mod_by_config(#amqqueue{}) ->
+ case application:get_env(rabbit, queue_master_locator) of
+ {ok, Strategy} ->
+ case rabbit_queue_location_validator:validate_strategy(Strategy) of
+ Reply={ok, _CB} -> Reply;
+ Error -> Error
+ end;
+ _ -> {error, "queue_master_locator undefined"}
+ end.
+
+all_nodes() -> rabbit_mnesia:cluster_nodes(running).
+
diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl
new file mode 100644
index 0000000000..caaa7bbbde
--- /dev/null
+++ b/src/rabbit_queue_master_locator.erl
@@ -0,0 +1,33 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_master_locator).
+
+-ifdef(use_specs).
+
+-callback description() -> [proplists:property()].
+-callback queue_master_location(pid()) -> {'ok', node()} | {'error', term()}.
+
+-else.
+
+-export([behaviour_info/1]).
+behaviour_info(callbacks) ->
+ [{description, 0},
+ {queue_master_location, 1}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index fc016e718e..f75d839bbf 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -133,7 +133,8 @@ class_module(exchange_decorator) -> rabbit_exchange_decorator;
class_module(queue_decorator) -> rabbit_queue_decorator;
class_module(policy_validator) -> rabbit_policy_validator;
class_module(ha_mode) -> rabbit_mirror_queue_mode;
-class_module(channel_interceptor) -> rabbit_channel_interceptor.
+class_module(channel_interceptor) -> rabbit_channel_interceptor;
+class_module(queue_master_locator)-> rabbit_queue_master_locator.
%%---------------------------------------------------------------------------
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index 95a8c45ffa..4c1489f6aa 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -72,6 +72,7 @@ all_tests0() ->
passed = test_policy_validation(),
passed = test_policy_opts_validation(),
passed = test_ha_policy_validation(),
+ passed = test_queue_master_location_policy_validation(),
passed = test_server_status(),
passed = test_amqp_connection_refusal(),
passed = test_confirms(),
@@ -1155,6 +1156,21 @@ test_ha_policy_validation() ->
ok = control_action(clear_policy, ["name"]),
passed.
+test_queue_master_location_policy_validation() ->
+ Set = fun (JSON) ->
+ control_action_opts( ["set_policy", "name", ".*", JSON] )
+ end,
+ OK = fun (JSON) -> ok = Set(JSON) end,
+ Fail = fun (JSON) -> error = Set(JSON) end,
+
+ OK ("{\"x-queue-master-locator\":\"min-masters\"}"),
+ OK ("{\"x-queue-master-locator\":\"client-local\"}"),
+ OK ("{\"x-queue-master-locator\":\"random\"}"),
+ Fail("{\"x-queue-master-locator\":\"made_up\"}"),
+
+ ok = control_action(clear_policy, ["name"]),
+ passed.
+
test_server_status() ->
%% create a few things so there is some useful information to list
{_Writer, Limiter, Ch} = test_channel(),