diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-08 18:18:09 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-08 18:18:09 +0100 |
| commit | 3ce9813fd375baea72f9225d90989afd9812842a (patch) | |
| tree | 161977ee426dd2f87639d7e9c9366e8bc037edc5 /src | |
| parent | 4c8ef89985c59c679439477bb5feb799d0740816 (diff) | |
| download | rabbitmq-server-git-3ce9813fd375baea72f9225d90989afd9812842a.tar.gz | |
on out/1, if the msg or its index are on disk and they don't need to be, take them off disk
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 184050f783..ff3b8b7c4f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -220,20 +220,41 @@ out(State = end, out(State #vqstate { q4 = Q4a, prefetcher = undefined }); {{value, - #alpha { msg = Msg = #basic_message { guid = MsgId }, seq_id = SeqId, - is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }}, Q4a} -> - IndexState1 = - case IndexOnDisk andalso not IsDelivered of + #alpha { msg = Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + seq_id = SeqId, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + Q4a} -> + {IndexState1, IndexOnDisk1} = + case IndexOnDisk of true -> - rabbit_queue_index:write_delivered(SeqId, IndexState); + IndexState2 = + case IsDelivered of + false -> rabbit_queue_index:write_delivered( + SeqId, IndexState); + true -> IndexState + end, + case IsPersistent of + true -> {IndexState2, true}; + false -> {rabbit_queue_index:write_acks( + [SeqId], IndexState2), false} + end; false -> - IndexState + {IndexState, false} + end, + _MsgOnDisk1 = IndexOnDisk1 = + case IndexOnDisk1 of + true -> true = IsPersistent, %% ASSERTION + true = MsgOnDisk; %% ASSERTION + false -> ok = case MsgOnDisk andalso not IsPersistent of + true -> rabbit_msg_store:remove([MsgId]); + false -> ok + end, + false end, - AckTag = case {IndexOnDisk, MsgOnDisk} of - {true, true } -> {ack_index_and_store, MsgId, SeqId}; - {false, true } -> {ack_store, MsgId}; - {false, false} -> ack_not_on_disk + AckTag = case IndexOnDisk1 of + true -> {ack_index_and_store, MsgId, SeqId}; + false -> ack_not_on_disk end, {{Msg, IsDelivered, AckTag}, State #vqstate { q4 = Q4a, out_counter = OutCount + 1, @@ -260,7 +281,6 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = true, index_on_disk = IndexOnDisk }, Q4), - %% TODO - if it's not persistent, remove it from disk now State1 = State #vqstate { q3 = Q3a, q4 = Q4a }, State2 = case {queue:is_empty(Q3a), 0 == GammaCount} of |
