summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-10-04 16:59:16 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-10-04 16:59:16 +0300
commitf75bc0971ea24efc67ca42a9e3d81de32f6691ca (patch)
treee8e385fa8c2ded90763fef8ab5a96beffd0f6d90 /src
parent8b500cd2ef48787bff4208c6abfd2e95ca1e7531 (diff)
parenta980106a85afd40b9f773feba1d2a0ef626655ef (diff)
downloadrabbitmq-server-git-f75bc0971ea24efc67ca42a9e3d81de32f6691ca.tar.gz
Merge branch 'master' into rabbitmq-server-486
Conflicts: src/rabbit.app.src
Diffstat (limited to 'src')
-rw-r--r--src/pg2_fixed.erl399
-rw-r--r--src/rabbit.app.src5
-rw-r--r--src/rabbit_mirror_queue_sync.erl12
-rw-r--r--src/rabbit_variable_queue.erl38
4 files changed, 46 insertions, 408 deletions
diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl
deleted file mode 100644
index 73c05819d4..0000000000
--- a/src/pg2_fixed.erl
+++ /dev/null
@@ -1,399 +0,0 @@
-%% This is the version of pg2 from R14B02, which contains the fix
-%% described at
-%% http://erlang.2086793.n4.nabble.com/pg2-still-busted-in-R13B04-td2230601.html.
-%% The changes are a search-and-replace to rename the module and avoid
-%% clashes with other versions of pg2, and also a simple rewrite of
-%% "andalso" and "orelse" expressions to case statements where the second
-%% operand is not a boolean since R12B does not allow this.
-
-%%
-%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 1997-2010. All Rights Reserved.
-%%
-%% The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved online at http://www.erlang.org/.
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% %CopyrightEnd%
-%%
--module(pg2_fixed).
-
--export([create/1, delete/1, join/2, leave/2]).
--export([get_members/1, get_local_members/1]).
--export([get_closest_pid/1, which_groups/0]).
--export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
- terminate/2]).
-
-%%% As of R13B03 monitors are used instead of links.
-
-%%%
-%%% Exported functions
-%%%
-
--spec start_link() -> {'ok', pid()} | {'error', term()}.
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
--spec start() -> {'ok', pid()} | {'error', term()}.
-
-start() ->
- ensure_started().
-
--spec create(term()) -> 'ok'.
-
-create(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {create, Name})
- end),
- ok;
- true ->
- ok
- end.
-
--type name() :: term().
-
--spec delete(name()) -> 'ok'.
-
-delete(Name) ->
- _ = ensure_started(),
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {delete, Name})
- end),
- ok.
-
--spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}.
-
-join(Name, Pid) when is_pid(Pid) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {join, Name, Pid})
- end),
- ok
- end.
-
--spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}.
-
-leave(Name, Pid) when is_pid(Pid) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {leave, Name, Pid})
- end),
- ok
- end.
-
--type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}.
-
--spec get_members(name()) -> get_members_ret().
-
-get_members(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- true ->
- group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec get_local_members(name()) -> get_members_ret().
-
-get_local_members(Name) ->
- _ = ensure_started(),
- case ets:member(pg2_fixed_table, {group, Name}) of
- true ->
- local_group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec which_groups() -> [name()].
-
-which_groups() ->
- _ = ensure_started(),
- all_groups().
-
--type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}.
-
--spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}.
-
-get_closest_pid(Name) ->
- case get_local_members(Name) of
- [Pid] ->
- Pid;
- [] ->
- case get_members(Name) of
- [] -> {error, {no_process, Name}};
- Members ->
- X = erlang:system_time(micro_seconds),
- lists:nth((X rem length(Members))+1, Members)
- end;
- Members when is_list(Members) ->
- X = erlang:system_time(micro_seconds),
- lists:nth((X rem length(Members))+1, Members);
- Else ->
- Else
- end.
-
-%%%
-%%% Callback functions from gen_server
-%%%
-
--record(state, {}).
-
--spec init([]) -> {'ok', #state{}}.
-
-init([]) ->
- Ns = nodes(),
- _ = net_kernel:monitor_nodes(true),
- lists:foreach(fun(N) ->
- {?MODULE, N} ! {new_pg2_fixed, node()},
- self() ! {nodeup, N}
- end, Ns),
- pg2_fixed_table = ets:new(pg2_fixed_table, [ordered_set, protected, named_table]),
- {ok, #state{}}.
-
--type call() :: {'create', name()}
- | {'delete', name()}
- | {'join', name(), pid()}
- | {'leave', name(), pid()}.
-
--spec handle_call(call(), _, #state{}) ->
- {'reply', 'ok', #state{}}.
-
-handle_call({create, Name}, _From, S) ->
- assure_group(Name),
- {reply, ok, S};
-handle_call({join, Name, Pid}, _From, S) ->
- case ets:member(pg2_fixed_table, {group, Name}) of
- true -> _ = join_group(Name, Pid),
- ok;
- _ -> ok
- end,
- {reply, ok, S};
-handle_call({leave, Name, Pid}, _From, S) ->
- case ets:member(pg2_fixed_table, {group, Name}) of
- true -> leave_group(Name, Pid);
- _ -> ok
- end,
- {reply, ok, S};
-handle_call({delete, Name}, _From, S) ->
- delete_group(Name),
- {reply, ok, S};
-handle_call(Request, From, S) ->
- error_logger:warning_msg("The pg2_fixed server received an unexpected message:\n"
- "handle_call(~p, ~p, _)\n",
- [Request, From]),
- {noreply, S}.
-
--type all_members() :: [[name(),...]].
--type cast() :: {'exchange', node(), all_members()}
- | {'del_member', name(), pid()}.
-
--spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}.
-
-handle_cast({exchange, _Node, List}, S) ->
- store(List),
- {noreply, S};
-handle_cast(_, S) ->
- %% Ignore {del_member, Name, Pid}.
- {noreply, S}.
-
--spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}.
-
-handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
- member_died(MonitorRef),
- {noreply, S};
-handle_info({nodeup, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
-handle_info({new_pg2_fixed, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
-handle_info(_, S) ->
- {noreply, S}.
-
--spec terminate(term(), #state{}) -> 'ok'.
-
-terminate(_Reason, _S) ->
- true = ets:delete(pg2_fixed_table),
- ok.
-
-%%%
-%%% Local functions
-%%%
-
-%%% One ETS table, pg2_fixed_table, is used for bookkeeping. The type of the
-%%% table is ordered_set, and the fast matching of partially
-%%% instantiated keys is used extensively.
-%%%
-%%% {{group, Name}}
-%%% Process group Name.
-%%% {{ref, Pid}, RPid, MonitorRef, Counter}
-%%% {{ref, MonitorRef}, Pid}
-%%% Each process has one monitor. Sometimes a process is spawned to
-%%% monitor the pid (RPid). Counter is incremented when the Pid joins
-%%% some group.
-%%% {{member, Name, Pid}, GroupCounter}
-%%% {{local_member, Name, Pid}}
-%%% Pid is a member of group Name, GroupCounter is incremented when the
-%%% Pid joins the group Name.
-%%% {{pid, Pid, Name}}
-%%% Pid is a member of group Name.
-
-store(List) ->
- _ = [assure_group(Name)
- andalso
- [join_group(Name, P) || P <- Members -- group_members(Name)] ||
- [Name, Members] <- List],
- ok.
-
-assure_group(Name) ->
- Key = {group, Name},
- ets:member(pg2_fixed_table, Key) orelse true =:= ets:insert(pg2_fixed_table, {Key}).
-
-delete_group(Name) ->
- _ = [leave_group(Name, Pid) || Pid <- group_members(Name)],
- true = ets:delete(pg2_fixed_table, {group, Name}),
- ok.
-
-member_died(Ref) ->
- [{{ref, Ref}, Pid}] = ets:lookup(pg2_fixed_table, {ref, Ref}),
- Names = member_groups(Pid),
- _ = [leave_group(Name, P) ||
- Name <- Names,
- P <- member_in_group(Pid, Name)],
- %% Kept for backward compatibility with links. Can be removed, eventually.
- _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) ||
- Name <- Names],
- ok.
-
-join_group(Name, Pid) ->
- Ref_Pid = {ref, Pid},
- try _ = ets:update_counter(pg2_fixed_table, Ref_Pid, {4, +1})
- catch _:_ ->
- {RPid, Ref} = do_monitor(Pid),
- true = ets:insert(pg2_fixed_table, {Ref_Pid, RPid, Ref, 1}),
- true = ets:insert(pg2_fixed_table, {{ref, Ref}, Pid})
- end,
- Member_Name_Pid = {member, Name, Pid},
- try _ = ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, +1, 1, 1})
- catch _:_ ->
- true = ets:insert(pg2_fixed_table, {Member_Name_Pid, 1}),
- _ = [ets:insert(pg2_fixed_table, {{local_member, Name, Pid}}) ||
- node(Pid) =:= node()],
- true = ets:insert(pg2_fixed_table, {{pid, Pid, Name}})
- end.
-
-leave_group(Name, Pid) ->
- Member_Name_Pid = {member, Name, Pid},
- try ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, -1, 0, 0}) of
- N ->
- if
- N =:= 0 ->
- true = ets:delete(pg2_fixed_table, {pid, Pid, Name}),
- _ = [ets:delete(pg2_fixed_table, {local_member, Name, Pid}) ||
- node(Pid) =:= node()],
- true = ets:delete(pg2_fixed_table, Member_Name_Pid);
- true ->
- ok
- end,
- Ref_Pid = {ref, Pid},
- case ets:update_counter(pg2_fixed_table, Ref_Pid, {4, -1}) of
- 0 ->
- [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_fixed_table, Ref_Pid),
- true = ets:delete(pg2_fixed_table, {ref, Ref}),
- true = ets:delete(pg2_fixed_table, Ref_Pid),
- true = erlang:demonitor(Ref, [flush]),
- kill_monitor_proc(RPid, Pid);
- _ ->
- ok
- end
- catch _:_ ->
- ok
- end.
-
-all_members() ->
- [[G, group_members(G)] || G <- all_groups()].
-
-group_members(Name) ->
- [P ||
- [P, N] <- ets:match(pg2_fixed_table, {{member, Name, '$1'},'$2'}),
- _ <- lists:seq(1, N)].
-
-local_group_members(Name) ->
- [P ||
- [Pid] <- ets:match(pg2_fixed_table, {{local_member, Name, '$1'}}),
- P <- member_in_group(Pid, Name)].
-
-member_in_group(Pid, Name) ->
- case ets:lookup(pg2_fixed_table, {member, Name, Pid}) of
- [] -> [];
- [{{member, Name, Pid}, N}] ->
- lists:duplicate(N, Pid)
- end.
-
-member_groups(Pid) ->
- [Name || [Name] <- ets:match(pg2_fixed_table, {{pid, Pid, '$1'}})].
-
-all_groups() ->
- [N || [N] <- ets:match(pg2_fixed_table, {{group,'$1'}})].
-
-ensure_started() ->
- case whereis(?MODULE) of
- undefined ->
- C = {pg2_fixed, {?MODULE, start_link, []}, permanent,
- 1000, worker, [?MODULE]},
- supervisor:start_child(kernel_safe_sup, C);
- Pg2_FixedPid ->
- {ok, Pg2_FixedPid}
- end.
-
-
-kill_monitor_proc(RPid, Pid) ->
- case RPid of
- Pid -> ok;
- _ -> exit(RPid, kill)
- end.
-
-%% When/if erlang:monitor() returns before trying to connect to the
-%% other node this function can be removed.
-do_monitor(Pid) ->
- case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
- true ->
- %% Assume the node is still up
- {Pid, erlang:monitor(process, Pid)};
- false ->
- F = fun() ->
- Ref = erlang:monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, _Info} ->
- exit(normal)
- end
- end,
- erlang:spawn_monitor(F)
- end.
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index 1a894c3620..87401662c8 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -106,5 +106,8 @@
%% see rabbitmq-server#486
{peer_discovery_backend, rabbit_peer_discovery_classic_config},
%% used by rabbit_peer_discovery_classic_config
- {cluster_nodes, {[], disc}}
+ {cluster_nodes, {[], disc}},
+
+ %% rabbitmq-server-973
+ {lazy_queue_explicit_gc_run_operation_threshold, 250}
]}]}.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index ad1a25a425..d6357e0dc0 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -248,9 +248,15 @@ syncer_loop(Ref, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
- broadcast(SPids1, {sync_msgs, Ref, Msgs}),
- MPid ! {next, Ref},
- syncer_loop(Ref, MPid, SPids1);
+ case SPids1 of
+ [] ->
+ % Die silently because there are no slaves left.
+ ok;
+ _ ->
+ broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end;
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
%% they interpret that as a failure, which is what we
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3267b02bea..03381be311 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -306,7 +306,11 @@
io_batch_size,
%% default queue or lazy queue
- mode
+ mode,
+ %% number of reduce_memory_usage executions, once it
+ %% reaches a threshold the queue will manually trigger a runtime GC
+ %% see: maybe_execute_gc/1
+ memory_reduction_run_count
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -402,7 +406,8 @@
disk_write_count :: non_neg_integer(),
io_batch_size :: pos_integer(),
- mode :: 'default' | 'lazy' }.
+ mode :: 'default' | 'lazy',
+ memory_reduction_run_count :: non_neg_integer()}.
%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
@@ -427,6 +432,21 @@
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
+
+%% we define the garbage collector threshold
+%% it needs to tune the GC calls inside `reduce_memory_use`
+%% see: rabbitmq-server-973 and `maybe_execute_gc` function
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD,
+ case get(explicit_gc_run_operation_threshold) of
+ undefined ->
+ Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD),
+ put(explicit_gc_run_operation_threshold, Val),
+ Val;
+ Val -> Val
+ end).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -1330,7 +1350,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
io_batch_size = IoBatchSize,
- mode = default },
+ mode = default,
+ memory_reduction_run_count = 0},
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -2264,6 +2285,14 @@ ifold(Fun, Acc, Its, State) ->
%% Phase changes
%%----------------------------------------------------------------------------
+maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) ->
+ case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of
+ true -> garbage_collect(),
+ State#vqstate{memory_reduction_run_count = 0};
+ false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
+
+ end.
+
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
State;
reduce_memory_use(State = #vqstate {
@@ -2336,8 +2365,7 @@ reduce_memory_use(State = #vqstate {
S2 ->
push_betas_to_deltas(S2, State1)
end,
- garbage_collect(),
- State3.
+ maybe_execute_gc(State3).
limit_ram_acks(0, State) ->
{0, ui(State)};