diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-12 16:51:57 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-12 16:51:57 +0100 |
| commit | 40dfa16cd0d264831c74707260ccd4cfe61b35e0 (patch) | |
| tree | f2c4f17ce6b15c2c9f16a0880e7a3a14d8fd5ea5 /src | |
| parent | 4aef6d9c6bafc0eb9a99d0abc93a065218342daf (diff) | |
| download | rabbitmq-server-git-40dfa16cd0d264831c74707260ccd4cfe61b35e0.tar.gz | |
refactor: simplify 'fetch'
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 51 |
1 files changed, 17 insertions, 34 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6efe07bac0..f8ced680f5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -470,48 +470,31 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Q4a} -> - AckTag = case AckRequired of - true -> SeqId; - false -> blank_ack - end, - %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, SeqId, IndexState), - %% 2. If it's on disk and there's no Ack required, remove it + %% 2. Remove from msg_store and queue index, if necessary MsgStore = find_msg_store(IsPersistent), + Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = - case MsgOnDisk andalso not AckRequired of - %% Remove from disk now - true -> ok = case MsgOnDisk of - true -> rabbit_msg_store:remove( - MsgStore, [Guid]); - false -> ok - end, - case IndexOnDisk of - true -> rabbit_queue_index:ack( - [SeqId], IndexState1); - false -> IndexState1 - end; - false -> IndexState1 - end, - - %% 3. If it's on disk, not persistent and an ack's - %% required then remove it from the queue index only. - IndexState3 = - case IndexOnDisk andalso AckRequired andalso not IsPersistent of - true -> rabbit_queue_index:ack([SeqId], IndexState2); - false -> IndexState2 + case {MsgOnDisk, IndexOnDisk, AckRequired, IsPersistent} of + {true, false, false, _} -> Rem(), IndexState1; + {true, true, false, _} -> Rem(), Ack(); + {true, true, true, false} -> Ack(); + _ -> IndexState1 end, - %% 4. If an ack is required, add something sensible to PA - PA1 = case AckRequired of - true -> record_pending_ack(MsgStatus #msg_status { - is_delivered = true }, PA); - false -> PA - end, + %% 3. If an ack is required, add something sensible to PA + {AckTag, PA1} = case AckRequired of + true -> PA2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, PA), + {SeqId, PA2}; + false -> {blank_ack, PA} + end, PCount1 = maybe_dec(PCount, IsPersistent andalso not AckRequired), Len1 = Len - 1, @@ -519,7 +502,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, State #vqstate { q4 = Q4a, ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, - index_state = IndexState3, + index_state = IndexState2, len = Len1, persistent_count = PCount1, pending_ack = PA1 }} |
