diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-23 12:15:23 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-23 12:15:23 +0100 |
| commit | a964c1d5cb2a40515bff14e4d4aa93b9ea55c420 (patch) | |
| tree | 0074dc099782e0b3cf1f3fb0eca35d499fcb1e65 /src | |
| parent | 09c3e3d41249d37dda5493e348ec1d9d1e46abcc (diff) | |
| parent | 5a052e30baf440b0b7bab1b0f8697f944dc9d36b (diff) | |
| download | rabbitmq-server-git-a964c1d5cb2a40515bff14e4d4aa93b9ea55c420.tar.gz | |
Merging heads
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 59 |
1 files changed, 27 insertions, 32 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 50fa0e2640..958a2903b7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -427,7 +427,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> publish(Msg, State) -> {_SeqId, State1} = publish(Msg, false, false, State), - a(limit_ram_index(reduce_memory_use(State1))). + a(reduce_memory_use(State1)). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; @@ -843,22 +843,6 @@ combine_deltas(#delta { start_seq_id = StartLow, beta_fold(Fun, Init, Q) -> bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q). -permitted_ram_index_count(#vqstate { len = 0 }) -> - infinity; -permitted_ram_index_count(#vqstate { len = Len, - q2 = Q2, - q3 = Q3, - delta = #delta { count = DeltaCount } }) -> - BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), - BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). - -should_force_index_to_disk(State = #vqstate { - ram_index_count = RamIndexCount }) -> - case permitted_ram_index_count(State) of - infinity -> false; - Permitted -> RamIndexCount >= Permitted - end. - %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- @@ -1040,19 +1024,6 @@ fetch_from_q3_to_q4(State = #vqstate { {loaded, State2} end. -reduce_memory_use(State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) - when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> - State; -reduce_memory_use(State = #vqstate { - target_ram_msg_count = TargetRamMsgCount }) -> - State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), - case TargetRamMsgCount of - 0 -> push_betas_to_deltas(State1); - _ -> State1 - end. - %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1130,6 +1101,22 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Phase changes %%---------------------------------------------------------------------------- +reduce_memory_use(State = #vqstate { + target_ram_msg_count = infinity }) -> + State; +reduce_memory_use(State = #vqstate { + ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount }) + when TargetRamMsgCount >= RamMsgCount -> + limit_ram_index(State); +reduce_memory_use(State = #vqstate { + target_ram_msg_count = TargetRamMsgCount }) -> + State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), + case TargetRamMsgCount of + 0 -> push_betas_to_deltas(State1); + _ -> limit_ram_index(State1) + end. + limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> Permitted = permitted_ram_index_count(State), if Permitted =/= infinity andalso RamIndexCount > Permitted -> @@ -1171,6 +1158,15 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> {true, MsgStatus1, {N-1, IndexStateN1}} end, {Reduction, IndexState}, Q). +permitted_ram_index_count(#vqstate { len = 0 }) -> + infinity; +permitted_ram_index_count(#vqstate { len = Len, + q2 = Q2, + q3 = Q3, + delta = #delta { count = DeltaCount } }) -> + BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), + BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)). + maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; maybe_deltas_to_betas(State = #vqstate { @@ -1253,12 +1249,11 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> - ForceIndex = should_force_index_to_disk(State), {MsgStatus1 = #msg_status { msg_on_disk = true, index_on_disk = IndexOnDisk }, State1 = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }} = - maybe_write_to_disk(true, ForceIndex, MsgStatus, State), + maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = MsgStatus1 #msg_status { msg = undefined }, RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, |
