summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-22 18:55:07 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-22 18:55:07 +0100
commita12b6fc42d414800352f4538a55d9e116b43f9b2 (patch)
treef2e3b498eb259795c5d2409a42237b4b4437a524
parent916539f31d540596fa919fc246331435de46301b (diff)
downloadrabbitmq-server-git-a12b6fc42d414800352f4538a55d9e116b43f9b2.tar.gz
make beta->gamma conversion part of general memory reduction scheme
While it doesn't reduce memory itself, it is still part of the overall memory reduction logic. This allows us to cleanly separate the beta->gamma conversion from the major phase changes.
-rw-r--r--src/rabbit_variable_queue.erl21
1 files changed, 8 insertions, 13 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 01fc114f19..113bd0f50c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -427,7 +427,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
publish(Msg, State) ->
{_SeqId, State1} = publish(Msg, false, false, State),
- a(limit_ram_index(reduce_memory_use(State1))).
+ a(reduce_memory_use(State1)).
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
@@ -1142,13 +1142,6 @@ limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
{true, MsgStatus1, {N-1, IndexStateN1}}
end, {Reduction, IndexState}, Q).
-should_force_index_to_disk(State = #vqstate {
- ram_index_count = RamIndexCount }) ->
- case permitted_ram_index_count(State) of
- infinity -> false;
- Permitted -> RamIndexCount >= Permitted
- end.
-
permitted_ram_index_count(#vqstate { len = 0 }) ->
infinity;
permitted_ram_index_count(#vqstate { len = Len,
@@ -1159,16 +1152,19 @@ permitted_ram_index_count(#vqstate { len = Len,
BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
reduce_memory_use(State = #vqstate {
+ target_ram_msg_count = infinity }) ->
+ State;
+reduce_memory_use(State = #vqstate {
ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount ->
- State;
+ when TargetRamMsgCount >= RamMsgCount ->
+ limit_ram_index(State);
reduce_memory_use(State = #vqstate {
target_ram_msg_count = TargetRamMsgCount }) ->
State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
case TargetRamMsgCount of
0 -> push_betas_to_deltas(State1);
- _ -> State1
+ _ -> limit_ram_index(State1)
end.
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
@@ -1253,12 +1249,11 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
- ForceIndex = should_force_index_to_disk(State),
{MsgStatus1 = #msg_status { msg_on_disk = true,
index_on_disk = IndexOnDisk },
State1 = #vqstate { ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }} =
- maybe_write_to_disk(true, ForceIndex, MsgStatus, State),
+ maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = MsgStatus1 #msg_status { msg = undefined },
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,