summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-06 19:18:25 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-06 19:18:25 +0100
commit0f8ab46e829954ffbbba436c57be640360a41705 (patch)
treede0fe13e26d039d24d565a1f5c075700819548bc /src
parent29cf9fa7bf60c0691a4ba694258114a64fefaeed (diff)
downloadrabbitmq-server-git-0f8ab46e829954ffbbba436c57be640360a41705.tar.gz
simplify memory reduction decisions
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl78
1 files changed, 36 insertions, 42 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5893385aa1..6f6f3d923d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -634,14 +634,15 @@ ram_duration(State = #vqstate { egress_rate = Egress,
out_counter = 0,
ram_msg_count_prev = RamMsgCount })}.
-needs_idle_timeout(#vqstate { on_sync = {_, _, SFuns},
- target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount })
- when SFuns =/= [] orelse RamMsgCount > TargetRamMsgCount ->
+needs_idle_timeout(#vqstate { on_sync = {_, _, SFuns}}) when SFuns =/= [] ->
true;
-needs_idle_timeout(State = #vqstate { ram_index_count = RamIndexCount }) ->
- Permitted = permitted_ram_index_count(State),
- Permitted =/= infinity andalso RamIndexCount > Permitted.
+needs_idle_timeout(State = #vqstate { target_ram_msg_count = TargetRamMsgCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount}) ->
+ case reduction(RamMsgCount, TargetRamMsgCount) of
+ 0 -> reduction(RamIndexCount, State) == ?IO_BATCH_SIZE;
+ _ -> true
+ end.
idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
@@ -1113,49 +1114,36 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
reduce_memory_use(State = #vqstate {
- target_ram_msg_count = infinity }) ->
- State;
-reduce_memory_use(State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount >= RamMsgCount ->
- limit_ram_index(State);
-reduce_memory_use(State = #vqstate {
ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount }) ->
- Reduction = lists:min([RamMsgCount - TargetRamMsgCount, ?IO_BATCH_SIZE]),
+ Reduction = reduction(RamMsgCount, TargetRamMsgCount),
{ Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State),
{_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1),
case TargetRamMsgCount of
- 0 -> push_betas_to_deltas(State2);
- _ -> limit_ram_index(State2)
+ infinity -> State2;
+ 0 -> push_betas_to_deltas(State2);
+ _ -> limit_ram_index(State2)
end.
limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
- Permitted = permitted_ram_index_count(State),
- if Permitted =/= infinity andalso RamIndexCount > Permitted ->
- Reduction = lists:min([RamIndexCount - Permitted, ?IO_BATCH_SIZE]),
- case Reduction < ?IO_BATCH_SIZE of
- true -> State;
- false -> #vqstate { q2 = Q2, q3 = Q3,
- index_state = IndexState } = State,
- {Q2a, {Reduction1, IndexState1}} =
- limit_ram_index(fun bpqueue:map_fold_filter_l/4,
- Q2, {Reduction, IndexState}),
- %% TODO: we shouldn't be writing index
- %% entries for messages that can never end up
- %% in delta due them residing in the only
- %% segment held by q3.
- {Q3a, {Reduction2, IndexState2}} =
- limit_ram_index(fun bpqueue:map_fold_filter_r/4,
- Q3, {Reduction1, IndexState1}),
- RamIndexCount1 = RamIndexCount -
- (Reduction - Reduction2),
- State #vqstate { q2 = Q2a, q3 = Q3a,
- index_state = IndexState2,
- ram_index_count = RamIndexCount1 }
- end;
- true ->
+ Reduction = reduction(RamIndexCount, permitted_ram_index_count(State)),
+ case Reduction of
+ ?IO_BATCH_SIZE ->
+ #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState } = State,
+ {Q2a, {Reduction1, IndexState1}} =
+ limit_ram_index(fun bpqueue:map_fold_filter_l/4,
+ Q2, {Reduction, IndexState}),
+ %% TODO: we shouldn't be writing index entries for
+ %% messages that can never end up in delta due them
+ %% residing in the only segment held by q3.
+ {Q3a, {Reduction2, IndexState2}} =
+ limit_ram_index(fun bpqueue:map_fold_filter_r/4,
+ Q3, {Reduction1, IndexState1}),
+ RamIndexCount1 = RamIndexCount - (Reduction - Reduction2),
+ State #vqstate { q2 = Q2a, q3 = Q3a,
+ index_state = IndexState2,
+ ram_index_count = RamIndexCount1 };
+ _ ->
State
end.
@@ -1183,6 +1171,12 @@ permitted_ram_index_count(#vqstate { len = Len,
BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
+reduction(Current, Permitted)
+ when Permitted =:= infinity orelse Permitted >= Current ->
+ 0;
+reduction(Current, Permitted) ->
+ lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
+
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
maybe_deltas_to_betas(State = #vqstate {