summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_variable_queue.erl74
3 files changed, 68 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cdadb5b1d4..7982a2fc2d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -985,9 +985,9 @@ prioritise_cast(Msg, _Len, _State) ->
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
- {ack, _AckTags, _ChPid} -> 3; %% [1]
- {resume, _ChPid} -> 2;
- {notify_sent, _ChPid, _Credit} -> 1;
+ {ack, _AckTags, _ChPid} -> 4; %% [1]
+ {resume, _ChPid} -> 3;
+ {notify_sent, _ChPid, _Credit} -> 2;
_ -> 0
end.
@@ -999,6 +999,9 @@ prioritise_cast(Msg, _Len, _State) ->
%% stack are optimised for that) and to make things easier to reason
%% about. Finally, we prioritise ack over resume since it should
%% always reduce memory use.
+%% bump_reduce_memory_use is prioritised over publishes, because sending
+%% credit to self is hard to reason about. Consumers can continue while
+%% reduce_memory_use is in progress.
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
@@ -1008,6 +1011,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
{drop_expired, _Version} -> 8;
emit_stats -> 7;
sync_timeout -> 6;
+ bump_reduce_memory_use -> 1;
_ -> 0
end.
@@ -1382,6 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
%% rabbit_variable_queue:msg_store_write/4.
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
+handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ put(waiting_bump, false),
+ noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6017e5a028..008d8ecf2c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -348,6 +348,9 @@ handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
noreply(State);
+handle_info(bump_reduce_memory_use, State) ->
+ noreply(State);
+
%% In the event of a short partition during sync we can detect the
%% master's 'death', drop out of sync, and then receive sync messages
%% which were still in flight. Ignore them.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d487286774..43bc0e78a8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2363,45 +2363,79 @@ reduce_memory_use(State = #vqstate {
out = AvgEgress,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
- State1 = #vqstate { q2 = Q2, q3 = Q3 } =
+ {CreditDiscBound, _} =rabbit_misc:get_env(rabbit,
+ msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
+ {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> State;
+ 0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
- S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+ A2BChunk ->
+ %% In case there are few messages to be sent to a message store
+ %% and many messages to be embedded to the queue index,
+ %% we should limit the number of messages to be flushed
+ %% to avoid blocking the process.
+ A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> A2BChunk
+ end,
+ Funs = case ((AvgAckIngress - AvgAckEgress) >
(AvgIngress - AvgEgress)) of
true -> [fun limit_ram_acks/2,
fun push_alphas_to_betas/2];
false -> [fun push_alphas_to_betas/2,
fun limit_ram_acks/2]
end,
- {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
ReduceFun(QuotaN, StateN)
- end, {S1, State}, Funs),
- State2
+ end, {A2BChunkActual, State}, Funs),
+ {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
end,
-
- State3 =
+ Permitted = permitted_beta_count(State1),
+ {NeedResumeB2D, State3} =
%% If there are more messages with their queue position held in RAM,
%% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
%% write their queue position to disk, a.k.a. push_betas_to_deltas
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- S2 when S2 >= IoBatchSize ->
- %% There is an implicit, but subtle, upper bound here. We
- %% may shuffle a lot of messages from Q2/3 into delta, but
- %% the number of these that require any disk operation,
- %% namely index writing, i.e. messages that are genuine
- %% betas and not gammas, is bounded by the credit_flow
- %% limiting of the alpha->beta conversion above.
- push_betas_to_deltas(S2, State1);
+ Permitted) of
+ B2DChunk when B2DChunk >= IoBatchSize ->
+ %% Same as for alphas to betas. Limit the number of messages
+ %% to be flushed to disk at once to avoid blocking the process.
+ B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> B2DChunk
+ end,
+ StateBD = push_betas_to_deltas(B2DChunkActual, State1),
+ {B2DChunk > B2DChunkActual, StateBD};
_ ->
- State1
+ {false, State1}
end,
- %% See rabbitmq-server-290 for the reasons behind this GC call.
- garbage_collect(),
+ %% We can be blocked by the credit flow, or limited by a batch size,
+ %% or finished with flushing.
+ %% If blocked by the credit flow - the credit grant will resume processing,
+ %% if limited by a batch - the batch continuation message should be sent.
+ %% The continuation message will be prioritised over publishes,
+ %% but not consumptions, so the queue can make progress.
+ Blocked = credit_flow:blocked(),
+ case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
+ %% Credit bump will continue paging
+ {true, _} -> ok;
+ %% Finished with paging
+ {false, false} -> ok;
+ %% Planning next batch
+ {false, true} ->
+ %% We don't want to use self-credit-flow, because it's harder to
+ %% reason about. So the process sends a (prioritised) message to
+ %% itself and sets a waiting_bump value to keep the message box clean
+ case get(waiting_bump) of
+ true -> ok;
+ _ -> self() ! bump_reduce_memory_use,
+ put(waiting_bump, true)
+ end
+ end,
State3;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.