summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-08-07 12:51:36 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-08-07 12:51:36 +0100
commit4a2f85a6506260cff20a6dd4e747b93a1641823a (patch)
tree076666112c10d2e38802d25d568ff9203cee6a7d /src
parent70a541c7d909f11ce36e357ad567f8e2e1412c03 (diff)
downloadrabbitmq-server-git-4a2f85a6506260cff20a6dd4e747b93a1641823a.tar.gz
Take account of the current nodes when selecting new nodes, and some unit tests for the node selection logic.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_misc.erl70
-rw-r--r--src/rabbit_tests.erl29
2 files changed, 70 insertions, 29 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index fe7c0442dc..0469f5f211 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -22,6 +22,8 @@
%% temp
-export([suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2]).
+%% for testing
+-export([suggested_queue_nodes/4]).
-include("rabbit.hrl").
@@ -204,41 +206,51 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
%%----------------------------------------------------------------------------
-%% TODO this should take account of current nodes so we don't throw
-%% away mirrors or change the master needlessly
suggested_queue_nodes(Q) ->
- case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of
- [{ok, <<"all">>}, _] ->
- {node(), rabbit_mnesia:all_clustered_nodes() -- [node()]};
- [{ok, <<"nodes">>}, {ok, Nodes}] ->
- case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of
- [Node] -> {Node, []};
- [First | Rest] -> {First, Rest}
- end;
- [{ok, <<"at-least">>}, {ok, Count}] ->
- {node(), lists:sublist(
- rabbit_mnesia:all_clustered_nodes(), Count) -- [node()]};
- _ ->
- {node(), []}
+ {MNode0, SNodes} = actual_queue_nodes(Q),
+ MNode = case MNode0 of
+ none -> node();
+ _ -> MNode0
+ end,
+ suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
+ {MNode, SNodes}, rabbit_mnesia:all_clustered_nodes()).
+
+policy(Policy, Q) ->
+ case rabbit_policy:get(Policy, Q) of
+ {ok, P} -> P;
+ _ -> none
end.
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) ->
+ {MNode, All -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, _All) ->
+ Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ case lists:member(MNode, Nodes) of
+ true -> {MNode, Nodes -- [MNode]};
+ false -> {hd(Nodes), tl(Nodes)}
+ end;
+suggested_queue_nodes(<<"at-least">>, Count, {MNode, SNodes}, All) ->
+ SCount = Count - 1,
+ {MNode, case SCount > length(SNodes) of
+ true -> Cand = (All -- [MNode]) -- SNodes,
+ SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
+ false -> lists:sublist(SNodes, SCount)
+ end};
+suggested_queue_nodes(_, _, {MNode, _}, _) ->
+ {MNode, []}.
+
actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
- MNode = case MPid of
- undefined -> undefined;
- _ -> node(MPid)
- end,
- SNodes = case SPids of
- undefined -> undefined;
- _ -> [node(Pid) || Pid <- SPids]
- end,
- {MNode, SNodes}.
+ {case MPid of
+ none -> none;
+ _ -> node(MPid)
+ end, [node(Pid) || Pid <- SPids]}.
is_mirrored(Q) ->
- case rabbit_policy:get(<<"ha-mode">>, Q) of
- {ok, <<"all">>} -> true;
- {ok, <<"nodes">>} -> true;
- {ok, <<"at-least">>} -> true;
- _ -> false
+ case policy(<<"ha-mode">>, Q) of
+ <<"all">> -> true;
+ <<"nodes">> -> true;
+ <<"at-least">> -> true;
+ _ -> false
end.
update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bb60bd125e..bc30fb4c26 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -52,6 +52,7 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
+ passed = test_dynamic_mirroring(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_runtime_parameters(),
@@ -856,6 +857,34 @@ test_arguments_parser() ->
passed.
+test_dynamic_mirroring() ->
+ %% Just unit tests of the node selection logic, see multi node
+ %% tests for the rest...
+ Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) ->
+ {NewM, NewSs0} =
+ rabbit_mirror_queue_misc:suggested_queue_nodes(
+ Policy, Params, {OldM, OldSs}, All),
+ NewSs = lists:sort(NewSs0)
+ end,
+
+ Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]),
+ Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]),
+ Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]),
+
+ Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
+ Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
+
+ Test({a,[b]}, <<"at-least">>,2,{a,[]}, [a,b,c,d]),
+ Test({a,[b,c]},<<"at-least">>,3,{a,[]}, [a,b,c,d]),
+ Test({a,[c]}, <<"at-least">>,2,{a,[c]}, [a,b,c,d]),
+ Test({a,[b,c]},<<"at-least">>,3,{a,[c]}, [a,b,c,d]),
+ Test({a,[c]}, <<"at-least">>,2,{a,[c,d]},[a,b,c,d]),
+ Test({a,[c,d]},<<"at-least">>,3,{a,[c,d]},[a,b,c,d]),
+
+ passed.
+
test_cluster_management() ->
%% 'cluster' and 'reset' should only work if the app is stopped
{error, _} = control_action(cluster, []),