diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 74 |
3 files changed, 68 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cdadb5b1d4..7982a2fc2d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -985,9 +985,9 @@ prioritise_cast(Msg, _Len, _State) -> {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - {ack, _AckTags, _ChPid} -> 3; %% [1] - {resume, _ChPid} -> 2; - {notify_sent, _ChPid, _Credit} -> 1; + {ack, _AckTags, _ChPid} -> 4; %% [1] + {resume, _ChPid} -> 3; + {notify_sent, _ChPid, _Credit} -> 2; _ -> 0 end. @@ -999,6 +999,9 @@ prioritise_cast(Msg, _Len, _State) -> %% stack are optimised for that) and to make things easier to reason %% about. Finally, we prioritise ack over resume since it should %% always reduce memory use. +%% bump_reduce_memory_use is prioritised over publishes, because sending +%% credit to self is hard to reason about. Consumers can continue while +%% reduce_memory_use is in progress. prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of @@ -1008,6 +1011,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> {drop_expired, _Version} -> 8; emit_stats -> 7; sync_timeout -> 6; + bump_reduce_memory_use -> 1; _ -> 0 end. @@ -1382,6 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, %% rabbit_variable_queue:msg_store_write/4. credit_flow:handle_bump_msg(Msg), noreply(State#q{backing_queue_state = BQ:resume(BQS)}); +handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + put(waiting_bump, false), + noreply(State#q{backing_queue_state = BQ:resume(BQS)}); handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. 
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6017e5a028..008d8ecf2c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -348,6 +348,9 @@ handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), noreply(State); +handle_info(bump_reduce_memory_use, State) -> + noreply(State); + %% In the event of a short partition during sync we can detect the %% master's 'death', drop out of sync, and then receive sync messages %% which were still in flight. Ignore them. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d487286774..43bc0e78a8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2363,45 +2363,79 @@ reduce_memory_use(State = #vqstate { out = AvgEgress, ack_in = AvgAckIngress, ack_out = AvgAckEgress } }) -> - State1 = #vqstate { q2 = Q2, q3 = Q3 } = + {CreditDiscBound, _} =rabbit_misc:get_env(rabbit, + msg_store_credit_disc_bound, + ?CREDIT_DISC_BOUND), + {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; + 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is %% determined based on which is growing faster. Whichever %% comes second may very well get a quota of 0 if the %% first manages to push out the max number of messages. - S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > + A2BChunk -> + %% In case there are few messages to be sent to a message store + %% and many messages to be embedded to the queue index, + %% we should limit the number of messages to be flushed + %% to avoid blocking the process. 
+ A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of + true -> CreditDiscBound * 2; + false -> A2BChunk + end, + Funs = case ((AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress)) of true -> [fun limit_ram_acks/2, fun push_alphas_to_betas/2]; false -> [fun push_alphas_to_betas/2, fun limit_ram_acks/2] end, - {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> + {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) - end, {S1, State}, Funs), - State2 + end, {A2BChunkActual, State}, Funs), + {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2} end, - - State3 = + Permitted = permitted_beta_count(State1), + {NeedResumeB2D, State3} = %% If there are more messages with their queue position held in RAM, %% a.k.a. betas, in Q2 & Q3 than IoBatchSize, %% write their queue position to disk, a.k.a. push_betas_to_deltas case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - S2 when S2 >= IoBatchSize -> - %% There is an implicit, but subtle, upper bound here. We - %% may shuffle a lot of messages from Q2/3 into delta, but - %% the number of these that require any disk operation, - %% namely index writing, i.e. messages that are genuine - %% betas and not gammas, is bounded by the credit_flow - %% limiting of the alpha->beta conversion above. - push_betas_to_deltas(S2, State1); + Permitted) of + B2DChunk when B2DChunk >= IoBatchSize -> + %% Same as for alphas to betas. Limit a number of messages + %% to be flushed to disk at once to avoid blocking the process. + B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of + true -> CreditDiscBound * 2; + false -> B2DChunk + end, + StateBD = push_betas_to_deltas(B2DChunkActual, State1), + {B2DChunk > B2DChunkActual, StateBD}; _ -> - State1 + {false, State1} end, - %% See rabbitmq-server-290 for the reasons behind this GC call. 
- garbage_collect(), + %% We can be blocked by the credit flow, or limited by a batch size, + %% or finished with flushing. + %% If blocked by the credit flow - the credit grant will resume processing, + %% if limited by a batch - the batch continuation message should be sent. + %% The continuation message will be prioritised over publishes, + %% but not consumptions, so the queue can make progress. + Blocked = credit_flow:blocked(), + case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of + %% Credit bump will continue paging + {true, _} -> ok; + %% Finished with paging + {false, false} -> ok; + %% Planning next batch + {false, true} -> + %% We don't want to use self-credit-flow, because it's harder to + %% reason about. So the process sends a (prioritised) message to + %% itself and sets a waiting_bump value to keep the message box clean + case get(waiting_bump) of + true -> ok; + _ -> self() ! bump_reduce_memory_use, + put(waiting_bump, true) + end + end, State3; %% When using lazy queues, there are no alphas, so we don't need to %% call push_alphas_to_betas/2. |
