summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl42
1 files changed, 19 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3a283bbf6e..6efe07bac0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -476,11 +476,9 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
end,
%% 1. Mark it delivered if necessary
- IndexState1 = case IndexOnDisk andalso not IsDelivered of
- true -> rabbit_queue_index:deliver(
- SeqId, IndexState);
- false -> IndexState
- end,
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
%% 2. If it's on disk and there's no Ack required, remove it
MsgStore = find_msg_store(IsPersistent),
@@ -691,6 +689,11 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false }.
+maybe_write_delivered(false, _SeqId, IndexState) ->
+ IndexState;
+maybe_write_delivered(true, SeqId, IndexState) ->
+ rabbit_queue_index:deliver(SeqId, IndexState).
+
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
@@ -773,14 +776,11 @@ betas_from_segment_entries(List, TransientThreshold, IndexState) ->
fun ({Guid, SeqId, IsPersistent, IsDelivered},
{FilteredAcc, IndexStateAcc}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
- true -> IndexStateAcc1 =
- case IsDelivered of
- false -> rabbit_queue_index:deliver(
- SeqId, IndexStateAcc);
- true -> IndexStateAcc
- end,
- {FilteredAcc, rabbit_queue_index:ack(
- [SeqId], IndexStateAcc1)};
+ true -> {FilteredAcc,
+ rabbit_queue_index:ack(
+ [SeqId], maybe_write_delivered(
+ not IsDelivered,
+ SeqId, IndexStateAcc))};
false -> {[#msg_status { msg = undefined,
guid = Guid,
seq_id = SeqId,
@@ -1014,7 +1014,7 @@ remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) ->
+ {Count, GuidsByStore, SeqIdsAcc, IndexState}) ->
GuidsByStore1 =
case {MsgOnDisk, IsPersistent} of
{true, true} -> rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE,
@@ -1027,11 +1027,10 @@ remove_queue_entries1(
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end,
- IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
- true -> rabbit_queue_index:deliver(SeqId, IndexStateN);
- false -> IndexStateN
- end,
- {CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+ {Count + 1, GuidsByStore1, SeqIdsAcc1, IndexState1}.
fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1,
@@ -1256,10 +1255,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
IndexState),
{MsgStatus #msg_status { index_on_disk = true },
- case IsDelivered of
- true -> rabbit_queue_index:deliver(SeqId, IndexState1);
- false -> IndexState1
- end};
+ maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
{MsgStatus, IndexState}.