summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-26 15:02:31 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-26 15:02:31 +0100
commitd1a7c2ac8aabf7446d86ccb7d5c0d59aee428aa2 (patch)
tree115a646ab3bb1fe154c92664b823222080b1087d
parent5d13241c57732c7197678f381ccc2e6efab57b6b (diff)
downloadrabbitmq-server-git-d1a7c2ac8aabf7446d86ccb7d5c0d59aee428aa2.tar.gz
Because of the exposure of AckRequired into the interface, the counting of persistent messages has potentially been wrong for a while (previously only decr on ack, but actually need to maybe decr on fetch too). This can have the effect on startup of telling the queue it has more messages in it than it really has, which can cause an infinite loop
-rw-r--r--src/rabbit_variable_queue.erl18
1 files changed, 12 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6ac92dc1e1..b33df24fc7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -420,11 +420,14 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
fetch(AckRequired, State =
#vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
- index_state = IndexState, len = Len,
+ index_state = IndexState, len = Len, persistent_count = PCount,
persistent_store = PersistentStore, pending_ack = PA }) ->
case queue:out(Q4) of
{empty, _Q4} ->
- fetch_from_q3_or_delta(AckRequired, State);
+ case fetch_from_q3_or_delta(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, State1} -> fetch(AckRequired, State1)
+ end;
{{value, MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
@@ -486,12 +489,16 @@ fetch(AckRequired, State =
false -> PA
end,
+ PCount1 = case IsPersistent andalso not AckRequired of
+ true -> PCount - 1;
+ false -> PCount
+ end,
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
ram_msg_count = RamMsgCount - 1,
index_state = IndexState3, len = Len1,
- pending_ack = PA1 }}
+ pending_ack = PA1, persistent_count = PCount1 }}
end.
ack([], State) ->
@@ -1032,8 +1039,7 @@ remove_queue_entries1(
end,
{PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
-fetch_from_q3_or_delta(AckRequired,
- State = #vqstate {
+fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
@@ -1079,7 +1085,7 @@ fetch_from_q3_or_delta(AckRequired,
%% delta and q3 are maintained
State1
end,
- fetch(AckRequired, State2)
+ {loaded, State2}
end.
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,