diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2016-10-04 16:59:16 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2016-10-04 16:59:16 +0300 |
| commit | f75bc0971ea24efc67ca42a9e3d81de32f6691ca (patch) | |
| tree | e8e385fa8c2ded90763fef8ab5a96beffd0f6d90 | |
| parent | 8b500cd2ef48787bff4208c6abfd2e95ca1e7531 (diff) | |
| parent | a980106a85afd40b9f773feba1d2a0ef626655ef (diff) | |
| download | rabbitmq-server-git-f75bc0971ea24efc67ca42a9e3d81de32f6691ca.tar.gz | |
Merge branch 'master' into rabbitmq-server-486
Conflicts:
src/rabbit.app.src
| -rw-r--r-- | src/pg2_fixed.erl | 399 | ||||
| -rw-r--r-- | src/rabbit.app.src | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 | ||||
| -rw-r--r-- | test/health_check_SUITE.erl | 48 | ||||
| -rw-r--r-- | test/partitions_SUITE.erl | 16 |
6 files changed, 87 insertions, 431 deletions
diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl deleted file mode 100644 index 73c05819d4..0000000000 --- a/src/pg2_fixed.erl +++ /dev/null @@ -1,399 +0,0 @@ -%% This is the version of pg2 from R14B02, which contains the fix -%% described at -%% http://erlang.2086793.n4.nabble.com/pg2-still-busted-in-R13B04-td2230601.html. -%% The changes are a search-and-replace to rename the module and avoid -%% clashes with other versions of pg2, and also a simple rewrite of -%% "andalso" and "orelse" expressions to case statements where the second -%% operand is not a boolean since R12B does not allow this. - -%% -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1997-2010. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% %CopyrightEnd% -%% --module(pg2_fixed). - --export([create/1, delete/1, join/2, leave/2]). --export([get_members/1, get_local_members/1]). --export([get_closest_pid/1, which_groups/0]). --export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, - terminate/2]). - -%%% As of R13B03 monitors are used instead of links. - -%%% -%%% Exported functions -%%% - --spec start_link() -> {'ok', pid()} | {'error', term()}. - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - --spec start() -> {'ok', pid()} | {'error', term()}. - -start() -> - ensure_started(). - --spec create(term()) -> 'ok'. - -create(Name) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - false -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, {create, Name}) - end), - ok; - true -> - ok - end. - --type name() :: term(). - --spec delete(name()) -> 'ok'. - -delete(Name) -> - _ = ensure_started(), - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, {delete, Name}) - end), - ok. - --spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}. - -join(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - false -> - {error, {no_such_group, Name}}; - true -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, - {join, Name, Pid}) - end), - ok - end. - --spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}. - -leave(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - false -> - {error, {no_such_group, Name}}; - true -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, - {leave, Name, Pid}) - end), - ok - end. - --type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}. - --spec get_members(name()) -> get_members_ret(). - -get_members(Name) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - true -> - group_members(Name); - false -> - {error, {no_such_group, Name}} - end. - --spec get_local_members(name()) -> get_members_ret(). - -get_local_members(Name) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - true -> - local_group_members(Name); - false -> - {error, {no_such_group, Name}} - end. - --spec which_groups() -> [name()]. - -which_groups() -> - _ = ensure_started(), - all_groups(). - --type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}. - --spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}. - -get_closest_pid(Name) -> - case get_local_members(Name) of - [Pid] -> - Pid; - [] -> - case get_members(Name) of - [] -> {error, {no_process, Name}}; - Members -> - X = erlang:system_time(micro_seconds), - lists:nth((X rem length(Members))+1, Members) - end; - Members when is_list(Members) -> - X = erlang:system_time(micro_seconds), - lists:nth((X rem length(Members))+1, Members); - Else -> - Else - end. - -%%% -%%% Callback functions from gen_server -%%% - --record(state, {}). - --spec init([]) -> {'ok', #state{}}. - -init([]) -> - Ns = nodes(), - _ = net_kernel:monitor_nodes(true), - lists:foreach(fun(N) -> - {?MODULE, N} ! {new_pg2_fixed, node()}, - self() ! {nodeup, N} - end, Ns), - pg2_fixed_table = ets:new(pg2_fixed_table, [ordered_set, protected, named_table]), - {ok, #state{}}. - --type call() :: {'create', name()} - | {'delete', name()} - | {'join', name(), pid()} - | {'leave', name(), pid()}. - --spec handle_call(call(), _, #state{}) -> - {'reply', 'ok', #state{}}. - -handle_call({create, Name}, _From, S) -> - assure_group(Name), - {reply, ok, S}; -handle_call({join, Name, Pid}, _From, S) -> - case ets:member(pg2_fixed_table, {group, Name}) of - true -> _ = join_group(Name, Pid), - ok; - _ -> ok - end, - {reply, ok, S}; -handle_call({leave, Name, Pid}, _From, S) -> - case ets:member(pg2_fixed_table, {group, Name}) of - true -> leave_group(Name, Pid); - _ -> ok - end, - {reply, ok, S}; -handle_call({delete, Name}, _From, S) -> - delete_group(Name), - {reply, ok, S}; -handle_call(Request, From, S) -> - error_logger:warning_msg("The pg2_fixed server received an unexpected message:\n" - "handle_call(~p, ~p, _)\n", - [Request, From]), - {noreply, S}. - --type all_members() :: [[name(),...]]. --type cast() :: {'exchange', node(), all_members()} - | {'del_member', name(), pid()}. - --spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}. - -handle_cast({exchange, _Node, List}, S) -> - store(List), - {noreply, S}; -handle_cast(_, S) -> - %% Ignore {del_member, Name, Pid}. - {noreply, S}. - --spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}. - -handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> - member_died(MonitorRef), - {noreply, S}; -handle_info({nodeup, Node}, S) -> - gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), - {noreply, S}; -handle_info({new_pg2_fixed, Node}, S) -> - gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), - {noreply, S}; -handle_info(_, S) -> - {noreply, S}. - --spec terminate(term(), #state{}) -> 'ok'. - -terminate(_Reason, _S) -> - true = ets:delete(pg2_fixed_table), - ok. - -%%% -%%% Local functions -%%% - -%%% One ETS table, pg2_fixed_table, is used for bookkeeping. The type of the -%%% table is ordered_set, and the fast matching of partially -%%% instantiated keys is used extensively. -%%% -%%% {{group, Name}} -%%% Process group Name. -%%% {{ref, Pid}, RPid, MonitorRef, Counter} -%%% {{ref, MonitorRef}, Pid} -%%% Each process has one monitor. Sometimes a process is spawned to -%%% monitor the pid (RPid). Counter is incremented when the Pid joins -%%% some group. -%%% {{member, Name, Pid}, GroupCounter} -%%% {{local_member, Name, Pid}} -%%% Pid is a member of group Name, GroupCounter is incremented when the -%%% Pid joins the group Name. -%%% {{pid, Pid, Name}} -%%% Pid is a member of group Name. - -store(List) -> - _ = [assure_group(Name) - andalso - [join_group(Name, P) || P <- Members -- group_members(Name)] || - [Name, Members] <- List], - ok. - -assure_group(Name) -> - Key = {group, Name}, - ets:member(pg2_fixed_table, Key) orelse true =:= ets:insert(pg2_fixed_table, {Key}). - -delete_group(Name) -> - _ = [leave_group(Name, Pid) || Pid <- group_members(Name)], - true = ets:delete(pg2_fixed_table, {group, Name}), - ok. - -member_died(Ref) -> - [{{ref, Ref}, Pid}] = ets:lookup(pg2_fixed_table, {ref, Ref}), - Names = member_groups(Pid), - _ = [leave_group(Name, P) || - Name <- Names, - P <- member_in_group(Pid, Name)], - %% Kept for backward compatibility with links. Can be removed, eventually. - _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) || - Name <- Names], - ok. - -join_group(Name, Pid) -> - Ref_Pid = {ref, Pid}, - try _ = ets:update_counter(pg2_fixed_table, Ref_Pid, {4, +1}) - catch _:_ -> - {RPid, Ref} = do_monitor(Pid), - true = ets:insert(pg2_fixed_table, {Ref_Pid, RPid, Ref, 1}), - true = ets:insert(pg2_fixed_table, {{ref, Ref}, Pid}) - end, - Member_Name_Pid = {member, Name, Pid}, - try _ = ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, +1, 1, 1}) - catch _:_ -> - true = ets:insert(pg2_fixed_table, {Member_Name_Pid, 1}), - _ = [ets:insert(pg2_fixed_table, {{local_member, Name, Pid}}) || - node(Pid) =:= node()], - true = ets:insert(pg2_fixed_table, {{pid, Pid, Name}}) - end. - -leave_group(Name, Pid) -> - Member_Name_Pid = {member, Name, Pid}, - try ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, -1, 0, 0}) of - N -> - if - N =:= 0 -> - true = ets:delete(pg2_fixed_table, {pid, Pid, Name}), - _ = [ets:delete(pg2_fixed_table, {local_member, Name, Pid}) || - node(Pid) =:= node()], - true = ets:delete(pg2_fixed_table, Member_Name_Pid); - true -> - ok - end, - Ref_Pid = {ref, Pid}, - case ets:update_counter(pg2_fixed_table, Ref_Pid, {4, -1}) of - 0 -> - [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_fixed_table, Ref_Pid), - true = ets:delete(pg2_fixed_table, {ref, Ref}), - true = ets:delete(pg2_fixed_table, Ref_Pid), - true = erlang:demonitor(Ref, [flush]), - kill_monitor_proc(RPid, Pid); - _ -> - ok - end - catch _:_ -> - ok - end. - -all_members() -> - [[G, group_members(G)] || G <- all_groups()]. - -group_members(Name) -> - [P || - [P, N] <- ets:match(pg2_fixed_table, {{member, Name, '$1'},'$2'}), - _ <- lists:seq(1, N)]. - -local_group_members(Name) -> - [P || - [Pid] <- ets:match(pg2_fixed_table, {{local_member, Name, '$1'}}), - P <- member_in_group(Pid, Name)]. - -member_in_group(Pid, Name) -> - case ets:lookup(pg2_fixed_table, {member, Name, Pid}) of - [] -> []; - [{{member, Name, Pid}, N}] -> - lists:duplicate(N, Pid) - end. - -member_groups(Pid) -> - [Name || [Name] <- ets:match(pg2_fixed_table, {{pid, Pid, '$1'}})]. - -all_groups() -> - [N || [N] <- ets:match(pg2_fixed_table, {{group,'$1'}})]. - -ensure_started() -> - case whereis(?MODULE) of - undefined -> - C = {pg2_fixed, {?MODULE, start_link, []}, permanent, - 1000, worker, [?MODULE]}, - supervisor:start_child(kernel_safe_sup, C); - Pg2_FixedPid -> - {ok, Pg2_FixedPid} - end. - - -kill_monitor_proc(RPid, Pid) -> - case RPid of - Pid -> ok; - _ -> exit(RPid, kill) - end. - -%% When/if erlang:monitor() returns before trying to connect to the -%% other node this function can be removed. -do_monitor(Pid) -> - case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of - true -> - %% Assume the node is still up - {Pid, erlang:monitor(process, Pid)}; - false -> - F = fun() -> - Ref = erlang:monitor(process, Pid), - receive - {'DOWN', Ref, process, Pid, _Info} -> - exit(normal) - end - end, - erlang:spawn_monitor(F) - end. diff --git a/src/rabbit.app.src b/src/rabbit.app.src index 1a894c3620..87401662c8 100644 --- a/src/rabbit.app.src +++ b/src/rabbit.app.src @@ -106,5 +106,8 @@ %% see rabbitmq-server#486 {peer_discovery_backend, rabbit_peer_discovery_classic_config}, %% used by rabbit_peer_discovery_classic_config - {cluster_nodes, {[], disc}} + {cluster_nodes, {[], disc}}, + + %% rabbitmq-server-973 + {lazy_queue_explicit_gc_run_operation_threshold, 250} ]}]}. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index ad1a25a425..d6357e0dc0 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -248,9 +248,15 @@ syncer_loop(Ref, MPid, SPids) -> syncer_loop(Ref, MPid, SPids); {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), - broadcast(SPids1, {sync_msgs, Ref, Msgs}), - MPid ! {next, Ref}, - syncer_loop(Ref, MPid, SPids1); + case SPids1 of + [] -> + % Die silently because there are no slaves left. + ok; + _ -> + broadcast(SPids1, {sync_msgs, Ref, Msgs}), + MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids1) + end; {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do %% they interpret that as a failure, which is what we diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3267b02bea..03381be311 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -306,7 +306,11 @@ io_batch_size, %% default queue or lazy queue - mode + mode, + %% number of reduce_memory_usage executions, once it + %% reaches a threshold the queue will manually trigger a runtime GC + %% see: maybe_execute_gc/1 + memory_reduction_run_count }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -402,7 +406,8 @@ disk_write_count :: non_neg_integer(), io_batch_size :: pos_integer(), - mode :: 'default' | 'lazy' }. + mode :: 'default' | 'lazy', + memory_reduction_run_count :: non_neg_integer()}. %% Duplicated from rabbit_backing_queue -spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. @@ -427,6 +432,21 @@ %% rabbit_amqqueue_process need fairly fresh rates. -define(MSGS_PER_RATE_CALC, 100). + +%% we define the garbage collector threshold +%% it needs to tune the GC calls inside `reduce_memory_use` +%% see: rabbitmq-server-973 and `maybe_execute_gc` function +-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250). +-define(EXPLICIT_GC_RUN_OP_THRESHOLD, + case get(explicit_gc_run_operation_threshold) of + undefined -> + Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold, + ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD), + put(explicit_gc_run_operation_threshold, Val), + Val; + Val -> Val + end). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -1330,7 +1350,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, io_batch_size = IoBatchSize, - mode = default }, + mode = default, + memory_reduction_run_count = 0}, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -2264,6 +2285,14 @@ ifold(Fun, Acc, Its, State) -> %% Phase changes %%---------------------------------------------------------------------------- +maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) -> + case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of + true -> garbage_collect(), + State#vqstate{memory_reduction_run_count = 0}; + false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1} + + end. + reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> State; reduce_memory_use(State = #vqstate { @@ -2336,8 +2365,7 @@ reduce_memory_use(State = #vqstate { S2 -> push_betas_to_deltas(S2, State1) end, - garbage_collect(), - State3. + maybe_execute_gc(State3). limit_ram_acks(0, State) -> {0, ui(State)}; diff --git a/test/health_check_SUITE.erl b/test/health_check_SUITE.erl index 50abc97a02..13373d79d4 100644 --- a/test/health_check_SUITE.erl +++ b/test/health_check_SUITE.erl @@ -22,6 +22,8 @@ ,groups/0 ,init_per_suite/1 ,end_per_suite/1 + ,init_per_group/2 + ,end_per_group/2 ,init_per_testcase/2 ,end_per_testcase/2 ]). @@ -53,6 +55,10 @@ groups() -> ,ignores_stuck_remote_node_monitor ]}]. +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), rabbit_ct_helpers:run_setup_steps(Config). @@ -60,24 +66,30 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_testcase(Testcase, Config0) -> - rabbit_ct_helpers:testcase_started(Config0, Testcase), - Config1 = rabbit_ct_helpers:set_config( - Config0, [{rmq_nodes_count, 2}, - {rmq_nodes_clustered, true}]), +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = 2, + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}} + ]), rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). -end_per_testcase(Testcase, Config0) -> - Config1 = case rabbit_ct_helpers:get_config(Config0, save_config) of - undefined -> Config0; - C -> C - end, - Config2 = rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()), - rabbit_ct_helpers:testcase_finished(Config2, Testcase). +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). %%---------------------------------------------------------------------------- %% Test cases @@ -148,8 +160,8 @@ honors_timeout_argument(Config) -> case timer:tc(rabbit_ct_broker_helpers, rabbitmqctl, [Config, A, ["-t", "5", "node_health_check"]]) of {TimeSpent, {error, 75, _}} -> - if TimeSpent < 5000000 -> exit({too_fast, TimeSpent}); - TimeSpent > 7000000 -> exit({too_slow, TimeSpent}); %% +2 seconds for rabbitmqctl overhead + if TimeSpent < 5000000 -> exit({too_fast, TimeSpent}); + TimeSpent > 10000000 -> exit({too_slow, TimeSpent}); %% +5 seconds for rabbitmqctl overhead true -> ok end; {_, Unexpected} -> diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl index aa1c1df24f..e00c015d02 100644 --- a/test/partitions_SUITE.erl +++ b/test/partitions_SUITE.erl @@ -29,6 +29,10 @@ %% passes... -define(DELAY, 8000). +%% We wait for 5 minutes for nodes to be running/blocked. +%% It's a lot, but still better than timetrap_timeout +-define(AWAIT_TIMEOUT, 300000). + all() -> [ {group, net_ticktime_1}, @@ -415,15 +419,17 @@ block(X, Y) -> allow(X, Y) -> rabbit_ct_broker_helpers:allow_traffic_between(X, Y). -await_running (Node, Bool) -> await(Node, Bool, fun is_running/1). -await_listening (Node, Bool) -> await(Node, Bool, fun is_listening/1). -await_partitions(Node, Parts) -> await(Node, Parts, fun partitions/1). +await_running (Node, Bool) -> await(Node, Bool, fun is_running/1, ?AWAIT_TIMEOUT). +await_listening (Node, Bool) -> await(Node, Bool, fun is_listening/1, ?AWAIT_TIMEOUT). +await_partitions(Node, Parts) -> await(Node, Parts, fun partitions/1, ?AWAIT_TIMEOUT). -await(Node, Res, Fun) -> +await(Node, Res, Fun, Timeout) when Timeout =< 0 -> + error({await_timeout, Node, Res, Fun}); +await(Node, Res, Fun, Timeout) -> case Fun(Node) of Res -> ok; _ -> timer:sleep(100), - await(Node, Res, Fun) + await(Node, Res, Fun, Timeout - 100) end. is_running(Node) -> rpc:call(Node, rabbit, is_running, []). |
