diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 42 |
1 files changed, 19 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3a283bbf6e..6efe07bac0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -476,11 +476,9 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, end, %% 1. Mark it delivered if necessary - IndexState1 = case IndexOnDisk andalso not IsDelivered of - true -> rabbit_queue_index:deliver( - SeqId, IndexState); - false -> IndexState - end, + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), %% 2. If it's on disk and there's no Ack required, remove it MsgStore = find_msg_store(IsPersistent), @@ -691,6 +689,11 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false }. +maybe_write_delivered(false, _SeqId, IndexState) -> + IndexState; +maybe_write_delivered(true, SeqId, IndexState) -> + rabbit_queue_index:deliver(SeqId, IndexState). + record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> @@ -773,14 +776,11 @@ betas_from_segment_entries(List, TransientThreshold, IndexState) -> fun ({Guid, SeqId, IsPersistent, IsDelivered}, {FilteredAcc, IndexStateAcc}) -> case SeqId < TransientThreshold andalso not IsPersistent of - true -> IndexStateAcc1 = - case IsDelivered of - false -> rabbit_queue_index:deliver( - SeqId, IndexStateAcc); - true -> IndexStateAcc - end, - {FilteredAcc, rabbit_queue_index:ack( - [SeqId], IndexStateAcc1)}; + true -> {FilteredAcc, + rabbit_queue_index:ack( + [SeqId], maybe_write_delivered( + not IsDelivered, + SeqId, IndexStateAcc))}; false -> {[#msg_status { msg = undefined, guid = Guid, seq_id = SeqId, @@ -1014,7 +1014,7 @@ remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) -> + {Count, GuidsByStore, SeqIdsAcc, IndexState}) -> GuidsByStore1 = case {MsgOnDisk, IsPersistent} of {true, true} -> rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, @@ -1027,11 +1027,10 @@ remove_queue_entries1( true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, - IndexStateN1 = case IndexOnDisk andalso not IsDelivered of - true -> rabbit_queue_index:deliver(SeqId, IndexStateN); - false -> IndexStateN - end, - {CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + {Count + 1, GuidsByStore1, SeqIdsAcc1, IndexState1}. fetch_from_q3_or_delta(State = #vqstate { q1 = Q1, @@ -1256,10 +1255,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, - case IsDelivered of - true -> rabbit_queue_index:deliver(SeqId, IndexState1); - false -> IndexState1 - end}; + maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> {MsgStatus, IndexState}. |
