summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-23 12:15:23 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-23 12:15:23 +0100
commita964c1d5cb2a40515bff14e4d4aa93b9ea55c420 (patch)
tree0074dc099782e0b3cf1f3fb0eca35d499fcb1e65 /src
parent09c3e3d41249d37dda5493e348ec1d9d1e46abcc (diff)
parent5a052e30baf440b0b7bab1b0f8697f944dc9d36b (diff)
downloadrabbitmq-server-git-a964c1d5cb2a40515bff14e4d4aa93b9ea55c420.tar.gz
Merging heads
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl59
1 files changed, 27 insertions, 32 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 50fa0e2640..958a2903b7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -427,7 +427,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
publish(Msg, State) ->
{_SeqId, State1} = publish(Msg, false, false, State),
- a(limit_ram_index(reduce_memory_use(State1))).
+ a(reduce_memory_use(State1)).
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
@@ -843,22 +843,6 @@ combine_deltas(#delta { start_seq_id = StartLow,
beta_fold(Fun, Init, Q) ->
bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q).
-permitted_ram_index_count(#vqstate { len = 0 }) ->
- infinity;
-permitted_ram_index_count(#vqstate { len = Len,
- q2 = Q2,
- q3 = Q3,
- delta = #delta { count = DeltaCount } }) ->
- BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
- BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
-
-should_force_index_to_disk(State = #vqstate {
- ram_index_count = RamIndexCount }) ->
- case permitted_ram_index_count(State) of
- infinity -> false;
- Permitted -> RamIndexCount >= Permitted
- end.
-
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
@@ -1040,19 +1024,6 @@ fetch_from_q3_to_q4(State = #vqstate {
{loaded, State2}
end.
-reduce_memory_use(State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
- State;
-reduce_memory_use(State = #vqstate {
- target_ram_msg_count = TargetRamMsgCount }) ->
- State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
- case TargetRamMsgCount of
- 0 -> push_betas_to_deltas(State1);
- _ -> State1
- end.
-
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1130,6 +1101,22 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Phase changes
%%----------------------------------------------------------------------------
+reduce_memory_use(State = #vqstate {
+ target_ram_msg_count = infinity }) ->
+ State;
+reduce_memory_use(State = #vqstate {
+ ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount })
+ when TargetRamMsgCount >= RamMsgCount ->
+ limit_ram_index(State);
+reduce_memory_use(State = #vqstate {
+ target_ram_msg_count = TargetRamMsgCount }) ->
+ State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
+ case TargetRamMsgCount of
+ 0 -> push_betas_to_deltas(State1);
+ _ -> limit_ram_index(State1)
+ end.
+
limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
Permitted = permitted_ram_index_count(State),
if Permitted =/= infinity andalso RamIndexCount > Permitted ->
@@ -1171,6 +1158,15 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
{true, MsgStatus1, {N-1, IndexStateN1}}
end, {Reduction, IndexState}, Q).
+permitted_ram_index_count(#vqstate { len = 0 }) ->
+ infinity;
+permitted_ram_index_count(#vqstate { len = Len,
+ q2 = Q2,
+ q3 = Q3,
+ delta = #delta { count = DeltaCount } }) ->
+ BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
+ BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
+
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
maybe_deltas_to_betas(State = #vqstate {
@@ -1253,12 +1249,11 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
- ForceIndex = should_force_index_to_disk(State),
{MsgStatus1 = #msg_status { msg_on_disk = true,
index_on_disk = IndexOnDisk },
State1 = #vqstate { ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }} =
- maybe_write_to_disk(true, ForceIndex, MsgStatus, State),
+ maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = MsgStatus1 #msg_status { msg = undefined },
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,