diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-08-07 12:51:36 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-08-07 12:51:36 +0100 |
| commit | 4a2f85a6506260cff20a6dd4e747b93a1641823a (patch) | |
| tree | 076666112c10d2e38802d25d568ff9203cee6a7d /src | |
| parent | 70a541c7d909f11ce36e357ad567f8e2e1412c03 (diff) | |
| download | rabbitmq-server-git-4a2f85a6506260cff20a6dd4e747b93a1641823a.tar.gz | |
Take account of the current nodes when selecting new nodes, and some unit tests for the node selection logic.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 70 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 29 |
2 files changed, 70 insertions, 29 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index fe7c0442dc..0469f5f211 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -22,6 +22,8 @@ %% temp -export([suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2]). +%% for testing +-export([suggested_queue_nodes/4]). -include("rabbit.hrl"). @@ -204,41 +206,51 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, %%---------------------------------------------------------------------------- -%% TODO this should take account of current nodes so we don't throw -%% away mirrors or change the master needlessly suggested_queue_nodes(Q) -> - case [rabbit_policy:get(P, Q) || P <- [<<"ha-mode">>, <<"ha-params">>]] of - [{ok, <<"all">>}, _] -> - {node(), rabbit_mnesia:all_clustered_nodes() -- [node()]}; - [{ok, <<"nodes">>}, {ok, Nodes}] -> - case [list_to_atom(binary_to_list(Node)) || Node <- Nodes] of - [Node] -> {Node, []}; - [First | Rest] -> {First, Rest} - end; - [{ok, <<"at-least">>}, {ok, Count}] -> - {node(), lists:sublist( - rabbit_mnesia:all_clustered_nodes(), Count) -- [node()]}; - _ -> - {node(), []} + {MNode0, SNodes} = actual_queue_nodes(Q), + MNode = case MNode0 of + none -> node(); + _ -> MNode0 + end, + suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), + {MNode, SNodes}, rabbit_mnesia:all_clustered_nodes()). + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + {ok, P} -> P; + _ -> none end. +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> + {MNode, All -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, _All) -> + Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + case lists:member(MNode, Nodes) of + true -> {MNode, Nodes -- [MNode]}; + false -> {hd(Nodes), tl(Nodes)} + end; +suggested_queue_nodes(<<"at-least">>, Count, {MNode, SNodes}, All) -> + SCount = Count - 1, + {MNode, case SCount > length(SNodes) of + true -> Cand = (All -- [MNode]) -- SNodes, + SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); + false -> lists:sublist(SNodes, SCount) + end}; +suggested_queue_nodes(_, _, {MNode, _}, _) -> + {MNode, []}. + actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> - MNode = case MPid of - undefined -> undefined; - _ -> node(MPid) - end, - SNodes = case SPids of - undefined -> undefined; - _ -> [node(Pid) || Pid <- SPids] - end, - {MNode, SNodes}. + {case MPid of + none -> none; + _ -> node(MPid) + end, [node(Pid) || Pid <- SPids]}. is_mirrored(Q) -> - case rabbit_policy:get(<<"ha-mode">>, Q) of - {ok, <<"all">>} -> true; - {ok, <<"nodes">>} -> true; - {ok, <<"at-least">>} -> true; - _ -> false + case policy(<<"ha-mode">>, Q) of + <<"all">> -> true; + <<"nodes">> -> true; + <<"at-least">> -> true; + _ -> false end. update_mirrors(OldQ = #amqqueue{name = QName, pid = QPid}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bb60bd125e..bc30fb4c26 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -52,6 +52,7 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), + passed = test_dynamic_mirroring(), passed = test_cluster_management(), passed = test_user_management(), passed = test_runtime_parameters(), @@ -856,6 +857,34 @@ test_arguments_parser() -> passed. +test_dynamic_mirroring() -> + %% Just unit tests of the node selection logic, see multi node + %% tests for the rest... + Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) -> + {NewM, NewSs0} = + rabbit_mirror_queue_misc:suggested_queue_nodes( + Policy, Params, {OldM, OldSs}, All), + NewSs = lists:sort(NewSs0) + end, + + Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]), + Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]), + + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), + Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), + + Test({a,[b]}, <<"at-least">>,2,{a,[]}, [a,b,c,d]), + Test({a,[b,c]},<<"at-least">>,3,{a,[]}, [a,b,c,d]), + Test({a,[c]}, <<"at-least">>,2,{a,[c]}, [a,b,c,d]), + Test({a,[b,c]},<<"at-least">>,3,{a,[c]}, [a,b,c,d]), + Test({a,[c]}, <<"at-least">>,2,{a,[c,d]},[a,b,c,d]), + Test({a,[c,d]},<<"at-least">>,3,{a,[c,d]},[a,b,c,d]), + + passed. + test_cluster_management() -> %% 'cluster' and 'reset' should only work if the app is stopped {error, _} = control_action(cluster, []), |
