summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-10-04 16:59:16 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-10-04 16:59:16 +0300
commitf75bc0971ea24efc67ca42a9e3d81de32f6691ca (patch)
treee8e385fa8c2ded90763fef8ab5a96beffd0f6d90
parent8b500cd2ef48787bff4208c6abfd2e95ca1e7531 (diff)
parenta980106a85afd40b9f773feba1d2a0ef626655ef (diff)
downloadrabbitmq-server-git-f75bc0971ea24efc67ca42a9e3d81de32f6691ca.tar.gz
Merge branch 'master' into rabbitmq-server-486
Conflicts: src/rabbit.app.src
-rw-r--r--src/pg2_fixed.erl399
-rw-r--r--src/rabbit.app.src5
-rw-r--r--src/rabbit_mirror_queue_sync.erl12
-rw-r--r--src/rabbit_variable_queue.erl38
-rw-r--r--test/health_check_SUITE.erl48
-rw-r--r--test/partitions_SUITE.erl16
6 files changed, 87 insertions, 431 deletions
diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl
deleted file mode 100644
index 73c05819d4..0000000000
--- a/src/pg2_fixed.erl
+++ /dev/null
@@ -1,399 +0,0 @@
-%% This is the version of pg2 from R14B02, which contains the fix
-%% described at
-%% http://erlang.2086793.n4.nabble.com/pg2-still-busted-in-R13B04-td2230601.html.
-%% The changes are a search-and-replace to rename the module and avoid
-%% clashes with other versions of pg2, and also a simple rewrite of
-%% "andalso" and "orelse" expressions to case statements where the second
-%% operand is not a boolean since R12B does not allow this.
-
-%%
-%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 1997-2010. All Rights Reserved.
-%%
-%% The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved online at http://www.erlang.org/.
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% %CopyrightEnd%
-%%
--module(pg2_fixed).
-
--export([create/1, delete/1, join/2, leave/2]).
--export([get_members/1, get_local_members/1]).
--export([get_closest_pid/1, which_groups/0]).
--export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
- terminate/2]).
-
-%%% As of R13B03 monitors are used instead of links.
-
-%%%
-%%% Exported functions
-%%%
-
--spec start_link() -> {'ok', pid()} | {'error', term()}.
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
--spec start() -> {'ok', pid()} | {'error', term()}.
-
-start() ->
- ensure_started().
-
--spec create(term()) -> 'ok'.
-
-create(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {create, Name})
- end),
- ok;
- true ->
- ok
- end.
-
--type name() :: term().
-
--spec delete(name()) -> 'ok'.
-
-delete(Name) ->
- _ = ensure_started(),
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {delete, Name})
- end),
- ok.
-
--spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}.
-
-join(Name, Pid) when is_pid(Pid) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {join, Name, Pid})
- end),
- ok
- end.
-
--spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}.
-
-leave(Name, Pid) when is_pid(Pid) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {leave, Name, Pid})
- end),
- ok
- end.
-
--type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}.
-
--spec get_members(name()) -> get_members_ret().
-
-get_members(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- true ->
- group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec get_local_members(name()) -> get_members_ret().
-
-get_local_members(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- true ->
- local_group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec which_groups() -> [name()].
-
-which_groups() ->
- _ = ensure_started(),
- all_groups().
-
--type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}.
-
--spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}.
-
-get_closest_pid(Name) ->
- case get_local_members(Name) of
- [Pid] ->
- Pid;
- [] ->
- case get_members(Name) of
- [] -> {error, {no_process, Name}};
- Members ->
- X = erlang:system_time(micro_seconds),
- lists:nth((X rem length(Members))+1, Members)
- end;
- Members when is_list(Members) ->
- X = erlang:system_time(micro_seconds),
- lists:nth((X rem length(Members))+1, Members);
- Else ->
- Else
- end.
-
-%%%
-%%% Callback functions from gen_server
-%%%
-
--record(state, {}).
-
--spec init([]) -> {'ok', #state{}}.
-
-init([]) ->
- Ns = nodes(),
- _ = net_kernel:monitor_nodes(true),
- lists:foreach(fun(N) ->
- {?MODULE, N} ! {new_pg2_fixed, node()},
- self() ! {nodeup, N}
- end, Ns),
- pg2_fixed_table = ets:new(pg2_fixed_table, [ordered_set, protected, named_table]),
- {ok, #state{}}.
-
--type call() :: {'create', name()}
- | {'delete', name()}
- | {'join', name(), pid()}
- | {'leave', name(), pid()}.
-
--spec handle_call(call(), _, #state{}) ->
- {'reply', 'ok', #state{}}.
-
-handle_call({create, Name}, _From, S) ->
- assure_group(Name),
- {reply, ok, S};
-handle_call({join, Name, Pid}, _From, S) ->
- case ets:member(pg2_fixed_table, {group, Name}) of
- true -> _ = join_group(Name, Pid),
- ok;
- _ -> ok
- end,
- {reply, ok, S};
-handle_call({leave, Name, Pid}, _From, S) ->
- case ets:member(pg2_fixed_table, {group, Name}) of
- true -> leave_group(Name, Pid);
- _ -> ok
- end,
- {reply, ok, S};
-handle_call({delete, Name}, _From, S) ->
- delete_group(Name),
- {reply, ok, S};
-handle_call(Request, From, S) ->
- error_logger:warning_msg("The pg2_fixed server received an unexpected message:\n"
- "handle_call(~p, ~p, _)\n",
- [Request, From]),
- {noreply, S}.
-
--type all_members() :: [[name(),...]].
--type cast() :: {'exchange', node(), all_members()}
- | {'del_member', name(), pid()}.
-
--spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}.
-
-handle_cast({exchange, _Node, List}, S) ->
- store(List),
- {noreply, S};
-handle_cast(_, S) ->
- %% Ignore {del_member, Name, Pid}.
- {noreply, S}.
-
--spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}.
-
-handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
- member_died(MonitorRef),
- {noreply, S};
-handle_info({nodeup, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
-handle_info({new_pg2_fixed, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
-handle_info(_, S) ->
- {noreply, S}.
-
--spec terminate(term(), #state{}) -> 'ok'.
-
-terminate(_Reason, _S) ->
- true = ets:delete(pg2_fixed_table),
- ok.
-
-%%%
-%%% Local functions
-%%%
-
-%%% One ETS table, pg2_fixed_table, is used for bookkeeping. The type of the
-%%% table is ordered_set, and the fast matching of partially
-%%% instantiated keys is used extensively.
-%%%
-%%% {{group, Name}}
-%%% Process group Name.
-%%% {{ref, Pid}, RPid, MonitorRef, Counter}
-%%% {{ref, MonitorRef}, Pid}
-%%% Each process has one monitor. Sometimes a process is spawned to
-%%% monitor the pid (RPid). Counter is incremented when the Pid joins
-%%% some group.
-%%% {{member, Name, Pid}, GroupCounter}
-%%% {{local_member, Name, Pid}}
-%%% Pid is a member of group Name, GroupCounter is incremented when the
-%%% Pid joins the group Name.
-%%% {{pid, Pid, Name}}
-%%% Pid is a member of group Name.
-
-store(List) ->
- _ = [assure_group(Name)
- andalso
- [join_group(Name, P) || P <- Members -- group_members(Name)] ||
- [Name, Members] <- List],
- ok.
-
-assure_group(Name) ->
- Key = {group, Name},
- ets:member(pg2_fixed_table, Key) orelse true =:= ets:insert(pg2_fixed_table, {Key}).
-
-delete_group(Name) ->
- _ = [leave_group(Name, Pid) || Pid <- group_members(Name)],
- true = ets:delete(pg2_fixed_table, {group, Name}),
- ok.
-
-member_died(Ref) ->
- [{{ref, Ref}, Pid}] = ets:lookup(pg2_fixed_table, {ref, Ref}),
- Names = member_groups(Pid),
- _ = [leave_group(Name, P) ||
- Name <- Names,
- P <- member_in_group(Pid, Name)],
- %% Kept for backward compatibility with links. Can be removed, eventually.
- _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) ||
- Name <- Names],
- ok.
-
-join_group(Name, Pid) ->
- Ref_Pid = {ref, Pid},
- try _ = ets:update_counter(pg2_fixed_table, Ref_Pid, {4, +1})
- catch _:_ ->
- {RPid, Ref} = do_monitor(Pid),
- true = ets:insert(pg2_fixed_table, {Ref_Pid, RPid, Ref, 1}),
- true = ets:insert(pg2_fixed_table, {{ref, Ref}, Pid})
- end,
- Member_Name_Pid = {member, Name, Pid},
- try _ = ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, +1, 1, 1})
- catch _:_ ->
- true = ets:insert(pg2_fixed_table, {Member_Name_Pid, 1}),
- _ = [ets:insert(pg2_fixed_table, {{local_member, Name, Pid}}) ||
- node(Pid) =:= node()],
- true = ets:insert(pg2_fixed_table, {{pid, Pid, Name}})
- end.
-
-leave_group(Name, Pid) ->
- Member_Name_Pid = {member, Name, Pid},
- try ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, -1, 0, 0}) of
- N ->
- if
- N =:= 0 ->
- true = ets:delete(pg2_fixed_table, {pid, Pid, Name}),
- _ = [ets:delete(pg2_fixed_table, {local_member, Name, Pid}) ||
- node(Pid) =:= node()],
- true = ets:delete(pg2_fixed_table, Member_Name_Pid);
- true ->
- ok
- end,
- Ref_Pid = {ref, Pid},
- case ets:update_counter(pg2_fixed_table, Ref_Pid, {4, -1}) of
- 0 ->
- [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_fixed_table, Ref_Pid),
- true = ets:delete(pg2_fixed_table, {ref, Ref}),
- true = ets:delete(pg2_fixed_table, Ref_Pid),
- true = erlang:demonitor(Ref, [flush]),
- kill_monitor_proc(RPid, Pid);
- _ ->
- ok
- end
- catch _:_ ->
- ok
- end.
-
-all_members() ->
- [[G, group_members(G)] || G <- all_groups()].
-
-group_members(Name) ->
- [P ||
- [P, N] <- ets:match(pg2_fixed_table, {{member, Name, '$1'},'$2'}),
- _ <- lists:seq(1, N)].
-
-local_group_members(Name) ->
- [P ||
- [Pid] <- ets:match(pg2_fixed_table, {{local_member, Name, '$1'}}),
- P <- member_in_group(Pid, Name)].
-
-member_in_group(Pid, Name) ->
- case ets:lookup(pg2_fixed_table, {member, Name, Pid}) of
- [] -> [];
- [{{member, Name, Pid}, N}] ->
- lists:duplicate(N, Pid)
- end.
-
-member_groups(Pid) ->
- [Name || [Name] <- ets:match(pg2_fixed_table, {{pid, Pid, '$1'}})].
-
-all_groups() ->
- [N || [N] <- ets:match(pg2_fixed_table, {{group,'$1'}})].
-
-ensure_started() ->
- case whereis(?MODULE) of
- undefined ->
- C = {pg2_fixed, {?MODULE, start_link, []}, permanent,
- 1000, worker, [?MODULE]},
- supervisor:start_child(kernel_safe_sup, C);
- Pg2_FixedPid ->
- {ok, Pg2_FixedPid}
- end.
-
-
-kill_monitor_proc(RPid, Pid) ->
- case RPid of
- Pid -> ok;
- _ -> exit(RPid, kill)
- end.
-
-%% When/if erlang:monitor() returns before trying to connect to the
-%% other node this function can be removed.
-do_monitor(Pid) ->
- case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
- true ->
- %% Assume the node is still up
- {Pid, erlang:monitor(process, Pid)};
- false ->
- F = fun() ->
- Ref = erlang:monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, _Info} ->
- exit(normal)
- end
- end,
- erlang:spawn_monitor(F)
- end.
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index 1a894c3620..87401662c8 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -106,5 +106,8 @@
%% see rabbitmq-server#486
{peer_discovery_backend, rabbit_peer_discovery_classic_config},
%% used by rabbit_peer_discovery_classic_config
- {cluster_nodes, {[], disc}}
+ {cluster_nodes, {[], disc}},
+
+ %% rabbitmq-server-973
+ {lazy_queue_explicit_gc_run_operation_threshold, 250}
]}]}.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index ad1a25a425..d6357e0dc0 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -248,9 +248,15 @@ syncer_loop(Ref, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
- broadcast(SPids1, {sync_msgs, Ref, Msgs}),
- MPid ! {next, Ref},
- syncer_loop(Ref, MPid, SPids1);
+ case SPids1 of
+ [] ->
+ % Die silently because there are no slaves left.
+ ok;
+ _ ->
+ broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end;
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
%% they interpret that as a failure, which is what we
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3267b02bea..03381be311 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -306,7 +306,11 @@
io_batch_size,
%% default queue or lazy queue
- mode
+ mode,
+ %% number of reduce_memory_usage executions, once it
+ %% reaches a threshold the queue will manually trigger a runtime GC
+ %% see: maybe_execute_gc/1
+ memory_reduction_run_count
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -402,7 +406,8 @@
disk_write_count :: non_neg_integer(),
io_batch_size :: pos_integer(),
- mode :: 'default' | 'lazy' }.
+ mode :: 'default' | 'lazy',
+ memory_reduction_run_count :: non_neg_integer()}.
%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
@@ -427,6 +432,21 @@
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
+
+%% we define the garbage collector threshold
+%% it needs to tune the GC calls inside `reduce_memory_use`
+%% see: rabbitmq-server-973 and `maybe_execute_gc` function
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD,
+ case get(explicit_gc_run_operation_threshold) of
+ undefined ->
+ Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD),
+ put(explicit_gc_run_operation_threshold, Val),
+ Val;
+ Val -> Val
+ end).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -1330,7 +1350,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
io_batch_size = IoBatchSize,
- mode = default },
+ mode = default,
+ memory_reduction_run_count = 0},
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -2264,6 +2285,14 @@ ifold(Fun, Acc, Its, State) ->
%% Phase changes
%%----------------------------------------------------------------------------
+maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) ->
+ case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of
+ true -> garbage_collect(),
+ State#vqstate{memory_reduction_run_count = 0};
+ false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
+
+ end.
+
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
State;
reduce_memory_use(State = #vqstate {
@@ -2336,8 +2365,7 @@ reduce_memory_use(State = #vqstate {
S2 ->
push_betas_to_deltas(S2, State1)
end,
- garbage_collect(),
- State3.
+ maybe_execute_gc(State3).
limit_ram_acks(0, State) ->
{0, ui(State)};
diff --git a/test/health_check_SUITE.erl b/test/health_check_SUITE.erl
index 50abc97a02..13373d79d4 100644
--- a/test/health_check_SUITE.erl
+++ b/test/health_check_SUITE.erl
@@ -22,6 +22,8 @@
,groups/0
,init_per_suite/1
,end_per_suite/1
+ ,init_per_group/2
+ ,end_per_group/2
,init_per_testcase/2
,end_per_testcase/2
]).
@@ -53,6 +55,10 @@ groups() ->
,ignores_stuck_remote_node_monitor
]}].
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
@@ -60,24 +66,30 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
-init_per_testcase(Testcase, Config0) ->
- rabbit_ct_helpers:testcase_started(Config0, Testcase),
- Config1 = rabbit_ct_helpers:set_config(
- Config0, [{rmq_nodes_count, 2},
- {rmq_nodes_clustered, true}]),
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ ClusterSize = 2,
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+ ]),
rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
-end_per_testcase(Testcase, Config0) ->
- Config1 = case rabbit_ct_helpers:get_config(Config0, save_config) of
- undefined -> Config0;
- C -> C
- end,
- Config2 = rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps()),
- rabbit_ct_helpers:testcase_finished(Config2, Testcase).
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%%----------------------------------------------------------------------------
%% Test cases
@@ -148,8 +160,8 @@ honors_timeout_argument(Config) ->
case timer:tc(rabbit_ct_broker_helpers, rabbitmqctl, [Config, A, ["-t", "5", "node_health_check"]]) of
{TimeSpent, {error, 75, _}} ->
- if TimeSpent < 5000000 -> exit({too_fast, TimeSpent});
- TimeSpent > 7000000 -> exit({too_slow, TimeSpent}); %% +2 seconds for rabbitmqctl overhead
+ if TimeSpent < 5000000 -> exit({too_fast, TimeSpent});
+ TimeSpent > 10000000 -> exit({too_slow, TimeSpent}); %% +5 seconds for rabbitmqctl overhead
true -> ok
end;
{_, Unexpected} ->
diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl
index aa1c1df24f..e00c015d02 100644
--- a/test/partitions_SUITE.erl
+++ b/test/partitions_SUITE.erl
@@ -29,6 +29,10 @@
%% passes...
-define(DELAY, 8000).
+%% We wait for 5 minutes for nodes to be running/blocked.
+%% It's a lot, but still better than timetrap_timeout
+-define(AWAIT_TIMEOUT, 300000).
+
all() ->
[
{group, net_ticktime_1},
@@ -415,15 +419,17 @@ block(X, Y) ->
allow(X, Y) ->
rabbit_ct_broker_helpers:allow_traffic_between(X, Y).
-await_running (Node, Bool) -> await(Node, Bool, fun is_running/1).
-await_listening (Node, Bool) -> await(Node, Bool, fun is_listening/1).
-await_partitions(Node, Parts) -> await(Node, Parts, fun partitions/1).
+await_running (Node, Bool) -> await(Node, Bool, fun is_running/1, ?AWAIT_TIMEOUT).
+await_listening (Node, Bool) -> await(Node, Bool, fun is_listening/1, ?AWAIT_TIMEOUT).
+await_partitions(Node, Parts) -> await(Node, Parts, fun partitions/1, ?AWAIT_TIMEOUT).
-await(Node, Res, Fun) ->
+await(Node, Res, Fun, Timeout) when Timeout =< 0 ->
+ error({await_timeout, Node, Res, Fun});
+await(Node, Res, Fun, Timeout) ->
case Fun(Node) of
Res -> ok;
_ -> timer:sleep(100),
- await(Node, Res, Fun)
+ await(Node, Res, Fun, Timeout - 100)
end.
is_running(Node) -> rpc:call(Node, rabbit, is_running, []).