summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-07 11:40:24 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-07 11:40:24 +0100
commit397519aed69328d69afb51200215e4899d0fde86 (patch)
treea69f4c5cec434b7ad082bcdb732aca6a7b9957e5
parent4f71eaade3615e794f25d58a239b0acde0ad950d (diff)
downloadrabbitmq-server-git-397519aed69328d69afb51200215e4899d0fde86.tar.gz
unify the phase change predicate and phase change operation
so that the logic for determining the necessity of phase changes is kept in one place and cannot diverge.
-rw-r--r--src/rabbit_variable_queue.erl113
1 files changed, 70 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d9f22c5a4d..f19fcee53a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -636,14 +636,12 @@ ram_duration(State = #vqstate { egress_rate = Egress,
needs_idle_timeout(#vqstate { on_sync = {_, _, [_|_]}}) ->
true;
-needs_idle_timeout(State = #vqstate { target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount}) ->
- case reduction(RamMsgCount, TargetRamMsgCount) of
- 0 -> Permitted = permitted_ram_index_count(State),
- reduction(RamIndexCount, Permitted) == ?IO_BATCH_SIZE;
- _ -> true
- end.
+needs_idle_timeout(State) ->
+ {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ State),
+ Res.
idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
@@ -1114,43 +1112,67 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Phase changes
%%----------------------------------------------------------------------------
-reduce_memory_use(State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount }) ->
- Reduction = reduction(RamMsgCount, TargetRamMsgCount),
- { Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State),
- {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1),
- case TargetRamMsgCount of
- infinity -> State2;
- 0 -> push_betas_to_deltas(State2);
- _ -> limit_ram_index(State2)
+%% Determine whether a reduction in memory use is necessary, and call
+%% functions to perform the required phase changes. The function can
+%% also be used to just do the former, by passing in dummy phase
+%% change functions.
+%%
+%% The function does not report on any needed beta->delta conversions,
+%% though the conversion function for that is called as necessary. The
+%% reason is twofold. Firstly, this is safe because the conversion is
+%% only ever necessary just after a transition to a
+%% target_ram_msg_count of zero or after an incremental alpha->beta
+%% conversion. In the former case the conversion is performed straight
+%% away (i.e. any betas present at the time are converted to deltas),
+%% and in the latter case the need for a conversion is flagged up
+%% anyway. Secondly, this is necessary because we do not have a
+%% precise and cheap predicate for determining whether a beta->delta
+%% conversion is necessary - due to the complexities of retaining up
+%% one segment's worth of messages in q3 - and thus would risk
+%% perpetually reporting the need for a conversion when no such
+%% conversion is needed. That in turn could cause an infinite loop.
+reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
+ {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
+ State #vqstate.target_ram_msg_count) of
+ 0 -> {false, State};
+ S1 -> {true, AlphaBetaFun(S1, State)}
+ end,
+ case State1 #vqstate.target_ram_msg_count of
+ infinity -> {Reduce, State1};
+ 0 -> {Reduce, BetaDeltaFun(State1)};
+ _ -> case chunk_size(State1 #vqstate.ram_index_count,
+ permitted_ram_index_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end
end.
-limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
- Reduction = reduction(RamIndexCount, permitted_ram_index_count(State)),
- case Reduction of
- ?IO_BATCH_SIZE ->
- #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState } = State,
- {Q2a, {Reduction1, IndexState1}} =
- limit_ram_index(fun bpqueue:map_fold_filter_l/4,
- Q2, {Reduction, IndexState}),
- %% TODO: we shouldn't be writing index entries for
- %% messages that can never end up in delta due them
- %% residing in the only segment held by q3.
- {Q3a, {Reduction2, IndexState2}} =
- limit_ram_index(fun bpqueue:map_fold_filter_r/4,
- Q3, {Reduction1, IndexState1}),
- RamIndexCount1 = RamIndexCount - (Reduction - Reduction2),
- State #vqstate { q2 = Q2a, q3 = Q3a,
- index_state = IndexState2,
- ram_index_count = RamIndexCount1 };
- _ ->
- State
- end.
+reduce_memory_use(State) ->
+ {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
+ fun limit_ram_index/2,
+ fun push_betas_to_deltas/1,
+ State),
+ State1.
+
+limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
+ index_state = IndexState,
+ ram_index_count = RamIndexCount }) ->
+ {Q2a, {Quota1, IndexState1}} = limit_ram_index(
+ fun bpqueue:map_fold_filter_l/4,
+ Q2, {Quota, IndexState}),
+ %% TODO: we shouldn't be writing index entries for messages that
+ %% can never end up in delta due them residing in the only segment
+ %% held by q3.
+ {Q3a, {Quota2, IndexState2}} = limit_ram_index(
+ fun bpqueue:map_fold_filter_r/4,
+ Q3, {Quota1, IndexState1}),
+ State #vqstate { q2 = Q2a, q3 = Q3a,
+ index_state = IndexState2,
+ ram_index_count = RamIndexCount - (Quota - Quota2) }.
limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) ->
{Q, {0, IndexState}};
-limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
+limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) ->
MapFoldFilterFun(
fun erlang:'not'/1,
fun (MsgStatus, {0, _IndexStateN}) ->
@@ -1161,7 +1183,7 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
{MsgStatus1, IndexStateN1} =
maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
{true, MsgStatus1, {N-1, IndexStateN1}}
- end, {Reduction, IndexState}, Q).
+ end, {Quota, IndexState}, Q).
permitted_ram_index_count(#vqstate { len = 0 }) ->
infinity;
@@ -1172,10 +1194,10 @@ permitted_ram_index_count(#vqstate { len = Len,
BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
-reduction(Current, Permitted)
+chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
0;
-reduction(Current, Permitted) ->
+chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
@@ -1228,6 +1250,11 @@ maybe_deltas_to_betas(State = #vqstate {
end
end.
+push_alphas_to_betas(Quota, State) ->
+ { Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
+ {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ State2.
+
maybe_push_q1_to_betas(0, State) ->
{0, State};
maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->