summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2016-10-11 17:25:50 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2016-10-11 17:25:50 +0200
commit172c158b94b3f4bf4a9b1f7ed622f7b83164d5c1 (patch)
treedf492a1cc74cf43d199a736c5bc68326c52e05c4
parent82036e04518ef1033e6f23f51803edd163638c94 (diff)
parent157cdaa0d0c9b7753fbfcb2dba37cd48b8101407 (diff)
downloadrabbitmq-server-git-172c158b94b3f4bf4a9b1f7ed622f7b83164d5c1.tar.gz
Merge branch 'stable' into rabbitmq-server-979
Conflicts: src/rabbit.app.src
-rw-r--r--src/pg2_fixed.erl399
-rw-r--r--src/rabbit.app.src4
-rw-r--r--src/rabbit_mirror_queue_mode_nodes.erl36
-rw-r--r--src/rabbit_variable_queue.erl38
-rw-r--r--test/dynamic_ha_SUITE.erl55
-rw-r--r--test/health_check_SUITE.erl46
6 files changed, 138 insertions, 440 deletions
diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl
deleted file mode 100644
index 222a0bc849..0000000000
--- a/src/pg2_fixed.erl
+++ /dev/null
@@ -1,399 +0,0 @@
-%% This is the version of pg2 from R14B02, which contains the fix
-%% described at
-%% http://erlang.2086793.n4.nabble.com/pg2-still-busted-in-R13B04-td2230601.html.
-%% The changes are a search-and-replace to rename the module and avoid
-%% clashes with other versions of pg2, and also a simple rewrite of
-%% "andalso" and "orelse" expressions to case statements where the second
-%% operand is not a boolean since R12B does not allow this.
-
-%%
-%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 1997-2010. All Rights Reserved.
-%%
-%% The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved online at http://www.erlang.org/.
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% %CopyrightEnd%
-%%
--module(pg2_fixed).
-
--export([create/1, delete/1, join/2, leave/2]).
--export([get_members/1, get_local_members/1]).
--export([get_closest_pid/1, which_groups/0]).
--export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
- terminate/2]).
-
-%%% As of R13B03 monitors are used instead of links.
-
-%%%
-%%% Exported functions
-%%%
-
--spec start_link() -> {'ok', pid()} | {'error', term()}.
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
--spec start() -> {'ok', pid()} | {'error', term()}.
-
-start() ->
- ensure_started().
-
--spec create(term()) -> 'ok'.
-
-create(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {create, Name})
- end),
- ok;
- true ->
- ok
- end.
-
--type name() :: term().
-
--spec delete(name()) -> 'ok'.
-
-delete(Name) ->
- _ = ensure_started(),
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {delete, Name})
- end),
- ok.
-
--spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}.
-
-join(Name, Pid) when is_pid(Pid) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {join, Name, Pid})
- end),
- ok
- end.
-
--spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}.
-
-leave(Name, Pid) when is_pid(Pid) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {leave, Name, Pid})
- end),
- ok
- end.
-
--type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}.
-
--spec get_members(name()) -> get_members_ret().
-
-get_members(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- true ->
- group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec get_local_members(name()) -> get_members_ret().
-
-get_local_members(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- true ->
- local_group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec which_groups() -> [name()].
-
-which_groups() ->
- _ = ensure_started(),
- all_groups().
-
--type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}.
-
--spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}.
-
-get_closest_pid(Name) ->
- case get_local_members(Name) of
- [Pid] ->
- Pid;
- [] ->
- case get_members(Name) of
- [] -> {error, {no_process, Name}};
- Members ->
- X = time_compat:erlang_system_time(micro_seconds),
- lists:nth((X rem length(Members))+1, Members)
- end;
- Members when is_list(Members) ->
- X = time_compat:erlang_system_time(micro_seconds),
- lists:nth((X rem length(Members))+1, Members);
- Else ->
- Else
- end.
-
-%%%
-%%% Callback functions from gen_server
-%%%
-
--record(state, {}).
-
--spec init([]) -> {'ok', #state{}}.
-
-init([]) ->
- Ns = nodes(),
- _ = net_kernel:monitor_nodes(true),
- lists:foreach(fun(N) ->
- {?MODULE, N} ! {new_pg2_fixed, node()},
- self() ! {nodeup, N}
- end, Ns),
- pg2_fixed_table = ets:new(pg2_fixed_table, [ordered_set, protected, named_table]),
- {ok, #state{}}.
-
--type call() :: {'create', name()}
- | {'delete', name()}
- | {'join', name(), pid()}
- | {'leave', name(), pid()}.
-
--spec handle_call(call(), _, #state{}) ->
- {'reply', 'ok', #state{}}.
-
-handle_call({create, Name}, _From, S) ->
- assure_group(Name),
- {reply, ok, S};
-handle_call({join, Name, Pid}, _From, S) ->
- case ets:member(pg2_fixed_table, {group, Name}) of
- true -> _ = join_group(Name, Pid),
- ok;
- _ -> ok
- end,
- {reply, ok, S};
-handle_call({leave, Name, Pid}, _From, S) ->
- case ets:member(pg2_fixed_table, {group, Name}) of
- true -> leave_group(Name, Pid);
- _ -> ok
- end,
- {reply, ok, S};
-handle_call({delete, Name}, _From, S) ->
- delete_group(Name),
- {reply, ok, S};
-handle_call(Request, From, S) ->
- error_logger:warning_msg("The pg2_fixed server received an unexpected message:\n"
- "handle_call(~p, ~p, _)\n",
- [Request, From]),
- {noreply, S}.
-
--type all_members() :: [[name(),...]].
--type cast() :: {'exchange', node(), all_members()}
- | {'del_member', name(), pid()}.
-
--spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}.
-
-handle_cast({exchange, _Node, List}, S) ->
- store(List),
- {noreply, S};
-handle_cast(_, S) ->
- %% Ignore {del_member, Name, Pid}.
- {noreply, S}.
-
--spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}.
-
-handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
- member_died(MonitorRef),
- {noreply, S};
-handle_info({nodeup, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
-handle_info({new_pg2_fixed, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
-handle_info(_, S) ->
- {noreply, S}.
-
--spec terminate(term(), #state{}) -> 'ok'.
-
-terminate(_Reason, _S) ->
- true = ets:delete(pg2_fixed_table),
- ok.
-
-%%%
-%%% Local functions
-%%%
-
-%%% One ETS table, pg2_fixed_table, is used for bookkeeping. The type of the
-%%% table is ordered_set, and the fast matching of partially
-%%% instantiated keys is used extensively.
-%%%
-%%% {{group, Name}}
-%%% Process group Name.
-%%% {{ref, Pid}, RPid, MonitorRef, Counter}
-%%% {{ref, MonitorRef}, Pid}
-%%% Each process has one monitor. Sometimes a process is spawned to
-%%% monitor the pid (RPid). Counter is incremented when the Pid joins
-%%% some group.
-%%% {{member, Name, Pid}, GroupCounter}
-%%% {{local_member, Name, Pid}}
-%%% Pid is a member of group Name, GroupCounter is incremented when the
-%%% Pid joins the group Name.
-%%% {{pid, Pid, Name}}
-%%% Pid is a member of group Name.
-
-store(List) ->
- _ = [assure_group(Name)
- andalso
- [join_group(Name, P) || P <- Members -- group_members(Name)] ||
- [Name, Members] <- List],
- ok.
-
-assure_group(Name) ->
- Key = {group, Name},
- ets:member(pg2_fixed_table, Key) orelse true =:= ets:insert(pg2_fixed_table, {Key}).
-
-delete_group(Name) ->
- _ = [leave_group(Name, Pid) || Pid <- group_members(Name)],
- true = ets:delete(pg2_fixed_table, {group, Name}),
- ok.
-
-member_died(Ref) ->
- [{{ref, Ref}, Pid}] = ets:lookup(pg2_fixed_table, {ref, Ref}),
- Names = member_groups(Pid),
- _ = [leave_group(Name, P) ||
- Name <- Names,
- P <- member_in_group(Pid, Name)],
- %% Kept for backward compatibility with links. Can be removed, eventually.
- _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) ||
- Name <- Names],
- ok.
-
-join_group(Name, Pid) ->
- Ref_Pid = {ref, Pid},
- try _ = ets:update_counter(pg2_fixed_table, Ref_Pid, {4, +1})
- catch _:_ ->
- {RPid, Ref} = do_monitor(Pid),
- true = ets:insert(pg2_fixed_table, {Ref_Pid, RPid, Ref, 1}),
- true = ets:insert(pg2_fixed_table, {{ref, Ref}, Pid})
- end,
- Member_Name_Pid = {member, Name, Pid},
- try _ = ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, +1, 1, 1})
- catch _:_ ->
- true = ets:insert(pg2_fixed_table, {Member_Name_Pid, 1}),
- _ = [ets:insert(pg2_fixed_table, {{local_member, Name, Pid}}) ||
- node(Pid) =:= node()],
- true = ets:insert(pg2_fixed_table, {{pid, Pid, Name}})
- end.
-
-leave_group(Name, Pid) ->
- Member_Name_Pid = {member, Name, Pid},
- try ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, -1, 0, 0}) of
- N ->
- if
- N =:= 0 ->
- true = ets:delete(pg2_fixed_table, {pid, Pid, Name}),
- _ = [ets:delete(pg2_fixed_table, {local_member, Name, Pid}) ||
- node(Pid) =:= node()],
- true = ets:delete(pg2_fixed_table, Member_Name_Pid);
- true ->
- ok
- end,
- Ref_Pid = {ref, Pid},
- case ets:update_counter(pg2_fixed_table, Ref_Pid, {4, -1}) of
- 0 ->
- [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_fixed_table, Ref_Pid),
- true = ets:delete(pg2_fixed_table, {ref, Ref}),
- true = ets:delete(pg2_fixed_table, Ref_Pid),
- true = erlang:demonitor(Ref, [flush]),
- kill_monitor_proc(RPid, Pid);
- _ ->
- ok
- end
- catch _:_ ->
- ok
- end.
-
-all_members() ->
- [[G, group_members(G)] || G <- all_groups()].
-
-group_members(Name) ->
- [P ||
- [P, N] <- ets:match(pg2_fixed_table, {{member, Name, '$1'},'$2'}),
- _ <- lists:seq(1, N)].
-
-local_group_members(Name) ->
- [P ||
- [Pid] <- ets:match(pg2_fixed_table, {{local_member, Name, '$1'}}),
- P <- member_in_group(Pid, Name)].
-
-member_in_group(Pid, Name) ->
- case ets:lookup(pg2_fixed_table, {member, Name, Pid}) of
- [] -> [];
- [{{member, Name, Pid}, N}] ->
- lists:duplicate(N, Pid)
- end.
-
-member_groups(Pid) ->
- [Name || [Name] <- ets:match(pg2_fixed_table, {{pid, Pid, '$1'}})].
-
-all_groups() ->
- [N || [N] <- ets:match(pg2_fixed_table, {{group,'$1'}})].
-
-ensure_started() ->
- case whereis(?MODULE) of
- undefined ->
- C = {pg2_fixed, {?MODULE, start_link, []}, permanent,
- 1000, worker, [?MODULE]},
- supervisor:start_child(kernel_safe_sup, C);
- Pg2_FixedPid ->
- {ok, Pg2_FixedPid}
- end.
-
-
-kill_monitor_proc(RPid, Pid) ->
- case RPid of
- Pid -> ok;
- _ -> exit(RPid, kill)
- end.
-
-%% When/if erlang:monitor() returns before trying to connect to the
-%% other node this function can be removed.
-do_monitor(Pid) ->
- case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
- true ->
- %% Assume the node is still up
- {Pid, erlang:monitor(process, Pid)};
- false ->
- F = fun() ->
- Ref = erlang:monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, _Info} ->
- exit(normal)
- end
- end,
- erlang:spawn_monitor(F)
- end.
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index cc33c70b62..883b71cb77 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -105,5 +105,7 @@
{hash, sha512},
{iterations, 1000},
{passphrase, undefined}
- ]}
+ ]},
+ %% rabbitmq-server-973
+ {lazy_queue_explicit_gc_run_operation_threshold, 250}
]}]}.
diff --git a/src/rabbit_mirror_queue_mode_nodes.erl b/src/rabbit_mirror_queue_mode_nodes.erl
index e63f340373..31c55722a5 100644
--- a/src/rabbit_mirror_queue_mode_nodes.erl
+++ b/src/rabbit_mirror_queue_mode_nodes.erl
@@ -32,29 +32,37 @@
description() ->
[{description, <<"Mirror queue to specified nodes">>}].
-suggested_queue_nodes(Nodes0, MNode, _SNodes, SSNodes, Poss) ->
- Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+suggested_queue_nodes(PolicyNodes0, CurrentMaster, _SNodes, SSNodes, NodesRunningRabbitMQ) ->
+ PolicyNodes1 = [list_to_atom(binary_to_list(Node)) || Node <- PolicyNodes0],
%% If the current master is not in the nodes specified, then what we want
%% to do depends on whether there are any synchronised slaves. If there
%% are then we can just kill the current master - the admin has asked for
%% a migration and we should give it to them. If there are not however
%% then we must keep the master around so as not to lose messages.
- Nodes = case SSNodes of
- [] -> lists:usort([MNode | Nodes1]);
- _ -> Nodes1
- end,
- Unavailable = Nodes -- Poss,
- Available = Nodes -- Unavailable,
- case Available of
+
+ PolicyNodes = case SSNodes of
+ [] -> lists:usort([CurrentMaster | PolicyNodes1]);
+ _ -> PolicyNodes1
+ end,
+ Unavailable = PolicyNodes -- NodesRunningRabbitMQ,
+ AvailablePolicyNodes = PolicyNodes -- Unavailable,
+ case AvailablePolicyNodes of
[] -> %% We have never heard of anything? Not much we can do but
%% keep the master alive.
- {MNode, []};
- _ -> case lists:member(MNode, Available) of
- true -> {MNode, Available -- [MNode]};
+ {CurrentMaster, []};
+ _ -> case lists:member(CurrentMaster, AvailablePolicyNodes) of
+ true -> {CurrentMaster,
+ AvailablePolicyNodes -- [CurrentMaster]};
false -> %% Make sure the new master is synced! In order to
%% get here SSNodes must not be empty.
- [NewMNode | _] = SSNodes,
- {NewMNode, Available -- [NewMNode]}
+ SyncPolicyNodes = [Node ||
+ Node <- AvailablePolicyNodes,
+ lists:member(Node, SSNodes)],
+ NewMaster = case SyncPolicyNodes of
+ [Node | _] -> Node;
+ [] -> erlang:hd(SSNodes)
+ end,
+ {NewMaster, AvailablePolicyNodes -- [NewMaster]}
end
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 297df086ad..3ce889fba5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -306,7 +306,11 @@
io_batch_size,
%% default queue or lazy queue
- mode
+ mode,
+ %% number of reduce_memory_usage executions, once it
+ %% reaches a threshold the queue will manually trigger a runtime GC
+ %% see: maybe_execute_gc/1
+ memory_reduction_run_count
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -402,7 +406,8 @@
disk_write_count :: non_neg_integer(),
io_batch_size :: pos_integer(),
- mode :: 'default' | 'lazy' }.
+ mode :: 'default' | 'lazy',
+ memory_reduction_run_count :: non_neg_integer()}.
%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
@@ -427,6 +432,21 @@
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
+
+%% we define the garbage collector threshold
+%% it needs to tune the GC calls inside `reduce_memory_use`
+%% see: rabbitmq-server-973 and `maybe_execute_gc` function
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD,
+ case get(explicit_gc_run_operation_threshold) of
+ undefined ->
+ Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD),
+ put(explicit_gc_run_operation_threshold, Val),
+ Val;
+ Val -> Val
+ end).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -1330,7 +1350,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
io_batch_size = IoBatchSize,
- mode = default },
+ mode = default,
+ memory_reduction_run_count = 0},
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -2264,6 +2285,14 @@ ifold(Fun, Acc, Its, State) ->
%% Phase changes
%%----------------------------------------------------------------------------
+maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) ->
+ case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of
+ true -> garbage_collect(),
+ State#vqstate{memory_reduction_run_count = 0};
+ false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
+
+ end.
+
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
State;
reduce_memory_use(State = #vqstate {
@@ -2336,8 +2365,7 @@ reduce_memory_use(State = #vqstate {
S2 ->
push_betas_to_deltas(S2, State1)
end,
- garbage_collect(),
- State3.
+ maybe_execute_gc(State3).
limit_ram_acks(0, State) ->
{0, ui(State)};
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index bba7fad707..502e3a7e86 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -61,7 +61,8 @@ groups() ->
]},
{cluster_size_3, [], [
change_policy,
- rapid_change
+ rapid_change,
+ nodes_policy_should_pick_master_from_its_params
% FIXME: Re-enable those tests when the know issues are
% fixed.
%failing_random_policies,
@@ -258,6 +259,48 @@ promote_on_shutdown(Config) ->
durable = true}),
ok.
+nodes_policy_should_pick_master_from_its_params(Config) ->
+ [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+ [all])),
+ %% --> Master: A
+ %% Slaves: [B, C] or [C, B]
+ Info = find_queue(?QNAME, A),
+ SSPids = proplists:get_value(synchronised_slave_pids, Info),
+
+ %% Choose slave that isn't the first sync slave. Cover a bug that always
+ %% chose the first, even if it was not part of the policy
+ LastSlave = node(lists:last(SSPids)),
+ ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+ [{nodes, [LastSlave]}])),
+ %% --> Master: B or C (depends on the order of current slaves)
+ %% Slaves: []
+
+ %% Now choose a new master that isn't synchronised. The previous
+ %% policy made sure that the queue only runs on one node (the last
+ %% from the initial synchronised list). Thus, by taking the first
+ %% node from this list, we know it is not synchronised.
+ %%
+ %% Because the policy doesn't cover any synchronised slave, RabbitMQ
+ %% should instead use an existing synchronised slave as the new master,
+ %% even though that isn't in the policy.
+ ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+ [{nodes, [LastSlave, A]}])),
+ %% --> Master: B or C (same as previous policy)
+ %% Slaves: [A]
+
+ NewMaster = node(erlang:hd(SSPids)),
+ ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
+ [{nodes, [NewMaster]}])),
+ %% --> Master: B or C (the other one compared to previous policy)
+ %% Slaves: []
+
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY).
+
random_policy(Config) ->
run_proper(fun prop_random_policy/1, [Config]).
@@ -364,9 +407,8 @@ prop_random_policy(Config) ->
Policies, non_empty(list(policy_gen(Nodes))),
test_random_policy(Config, Nodes, Policies)).
-test_random_policy(Config, Nodes, Policies) ->
+apply_policy_to_declared_queue(Config, Ch, Nodes, Policies) ->
[NodeA | _] = Nodes,
- Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
%% Add some load so mirrors can be busy synchronising
rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000),
@@ -375,7 +417,12 @@ test_random_policy(Config, Nodes, Policies) ->
%% Give it some time to generate all internal notifications
timer:sleep(2000),
%% Check the result
- Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30),
+ wait_for_last_policy(?QNAME, NodeA, Policies, 30).
+
+test_random_policy(Config, Nodes, Policies) ->
+ [NodeA | _] = Nodes,
+ Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ Result = apply_policy_to_declared_queue(Config, Ch, Nodes, Policies),
%% Cleanup
amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
_ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY),
diff --git a/test/health_check_SUITE.erl b/test/health_check_SUITE.erl
index 44d42134ce..13373d79d4 100644
--- a/test/health_check_SUITE.erl
+++ b/test/health_check_SUITE.erl
@@ -22,6 +22,8 @@
,groups/0
,init_per_suite/1
,end_per_suite/1
+ ,init_per_group/2
+ ,end_per_group/2
,init_per_testcase/2
,end_per_testcase/2
]).
@@ -53,6 +55,10 @@ groups() ->
,ignores_stuck_remote_node_monitor
]}].
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
@@ -60,24 +66,30 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
-init_per_testcase(Testcase, Config0) ->
- rabbit_ct_helpers:testcase_started(Config0, Testcase),
- Config1 = rabbit_ct_helpers:set_config(
- Config0, [{rmq_nodes_count, 2},
- {rmq_nodes_clustered, true}]),
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ ClusterSize = 2,
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+ ]),
rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
-
-end_per_testcase(Testcase, Config0) ->
- Config1 = case rabbit_ct_helpers:get_config(Config0, save_config) of
- undefined -> Config0;
- C -> C
- end,
- Config2 = rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps()),
- rabbit_ct_helpers:testcase_finished(Config2, Testcase).
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%%----------------------------------------------------------------------------
%% Test cases