summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGerhard Lazu <gerhard@users.noreply.github.com>2020-03-16 17:10:09 +0000
committerGitHub <noreply@github.com>2020-03-16 17:10:09 +0000
commit5239852216100dad2658458575948758e35b3fa4 (patch)
tree7ff5df57a7a21c7ed8fe75c0233a62cc0a2d39b8 /src
parenta0ba0ad9578e8a5617554f58844a63e00c20aefe (diff)
parentf446bd26a4f9918e23310d1afc22a4175aff5993 (diff)
downloadrabbitmq-server-git-5239852216100dad2658458575948758e35b3fa4.tar.gz
Merge pull request #2272 from rabbitmq/rabbit-fifo-force-gc
rabbit_fifo: force gc when queue is empty
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl39
-rw-r--r--src/rabbit_fifo.hrl5
2 files changed, 37 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 0f7adfb88d..940ed0d999 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -646,26 +646,51 @@ get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
[]
end.
+-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
+-record(aux, {name :: atom(),
+ utilisation :: term(),
+ gc = #aux_gc{} :: #aux_gc{}}).
+
init_aux(Name) when is_atom(Name) ->
%% TODO: catch specific exception throw if table already exists
ok = ra_machine_ets:create_table(rabbit_fifo_usage,
[named_table, set, public,
{write_concurrency, true}]),
Now = erlang:monotonic_time(micro_seconds),
- {Name, {inactive, Now, 1, 1.0}}.
+ #aux{name = Name,
+ utilisation = {inactive, Now, 1, 1.0}}.
-handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) ->
- Use = case Cmd of
+handle_aux(_RaState, cast, Cmd, #aux{name = Name,
+ utilisation = Use0} = State0,
+ Log, MacState) ->
+ State = case Cmd of
_ when Cmd == active orelse Cmd == inactive ->
- update_use(Use0, Cmd);
+ State0#aux{utilisation = update_use(Use0, Cmd)};
tick ->
true = ets:insert(rabbit_fifo_usage,
{Name, utilisation(Use0)}),
- Use0;
+ eval_gc(Log, MacState, State0);
eval ->
- Use0
+ State0
end,
- {no_reply, {Name, Use}, Log}.
+ {no_reply, State, Log}.
+
+eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
+ #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
+ {Idx, _} = ra_log:last_index_term(Log),
+ {memory, Mem} = erlang:process_info(self(), memory),
+ case messages_total(MacState) of
+ 0 when Idx > LastGcIdx andalso
+ Mem > ?GC_MEM_LIMIT_B ->
+ garbage_collect(),
+ {memory, MemAfter} = erlang:process_info(self(), memory),
+ rabbit_log:debug("~s: full GC sweep complete. "
+ "Process memory reduced from ~.2fMB to ~.2fMB.",
+ [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]),
+ AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}};
+ _ ->
+ AuxState
+ end.
%%% Queries
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 2a8899d593..ebbaa9e1eb 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -70,6 +70,11 @@
-define(RELEASE_CURSOR_EVERY, 64000).
-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
+%% an average QQ without any message uses about 100KB so setting this limit
+%% to ~10 times that should be relatively safe.
+-define(GC_MEM_LIMIT_B, 2000000).
+
+-define(MB, 1048576).
-record(consumer,
{meta = #{} :: consumer_meta(),