summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-07-25 19:50:18 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-07-25 19:50:18 +0300
commit481dd67c6c068d1e3740c6b48fb87c161f483e67 (patch)
tree9ed4e208dfec1a761727c39b23290214a811e0af /src
parent88b516541b865a20fb1639bc95864bb0a7438c24 (diff)
parent82fb30be1aaf736265822d42c0bd0b38bb48a9ee (diff)
downloadrabbitmq-server-git-481dd67c6c068d1e3740c6b48fb87c161f483e67.tar.gz
Merge branch 'stable'
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl34
1 files changed, 24 insertions, 10 deletions
diff --git a/src/gm.erl b/src/gm.erl
index f67050affb..0da190a57d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -395,9 +395,8 @@
-define(GROUP_TABLE, gm_group).
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(FORCE_GC_TIMER, 250).
-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -416,6 +415,7 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
+ force_gc_timer,
txn_executor,
shutting_down
}).
@@ -508,7 +508,8 @@ table_definitions() ->
[{Name, [?TABLE_MATCH | Attributes]}].
start_link(GroupName, Module, Args, TxnFun) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
+ [{spawn_opt, [{fullsweep_after, 0}]}]).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
+ force_gc_timer = undefined,
txn_executor = TxnFun,
- shutting_down = false }, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ shutting_down = false }}.
handle_call({confirmed_broadcast, _Msg}, _From,
@@ -708,6 +709,10 @@ handle_cast(leave, State) ->
{stop, normal, State}.
+handle_info(force_gc, State) ->
+ garbage_collect(),
+ noreply(State #state { force_gc_timer = undefined });
+
handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
@@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {noreply, ensure_timers(State), flush_timeout(State)}.
reply(Reply, State) ->
- {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {reply, Reply, ensure_timers(State), flush_timeout(State)}.
+
+ensure_timers(State) ->
+ ensure_force_gc_timer(ensure_broadcast_timer(State)).
-flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
+flush_timeout(#state{broadcast_buffer = []}) -> infinity;
flush_timeout(_) -> 0.
+ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
+ when is_reference(TRef) ->
+ State;
+ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
+ TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
+ State #state { force_gc_timer = TRef }.
+
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
State;
@@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self,
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [],
- broadcast_buffer_sz = 0}.
-
+ broadcast_buffer_sz = 0 }.
%% ---------------------------------------------------------------------------
%% View construction and inspection