summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-10-10 17:59:53 +0100
committerGerhard Lazu <gerhard@lazu.co.uk>2017-10-18 17:35:48 +0100
commitf1de592af35903d542ee387bcc7d7a6d592728db (patch)
tree6bfa2396a93cec8480fce792d1bc599835fedfa6 /src
parent8d8246b6ac249956125d082de057ae8f74725f0f (diff)
downloadrabbitmq-server-git-f1de592af35903d542ee387bcc7d7a6d592728db.tar.gz
Flush messages to disk in batches.
If messages should be embedded to a queue index, there will be no credit flow limit, so message batches can be too big and block the queue process. Limiting the batch size allows consumer to make progress while publishers are blocked by the paging-out process. [#151614048]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_variable_queue.erl74
3 files changed, 68 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cdadb5b1d4..7982a2fc2d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -985,9 +985,9 @@ prioritise_cast(Msg, _Len, _State) ->
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
- {ack, _AckTags, _ChPid} -> 3; %% [1]
- {resume, _ChPid} -> 2;
- {notify_sent, _ChPid, _Credit} -> 1;
+ {ack, _AckTags, _ChPid} -> 4; %% [1]
+ {resume, _ChPid} -> 3;
+ {notify_sent, _ChPid, _Credit} -> 2;
_ -> 0
end.
@@ -999,6 +999,9 @@ prioritise_cast(Msg, _Len, _State) ->
%% stack are optimised for that) and to make things easier to reason
%% about. Finally, we prioritise ack over resume since it should
%% always reduce memory use.
+%% bump_reduce_memory_use is prioritised over publishes, because sending
+%% credit to self is hard to reason about. Consumers can continue while
+%% reduce_memory_use is in progress.
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
@@ -1008,6 +1011,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
{drop_expired, _Version} -> 8;
emit_stats -> 7;
sync_timeout -> 6;
+ bump_reduce_memory_use -> 1;
_ -> 0
end.
@@ -1382,6 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
%% rabbit_variable_queue:msg_store_write/4.
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
+handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ put(waiting_bump, false),
+ noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6017e5a028..008d8ecf2c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -348,6 +348,9 @@ handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
noreply(State);
+handle_info(bump_reduce_memory_use, State) ->
+ noreply(State);
+
%% In the event of a short partition during sync we can detect the
%% master's 'death', drop out of sync, and then receive sync messages
%% which were still in flight. Ignore them.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d487286774..e26a8acfa7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2363,45 +2363,79 @@ reduce_memory_use(State = #vqstate {
out = AvgEgress,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
- State1 = #vqstate { q2 = Q2, q3 = Q3 } =
+ {CreditDiscBound, _} =rabbit_misc:get_env(rabbit,
+ msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
+ {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> State;
+ 0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
- S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+ A2BChunk ->
+ %% In case there are few messages to be sent to a message store
+ %% and many messages to be embedded to the queue index,
+ %% we should limit the number of messages to be flushed
+ %% to avoid blocking the process.
+ A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> A2BChunk
+ end,
+ Funs = case ((AvgAckIngress - AvgAckEgress) >
(AvgIngress - AvgEgress)) of
true -> [fun limit_ram_acks/2,
fun push_alphas_to_betas/2];
false -> [fun push_alphas_to_betas/2,
fun limit_ram_acks/2]
end,
- {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
ReduceFun(QuotaN, StateN)
- end, {S1, State}, Funs),
- State2
+ end, {A2BChunkActual, State}, Funs),
+ {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
end,
-
- State3 =
+ Permitted = permitted_beta_count(State1),
+ {NeedResumeB2D, State3} =
%% If there are more messages with their queue position held in RAM,
%% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
%% write their queue position to disk, a.k.a. push_betas_to_deltas
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- S2 when S2 >= IoBatchSize ->
- %% There is an implicit, but subtle, upper bound here. We
- %% may shuffle a lot of messages from Q2/3 into delta, but
- %% the number of these that require any disk operation,
- %% namely index writing, i.e. messages that are genuine
- %% betas and not gammas, is bounded by the credit_flow
- %% limiting of the alpha->beta conversion above.
- push_betas_to_deltas(S2, State1);
+ Permitted) of
+ B2DChunk when B2DChunk >= IoBatchSize ->
+ %% Same as for alphas to betas. Limit a number of messages
+ %% to be flushed to disk at once to avoid blocking the process.
+ B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> B2DChunk
+ end,
+ StateBD = push_betas_to_deltas(B2DChunkActual, State1),
+ {B2DChunk > B2DChunkActual, StateBD};
_ ->
- State1
+ {false, State1}
end,
- %% See rabbitmq-server-290 for the reasons behind this GC call.
- garbage_collect(),
+ %% We can be blocked by the credit flow, or limited by a batch size,
+ %% or finished with flushing.
+ %% If blocked by the credit flow - the credit grant will resume processing,
+ %% if limited by a batch - the batch continuation message should be sent.
+ %% The continuation message will be prioritised over publishes,
+ %% but not cinsumptions, so the queue can make progess.
+ Blocked = credit_flow:blocked(),
+ case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
+ %% Credit bump will continue paging
+ {true, _} -> ok;
+ %% Finished with paging
+ {false, false} -> ok;
+ %% Planning next batch
+ {false, true} ->
+ %% We don't want to use self-credit-flow, because it's harder to
+ %% reason about. So the process sends a (prioritised) message to
+ %% itself and sets a waiting_bump value to keep the message box clean
+ case get(waiting_bump) of
+ true -> ok;
+ _ -> self() ! bump_reduce_memory_use,
+ put(waiting_bump, waiting)
+ end
+ end,
State3;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.