summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl18
1 files changed, 12 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6ac92dc1e1..b33df24fc7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -420,11 +420,14 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
fetch(AckRequired, State =
#vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
- index_state = IndexState, len = Len,
+ index_state = IndexState, len = Len, persistent_count = PCount,
persistent_store = PersistentStore, pending_ack = PA }) ->
case queue:out(Q4) of
{empty, _Q4} ->
- fetch_from_q3_or_delta(AckRequired, State);
+ case fetch_from_q3_or_delta(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, State1} -> fetch(AckRequired, State1)
+ end;
{{value, MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
@@ -486,12 +489,16 @@ fetch(AckRequired, State =
false -> PA
end,
+ PCount1 = case IsPersistent andalso not AckRequired of
+ true -> PCount - 1;
+ false -> PCount
+ end,
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
ram_msg_count = RamMsgCount - 1,
index_state = IndexState3, len = Len1,
- pending_ack = PA1 }}
+ pending_ack = PA1, persistent_count = PCount1 }}
end.
ack([], State) ->
@@ -1032,8 +1039,7 @@ remove_queue_entries1(
end,
{PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
-fetch_from_q3_or_delta(AckRequired,
- State = #vqstate {
+fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
@@ -1079,7 +1085,7 @@ fetch_from_q3_or_delta(AckRequired,
%% delta and q3 are maintained
State1
end,
- fetch(AckRequired, State2)
+ {loaded, State2}
end.
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,