summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-08 18:18:09 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-08 18:18:09 +0100
commit3ce9813fd375baea72f9225d90989afd9812842a (patch)
tree161977ee426dd2f87639d7e9c9366e8bc037edc5 /src
parent4c8ef89985c59c679439477bb5feb799d0740816 (diff)
downloadrabbitmq-server-git-3ce9813fd375baea72f9225d90989afd9812842a.tar.gz
on out/1, if the msg or its index are on disk and they don't need to be, take them off disk
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl44
1 files changed, 32 insertions, 12 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 184050f783..ff3b8b7c4f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -220,20 +220,41 @@ out(State =
end,
out(State #vqstate { q4 = Q4a, prefetcher = undefined });
{{value,
- #alpha { msg = Msg = #basic_message { guid = MsgId }, seq_id = SeqId,
- is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk }}, Q4a} ->
- IndexState1 =
- case IndexOnDisk andalso not IsDelivered of
+ #alpha { msg = Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ seq_id = SeqId, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ Q4a} ->
+ {IndexState1, IndexOnDisk1} =
+ case IndexOnDisk of
true ->
- rabbit_queue_index:write_delivered(SeqId, IndexState);
+ IndexState2 =
+ case IsDelivered of
+ false -> rabbit_queue_index:write_delivered(
+ SeqId, IndexState);
+ true -> IndexState
+ end,
+ case IsPersistent of
+ true -> {IndexState2, true};
+ false -> {rabbit_queue_index:write_acks(
+ [SeqId], IndexState2), false}
+ end;
false ->
- IndexState
+ {IndexState, false}
+ end,
+ _MsgOnDisk1 = IndexOnDisk1 =
+ case IndexOnDisk1 of
+ true -> true = IsPersistent, %% ASSERTION
+ true = MsgOnDisk; %% ASSERTION
+ false -> ok = case MsgOnDisk andalso not IsPersistent of
+ true -> rabbit_msg_store:remove([MsgId]);
+ false -> ok
+ end,
+ false
end,
- AckTag = case {IndexOnDisk, MsgOnDisk} of
- {true, true } -> {ack_index_and_store, MsgId, SeqId};
- {false, true } -> {ack_store, MsgId};
- {false, false} -> ack_not_on_disk
+ AckTag = case IndexOnDisk1 of
+ true -> {ack_index_and_store, MsgId, SeqId};
+ false -> ack_not_on_disk
end,
{{Msg, IsDelivered, AckTag},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
@@ -260,7 +281,6 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2,
#alpha { msg = Msg, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = true,
index_on_disk = IndexOnDisk }, Q4),
- %% TODO - if it's not persistent, remove it from disk now
State1 = State #vqstate { q3 = Q3a, q4 = Q4a },
State2 =
case {queue:is_empty(Q3a), 0 == GammaCount} of