diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-07 11:40:24 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-07 11:40:24 +0100 |
| commit | 397519aed69328d69afb51200215e4899d0fde86 (patch) | |
| tree | a69f4c5cec434b7ad082bcdb732aca6a7b9957e5 /src | |
| parent | 4f71eaade3615e794f25d58a239b0acde0ad950d (diff) | |
| download | rabbitmq-server-git-397519aed69328d69afb51200215e4899d0fde86.tar.gz | |
unify the phase change predicate and phase change operation
so that the logic for determining the necessity of phase changes is
kept in one place and cannot diverge.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 113 |
1 files changed, 70 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d9f22c5a4d..f19fcee53a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -636,14 +636,12 @@ ram_duration(State = #vqstate { egress_rate = Egress, needs_idle_timeout(#vqstate { on_sync = {_, _, [_|_]}}) -> true; -needs_idle_timeout(State = #vqstate { target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount}) -> - case reduction(RamMsgCount, TargetRamMsgCount) of - 0 -> Permitted = permitted_ram_index_count(State), - reduction(RamIndexCount, Permitted) == ?IO_BATCH_SIZE; - _ -> true - end. +needs_idle_timeout(State) -> + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + State), + Res. idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). @@ -1114,43 +1112,67 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Phase changes %%---------------------------------------------------------------------------- -reduce_memory_use(State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) -> - Reduction = reduction(RamMsgCount, TargetRamMsgCount), - { Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State), - {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1), - case TargetRamMsgCount of - infinity -> State2; - 0 -> push_betas_to_deltas(State2); - _ -> limit_ram_index(State2) +%% Determine whether a reduction in memory use is necessary, and call +%% functions to perform the required phase changes. The function can +%% also be used to just do the former, by passing in dummy phase +%% change functions. +%% +%% The function does not report on any needed beta->delta conversions, +%% though the conversion function for that is called as necessary. The +%% reason is twofold. Firstly, this is safe because the conversion is +%% only ever necessary just after a transition to a +%% target_ram_msg_count of zero or after an incremental alpha->beta +%% conversion. In the former case the conversion is performed straight +%% away (i.e. any betas present at the time are converted to deltas), +%% and in the latter case the need for a conversion is flagged up +%% anyway. Secondly, this is necessary because we do not have a +%% precise and cheap predicate for determining whether a beta->delta +%% conversion is necessary - due to the complexities of retaining up +%% one segment's worth of messages in q3 - and thus would risk +%% perpetually reporting the need for a conversion when no such +%% conversion is needed. That in turn could cause an infinite loop. +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> + {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, + State #vqstate.target_ram_msg_count) of + 0 -> {false, State}; + S1 -> {true, AlphaBetaFun(S1, State)} + end, + case State1 #vqstate.target_ram_msg_count of + infinity -> {Reduce, State1}; + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end end. -limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> - Reduction = reduction(RamIndexCount, permitted_ram_index_count(State)), - case Reduction of - ?IO_BATCH_SIZE -> - #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState } = State, - {Q2a, {Reduction1, IndexState1}} = - limit_ram_index(fun bpqueue:map_fold_filter_l/4, - Q2, {Reduction, IndexState}), - %% TODO: we shouldn't be writing index entries for - %% messages that can never end up in delta due them - %% residing in the only segment held by q3. - {Q3a, {Reduction2, IndexState2}} = - limit_ram_index(fun bpqueue:map_fold_filter_r/4, - Q3, {Reduction1, IndexState1}), - RamIndexCount1 = RamIndexCount - (Reduction - Reduction2), - State #vqstate { q2 = Q2a, q3 = Q3a, - index_state = IndexState2, - ram_index_count = RamIndexCount1 }; - _ -> - State - end. +reduce_memory_use(State) -> + {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, + fun limit_ram_index/2, + fun push_betas_to_deltas/1, + State), + State1. + +limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3, + index_state = IndexState, + ram_index_count = RamIndexCount }) -> + {Q2a, {Quota1, IndexState1}} = limit_ram_index( + fun bpqueue:map_fold_filter_l/4, + Q2, {Quota, IndexState}), + %% TODO: we shouldn't be writing index entries for messages that + %% can never end up in delta due them residing in the only segment + %% held by q3. + {Q3a, {Quota2, IndexState2}} = limit_ram_index( + fun bpqueue:map_fold_filter_r/4, + Q3, {Quota1, IndexState1}), + State #vqstate { q2 = Q2a, q3 = Q3a, + index_state = IndexState2, + ram_index_count = RamIndexCount - (Quota - Quota2) }. limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) -> {Q, {0, IndexState}}; -limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> +limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) -> MapFoldFilterFun( fun erlang:'not'/1, fun (MsgStatus, {0, _IndexStateN}) -> @@ -1161,7 +1183,7 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> {MsgStatus1, IndexStateN1} = maybe_write_index_to_disk(true, MsgStatus, IndexStateN), {true, MsgStatus1, {N-1, IndexStateN1}} - end, {Reduction, IndexState}, Q). + end, {Quota, IndexState}, Q). permitted_ram_index_count(#vqstate { len = 0 }) -> infinity; @@ -1172,10 +1194,10 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). -reduction(Current, Permitted) +chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> 0; -reduction(Current, Permitted) -> +chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> @@ -1228,6 +1250,11 @@ maybe_deltas_to_betas(State = #vqstate { end end. +push_alphas_to_betas(Quota, State) -> + { Quota1, State1} = maybe_push_q1_to_betas(Quota, State), + {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + State2. + maybe_push_q1_to_betas(0, State) -> {0, State}; maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> |
