diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-06 19:18:25 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-06 19:18:25 +0100 |
| commit | 0f8ab46e829954ffbbba436c57be640360a41705 (patch) | |
| tree | de0fe13e26d039d24d565a1f5c075700819548bc /src | |
| parent | 29cf9fa7bf60c0691a4ba694258114a64fefaeed (diff) | |
| download | rabbitmq-server-git-0f8ab46e829954ffbbba436c57be640360a41705.tar.gz | |
simplify memory reduction decisions
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 78 |
1 files changed, 36 insertions, 42 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5893385aa1..6f6f3d923d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -634,14 +634,15 @@ ram_duration(State = #vqstate { egress_rate = Egress, out_counter = 0, ram_msg_count_prev = RamMsgCount })}. -needs_idle_timeout(#vqstate { on_sync = {_, _, SFuns}, - target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount }) - when SFuns =/= [] orelse RamMsgCount > TargetRamMsgCount -> +needs_idle_timeout(#vqstate { on_sync = {_, _, SFuns}}) when SFuns =/= [] -> true; -needs_idle_timeout(State = #vqstate { ram_index_count = RamIndexCount }) -> - Permitted = permitted_ram_index_count(State), - Permitted =/= infinity andalso RamIndexCount > Permitted. +needs_idle_timeout(State = #vqstate { target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount}) -> + case reduction(RamMsgCount, TargetRamMsgCount) of + 0 -> reduction(RamIndexCount, State) == ?IO_BATCH_SIZE; + _ -> true + end. idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). @@ -1113,49 +1114,36 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %%---------------------------------------------------------------------------- reduce_memory_use(State = #vqstate { - target_ram_msg_count = infinity }) -> - State; -reduce_memory_use(State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) - when TargetRamMsgCount >= RamMsgCount -> - limit_ram_index(State); -reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) -> - Reduction = lists:min([RamMsgCount - TargetRamMsgCount, ?IO_BATCH_SIZE]), + Reduction = reduction(RamMsgCount, TargetRamMsgCount), { Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State), {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1), case TargetRamMsgCount of - 0 -> push_betas_to_deltas(State2); - _ -> limit_ram_index(State2) + infinity -> State2; + 0 -> push_betas_to_deltas(State2); + _ -> limit_ram_index(State2) end. limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> - Permitted = permitted_ram_index_count(State), - if Permitted =/= infinity andalso RamIndexCount > Permitted -> - Reduction = lists:min([RamIndexCount - Permitted, ?IO_BATCH_SIZE]), - case Reduction < ?IO_BATCH_SIZE of - true -> State; - false -> #vqstate { q2 = Q2, q3 = Q3, - index_state = IndexState } = State, - {Q2a, {Reduction1, IndexState1}} = - limit_ram_index(fun bpqueue:map_fold_filter_l/4, - Q2, {Reduction, IndexState}), - %% TODO: we shouldn't be writing index - %% entries for messages that can never end up - %% in delta due them residing in the only - %% segment held by q3. - {Q3a, {Reduction2, IndexState2}} = - limit_ram_index(fun bpqueue:map_fold_filter_r/4, - Q3, {Reduction1, IndexState1}), - RamIndexCount1 = RamIndexCount - - (Reduction - Reduction2), - State #vqstate { q2 = Q2a, q3 = Q3a, - index_state = IndexState2, - ram_index_count = RamIndexCount1 } - end; - true -> + Reduction = reduction(RamIndexCount, permitted_ram_index_count(State)), + case Reduction of + ?IO_BATCH_SIZE -> + #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState } = State, + {Q2a, {Reduction1, IndexState1}} = + limit_ram_index(fun bpqueue:map_fold_filter_l/4, + Q2, {Reduction, IndexState}), + %% TODO: we shouldn't be writing index entries for + %% messages that can never end up in delta due them + %% residing in the only segment held by q3. + {Q3a, {Reduction2, IndexState2}} = + limit_ram_index(fun bpqueue:map_fold_filter_r/4, + Q3, {Reduction1, IndexState1}), + RamIndexCount1 = RamIndexCount - (Reduction - Reduction2), + State #vqstate { q2 = Q2a, q3 = Q3a, + index_state = IndexState2, + ram_index_count = RamIndexCount1 }; + _ -> State end. @@ -1183,6 +1171,12 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). +reduction(Current, Permitted) + when Permitted =:= infinity orelse Permitted >= Current -> + 0; +reduction(Current, Permitted) -> + lists:min([Current - Permitted, ?IO_BATCH_SIZE]). + maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; maybe_deltas_to_betas(State = #vqstate { |
