diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 18:55:07 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 18:55:07 +0100 |
| commit | a12b6fc42d414800352f4538a55d9e116b43f9b2 (patch) | |
| tree | f2e3b498eb259795c5d2409a42237b4b4437a524 | |
| parent | 916539f31d540596fa919fc246331435de46301b (diff) | |
| download | rabbitmq-server-git-a12b6fc42d414800352f4538a55d9e116b43f9b2.tar.gz | |
make beta->gamma conversion part of general memory reduction scheme
While it doesn't reduce memory itself, it is still part of the overall
memory reduction logic.
This allows us to cleanly separate the beta->gamma conversion from the
major phase changes.
| -rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
1 files changed, 8 insertions, 13 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 01fc114f19..113bd0f50c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -427,7 +427,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> publish(Msg, State) -> {_SeqId, State1} = publish(Msg, false, false, State), - a(limit_ram_index(reduce_memory_use(State1))). + a(reduce_memory_use(State1)). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; @@ -1142,13 +1142,6 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> {true, MsgStatus1, {N-1, IndexStateN1}} end, {Reduction, IndexState}, Q). -should_force_index_to_disk(State = #vqstate { - ram_index_count = RamIndexCount }) -> - case permitted_ram_index_count(State) of - infinity -> false; - Permitted -> RamIndexCount >= Permitted - end. - permitted_ram_index_count(#vqstate { len = 0 }) -> infinity; permitted_ram_index_count(#vqstate { len = Len, @@ -1159,16 +1152,19 @@ permitted_ram_index_count(#vqstate { len = Len, BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). reduce_memory_use(State = #vqstate { + target_ram_msg_count = infinity }) -> + State; +reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) - when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> - State; + when TargetRamMsgCount >= RamMsgCount -> + limit_ram_index(State); reduce_memory_use(State = #vqstate { target_ram_msg_count = TargetRamMsgCount }) -> State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), case TargetRamMsgCount of 0 -> push_betas_to_deltas(State1); - _ -> State1 + _ -> limit_ram_index(State1) end. maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> @@ -1253,12 +1249,11 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> - ForceIndex = should_force_index_to_disk(State), {MsgStatus1 = #msg_status { msg_on_disk = true, index_on_disk = IndexOnDisk }, State1 = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }} = - maybe_write_to_disk(true, ForceIndex, MsgStatus, State), + maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = MsgStatus1 #msg_status { msg = undefined }, RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, |
