diff options
| -rw-r--r-- | src/rabbit.app.src | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 |
2 files changed, 36 insertions, 6 deletions
diff --git a/src/rabbit.app.src b/src/rabbit.app.src index 872336bd8e..217aad593e 100644 --- a/src/rabbit.app.src +++ b/src/rabbit.app.src @@ -99,5 +99,7 @@ {credit_flow_default_credit, {200, 100}}, %% see rabbitmq-server#248 %% and rabbitmq-server#667 - {channel_operation_timeout, 15000} + {channel_operation_timeout, 15000}, + %% rabbitmq-server-973 + {lazy_queue_explicit_gc_run_operation_threshold, 250} ]}]}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 297df086ad..3ce889fba5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -306,7 +306,11 @@ io_batch_size, %% default queue or lazy queue - mode + mode, + %% number of reduce_memory_usage executions, once it + %% reaches a threshold the queue will manually trigger a runtime GC + %% see: maybe_execute_gc/1 + memory_reduction_run_count }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -402,7 +406,8 @@ disk_write_count :: non_neg_integer(), io_batch_size :: pos_integer(), - mode :: 'default' | 'lazy' }. + mode :: 'default' | 'lazy', + memory_reduction_run_count :: non_neg_integer()}. %% Duplicated from rabbit_backing_queue -spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. @@ -427,6 +432,21 @@ %% rabbit_amqqueue_process need fairly fresh rates. -define(MSGS_PER_RATE_CALC, 100). + +%% we define the garbage collector threshold +%% it needs to tune the GC calls inside `reduce_memory_use` +%% see: rabbitmq-server-973 and `maybe_execute_gc` function +-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250). +-define(EXPLICIT_GC_RUN_OP_THRESHOLD, + case get(explicit_gc_run_operation_threshold) of + undefined -> + Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold, + ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD), + put(explicit_gc_run_operation_threshold, Val), + Val; + Val -> Val + end). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -1330,7 +1350,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, io_batch_size = IoBatchSize, - mode = default }, + mode = default, + memory_reduction_run_count = 0}, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -2264,6 +2285,14 @@ ifold(Fun, Acc, Its, State) -> %% Phase changes %%---------------------------------------------------------------------------- +maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) -> + case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of + true -> garbage_collect(), + State#vqstate{memory_reduction_run_count = 0}; + false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1} + + end. + reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> State; reduce_memory_use(State = #vqstate { @@ -2336,8 +2365,7 @@ reduce_memory_use(State = #vqstate { S2 -> push_betas_to_deltas(S2, State1) end, - garbage_collect(), - State3. + maybe_execute_gc(State3). limit_ram_acks(0, State) -> {0, ui(State)}; |
