diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6ac92dc1e1..b33df24fc7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -420,11 +420,14 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, + index_state = IndexState, len = Len, persistent_count = PCount, persistent_store = PersistentStore, pending_ack = PA }) -> case queue:out(Q4) of {empty, _Q4} -> - fetch_from_q3_or_delta(AckRequired, State); + case fetch_from_q3_or_delta(State) of + {empty, _State1} = Result -> Result; + {loaded, State1} -> fetch(AckRequired, State1) + end; {{value, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -486,12 +489,16 @@ fetch(AckRequired, State = false -> PA end, + PCount1 = case IsPersistent andalso not AckRequired of + true -> PCount - 1; + false -> PCount + end, Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, State #vqstate { q4 = Q4a, out_counter = OutCount + 1, ram_msg_count = RamMsgCount - 1, index_state = IndexState3, len = Len1, - pending_ack = PA1 }} + pending_ack = PA1, persistent_count = PCount1 }} end. ack([], State) -> @@ -1032,8 +1039,7 @@ remove_queue_entries1( end, {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. -fetch_from_q3_or_delta(AckRequired, - State = #vqstate { +fetch_from_q3_or_delta(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, @@ -1079,7 +1085,7 @@ fetch_from_q3_or_delta(AckRequired, %% delta and q3 are maintained State1 end, - fetch(AckRequired, State2) + {loaded, State2} end. reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, |
