diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-07 11:40:24 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-07 11:40:24 +0100 |
| commit | 397519aed69328d69afb51200215e4899d0fde86 (patch) | |
| tree | a69f4c5cec434b7ad082bcdb732aca6a7b9957e5 | |
| parent | 4f71eaade3615e794f25d58a239b0acde0ad950d (diff) | |
| download | rabbitmq-server-git-397519aed69328d69afb51200215e4899d0fde86.tar.gz | |
unify the phase change predicate and phase change operation
so that the logic for determining the necessity of phase changes is
kept in one place and cannot diverge.
| -rw-r--r-- | src/rabbit_variable_queue.erl | 113 |
1 files changed, 70 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d9f22c5a4d..f19fcee53a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -636,14 +636,12 @@ ram_duration(State = #vqstate { egress_rate = Egress, needs_idle_timeout(#vqstate { on_sync = {_, _, [_|_]}}) -> true; -needs_idle_timeout(State = #vqstate { target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount}) -> - case reduction(RamMsgCount, TargetRamMsgCount) of - 0 -> Permitted = permitted_ram_index_count(State), - reduction(RamIndexCount, Permitted) == ?IO_BATCH_SIZE; - _ -> true - end. +needs_idle_timeout(State) -> + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + State), + Res. idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). @@ -1114,43 +1112,67 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Phase changes %%---------------------------------------------------------------------------- -reduce_memory_use(State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) -> - Reduction = reduction(RamMsgCount, TargetRamMsgCount), - { Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State), - {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1), - case TargetRamMsgCount of - infinity -> State2; - 0 -> push_betas_to_deltas(State2); - _ -> limit_ram_index(State2) +%% Determine whether a reduction in memory use is necessary, and call +%% functions to perform the required phase changes. The function can +%% also be used to just do the former, by passing in dummy phase +%% change functions. +%% +%% The function does not report on any needed beta->delta conversions, +%% though the conversion function for that is called as necessary. The +%% reason is twofold. Firstly, this is safe because the conversion is +%% only ever necessary just after a transition to a +%% target_ram_msg_count of zero or after an incremental alpha->beta +%% conversion. In the former case the conversion is performed straight +%% away (i.e. any betas present at the time are converted to deltas), +%% and in the latter case the need for a conversion is flagged up +%% anyway. Secondly, this is necessary because we do not have a +%% precise and cheap predicate for determining whether a beta->delta +%% conversion is necessary - due to the complexities of retaining up +%% one segment's worth of messages in q3 - and thus would risk +%% perpetually reporting the need for a conversion when no such +%% conversion is needed. That in turn could cause an infinite loop. +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> + {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, + State #vqstate.target_ram_msg_count) of + 0 -> {false, State}; + S1 -> {true, AlphaBetaFun(S1, State)} + end, + case State1 #vqstate.target_ram_msg_count of + infinity -> {Reduce, State1}; + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end end. -limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> - Reduction = reduction(RamIndexCount, permitted_ram_index_count(State)), - case Reduction of - ?IO_BATCH_SIZE -> - #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState } = State, - {Q2a, {Reduction1, IndexState1}} = - limit_ram_index(fun bpqueue:map_fold_filter_l/4, - Q2, {Reduction, IndexState}), - %% TODO: we shouldn't be writing index entries for - %% messages that can never end up in delta due them - %% residing in the only segment held by q3. - {Q3a, {Reduction2, IndexState2}} = - limit_ram_index(fun bpqueue:map_fold_filter_r/4, - Q3, {Reduction1, IndexState1}), - RamIndexCount1 = RamIndexCount - (Reduction - Reduction2), - State #vqstate { q2 = Q2a, q3 = Q3a, - index_state = IndexState2, - ram_index_count = RamIndexCount1 }; - _ -> - State - end. +reduce_memory_use(State) -> + {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, + fun limit_ram_index/2, + fun push_betas_to_deltas/1, + State), + State1. + +limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3, + index_state = IndexState, + ram_index_count = RamIndexCount }) -> + {Q2a, {Quota1, IndexState1}} = limit_ram_index( + fun bpqueue:map_fold_filter_l/4, + Q2, {Quota, IndexState}), + %% TODO: we shouldn't be writing index entries for messages that + %% can never end up in delta due them residing in the only segment + %% held by q3. + {Q3a, {Quota2, IndexState2}} = limit_ram_index( + fun bpqueue:map_fold_filter_r/4, + Q3, {Quota1, IndexState1}), + State #vqstate { q2 = Q2a, q3 = Q3a, + index_state = IndexState2, + ram_index_count = RamIndexCount - (Quota - Quota2) }. limit_ram_index(_MapFoldFilterFun, Q, {0, IndexState}) -> {Q, {0, IndexState}}; -limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> +limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) -> MapFoldFilterFun( fun erlang:'not'/1, fun (MsgStatus, {0, _IndexStateN}) -> @@ -1161,7 +1183,7 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> {MsgStatus1, IndexStateN1} = maybe_write_index_to_disk(true, MsgStatus, IndexStateN), {true, MsgStatus1, {N-1, IndexStateN1}} - end, {Reduction, IndexState}, Q). + end, {Quota, IndexState}, Q). permitted_ram_index_count(#vqstate { len = 0 }) -> infinity; @@ -1172,10 +1194,10 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). -reduction(Current, Permitted) +chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> 0; -reduction(Current, Permitted) -> +chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> @@ -1228,6 +1250,11 @@ maybe_deltas_to_betas(State = #vqstate { end end. +push_alphas_to_betas(Quota, State) -> + { Quota1, State1} = maybe_push_q1_to_betas(Quota, State), + {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + State2. + maybe_push_q1_to_betas(0, State) -> {0, State}; maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> |
