diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-07-25 17:04:46 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-07-25 17:04:46 +0300 |
| commit | 82fb30be1aaf736265822d42c0bd0b38bb48a9ee (patch) | |
| tree | 157baa4205955a2e0472455aab5e1b47203a71e5 /src | |
| parent | 46eb2e540332ffd727e513e2e12ba25d88737faa (diff) | |
| parent | 7d0e49c6391fce19d651ad4658923d4b3d415aa6 (diff) | |
| download | rabbitmq-server-git-82fb30be1aaf736265822d42c0bd0b38bb48a9ee.tar.gz | |
Merge pull request #1302 from rabbitmq/stable-gm-mem-usage-during-constant-redeliveryrabbitmq_v3_6_11_milestone5
Stable GM memory usage during constant redelivery
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/src/gm.erl b/src/gm.erl index 7604b9e197..cf3e217010 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -395,9 +395,8 @@ -define(GROUP_TABLE, gm_group). -define(MAX_BUFFER_SIZE, 100000000). %% 100MB --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -416,6 +415,7 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, + force_gc_timer, txn_executor, shutting_down }). @@ -508,7 +508,8 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], + [{spawn_opt, [{fullsweep_after, 0}]}]). leave(Server) -> gen_server2:cast(Server, leave). @@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, + force_gc_timer = undefined, txn_executor = TxnFun, - shutting_down = false }, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + shutting_down = false }}. handle_call({confirmed_broadcast, _Msg}, _From, @@ -708,6 +709,10 @@ handle_cast(leave, State) -> {stop, normal, State}. +handle_info(force_gc, State) -> + garbage_collect(), + noreply(State #state { force_gc_timer = undefined }); + handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); @@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. + {noreply, ensure_timers(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + {reply, Reply, ensure_timers(State), flush_timeout(State)}. + +ensure_timers(State) -> + ensure_force_gc_timer(ensure_broadcast_timer(State)). -flush_timeout(#state{broadcast_buffer = []}) -> hibernate; +flush_timeout(#state{broadcast_buffer = []}) -> infinity; flush_timeout(_) -> 0. +ensure_force_gc_timer(State = #state { force_gc_timer = TRef }) + when is_reference(TRef) -> + State; +ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) -> + TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc), + State #state { force_gc_timer = TRef }. + ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> State; @@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self, end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [], - broadcast_buffer_sz = 0}. - + broadcast_buffer_sz = 0 }. %% --------------------------------------------------------------------------- %% View construction and inspection |
