summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-12 16:51:57 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-12 16:51:57 +0100
commit40dfa16cd0d264831c74707260ccd4cfe61b35e0 (patch)
treef2c4f17ce6b15c2c9f16a0880e7a3a14d8fd5ea5 /src
parent4aef6d9c6bafc0eb9a99d0abc93a065218342daf (diff)
downloadrabbitmq-server-git-40dfa16cd0d264831c74707260ccd4cfe61b35e0.tar.gz
refactor: simplify 'fetch'
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl51
1 files changed, 17 insertions, 34 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6efe07bac0..f8ced680f5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -470,48 +470,31 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Q4a} ->
- AckTag = case AckRequired of
- true -> SeqId;
- false -> blank_ack
- end,
-
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
SeqId, IndexState),
- %% 2. If it's on disk and there's no Ack required, remove it
+ %% 2. Remove from msg_store and queue index, if necessary
MsgStore = find_msg_store(IsPersistent),
+ Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end,
+ Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
- case MsgOnDisk andalso not AckRequired of
- %% Remove from disk now
- true -> ok = case MsgOnDisk of
- true -> rabbit_msg_store:remove(
- MsgStore, [Guid]);
- false -> ok
- end,
- case IndexOnDisk of
- true -> rabbit_queue_index:ack(
- [SeqId], IndexState1);
- false -> IndexState1
- end;
- false -> IndexState1
- end,
-
- %% 3. If it's on disk, not persistent and an ack's
- %% required then remove it from the queue index only.
- IndexState3 =
- case IndexOnDisk andalso AckRequired andalso not IsPersistent of
- true -> rabbit_queue_index:ack([SeqId], IndexState2);
- false -> IndexState2
+ case {MsgOnDisk, IndexOnDisk, AckRequired, IsPersistent} of
+ {true, false, false, _} -> Rem(), IndexState1;
+ {true, true, false, _} -> Rem(), Ack();
+ {true, true, true, false} -> Ack();
+ _ -> IndexState1
end,
- %% 4. If an ack is required, add something sensible to PA
- PA1 = case AckRequired of
- true -> record_pending_ack(MsgStatus #msg_status {
- is_delivered = true }, PA);
- false -> PA
- end,
+ %% 3. If an ack is required, add something sensible to PA
+ {AckTag, PA1} = case AckRequired of
+ true -> PA2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, PA),
+ {SeqId, PA2};
+ false -> {blank_ack, PA}
+ end,
PCount1 = maybe_dec(PCount, IsPersistent andalso not AckRequired),
Len1 = Len - 1,
@@ -519,7 +502,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
State #vqstate { q4 = Q4a,
ram_msg_count = RamMsgCount - 1,
out_counter = OutCount + 1,
- index_state = IndexState3,
+ index_state = IndexState2,
len = Len1,
persistent_count = PCount1,
pending_ack = PA1 }}