diff options
| author | Gerhard Lazu <gerhard@users.noreply.github.com> | 2020-03-16 17:10:09 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-16 17:10:09 +0000 |
| commit | 5239852216100dad2658458575948758e35b3fa4 (patch) | |
| tree | 7ff5df57a7a21c7ed8fe75c0233a62cc0a2d39b8 | |
| parent | a0ba0ad9578e8a5617554f58844a63e00c20aefe (diff) | |
| parent | f446bd26a4f9918e23310d1afc22a4175aff5993 (diff) | |
| download | rabbitmq-server-git-5239852216100dad2658458575948758e35b3fa4.tar.gz | |
Merge pull request #2272 from rabbitmq/rabbit-fifo-force-gc
rabbit_fifo: force gc when queue is empty
| -rw-r--r-- | src/rabbit_fifo.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 5 |
2 files changed, 37 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 0f7adfb88d..940ed0d999 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -646,26 +646,51 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) -> [] end. +-record(aux_gc, {last_raft_idx = 0 :: ra:index()}). +-record(aux, {name :: atom(), + utilisation :: term(), + gc = #aux_gc{} :: #aux_gc{}}). + init_aux(Name) when is_atom(Name) -> %% TODO: catch specific exception throw if table already exists ok = ra_machine_ets:create_table(rabbit_fifo_usage, [named_table, set, public, {write_concurrency, true}]), Now = erlang:monotonic_time(micro_seconds), - {Name, {inactive, Now, 1, 1.0}}. + #aux{name = Name, + utilisation = {inactive, Now, 1, 1.0}}. -handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) -> - Use = case Cmd of +handle_aux(_RaState, cast, Cmd, #aux{name = Name, + utilisation = Use0} = State0, + Log, MacState) -> + State = case Cmd of _ when Cmd == active orelse Cmd == inactive -> - update_use(Use0, Cmd); + State0#aux{utilisation = update_use(Use0, Cmd)}; tick -> true = ets:insert(rabbit_fifo_usage, {Name, utilisation(Use0)}), - Use0; + eval_gc(Log, MacState, State0); eval -> - Use0 + State0 end, - {no_reply, {Name, Use}, Log}. + {no_reply, State, Log}. + +eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, + #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> + {Idx, _} = ra_log:last_index_term(Log), + {memory, Mem} = erlang:process_info(self(), memory), + case messages_total(MacState) of + 0 when Idx > LastGcIdx andalso + Mem > ?GC_MEM_LIMIT_B -> + garbage_collect(), + {memory, MemAfter} = erlang:process_info(self(), memory), + rabbit_log:debug("~s: full GC sweep complete. " + "Process memory reduced from ~.2fMB to ~.2fMB.", + [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]), + AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}}; + _ -> + AuxState + end. %%% Queries diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 2a8899d593..ebbaa9e1eb 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -70,6 +70,11 @@ -define(RELEASE_CURSOR_EVERY, 64000). -define(RELEASE_CURSOR_EVERY_MAX, 3200000). -define(USE_AVG_HALF_LIFE, 10000.0). +%% an average QQ without any message uses about 100KB so setting this limit +%% to ~10 times that should be relatively safe. +-define(GC_MEM_LIMIT_B, 2000000). + +-define(MB, 1048576). -record(consumer, {meta = #{} :: consumer_meta(), |
