summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-15 13:37:54 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-15 13:37:54 +0100
commit73f611ac92bd6c7a9ad9db55816b3bbf0b7ac8e2 (patch)
tree3799bd20fd2a9aeea8f8d1487987245c8bc4d9b2 /src
parent1a9461e99cda69b05713c62d7ccade6b56cd5394 (diff)
downloadrabbitmq-server-git-73f611ac92bd6c7a9ad9db55816b3bbf0b7ac8e2.tar.gz
add some msg_status invariants
these are quite important for understanding much of the code
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl38
1 files changed, 24 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 46e1aad8e6..4d57304cb1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -451,7 +451,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- PA1 = record_pending_ack(MsgStatus1, PA),
+ PA1 = record_pending_ack(m(MsgStatus1), PA),
PCount1 = PCount + one_if(IsPersistent1),
{SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
@@ -705,6 +705,16 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
State.
+m(MsgStatus = #msg_status { msg = Msg,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk }) ->
+ true = (not IsPersistent) or IndexOnDisk,
+ true = (not IndexOnDisk) or MsgOnDisk,
+ true = (Msg =/= undefined) or MsgOnDisk,
+
+ MsgStatus.
+
one_if(true ) -> 1;
one_if(false) -> 0.
@@ -765,14 +775,14 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
false -> [SeqId | Delivers1]
end,
[SeqId | Acks1]};
- false -> {[#msg_status { msg = undefined,
- guid = Guid,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true
- } | Filtered1],
+ false -> {[m(#msg_status { msg = undefined,
+ guid = Guid,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ }) | Filtered1],
Delivers1,
Acks1}
end
@@ -933,8 +943,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
- false -> State1 #vqstate { q1 = queue:in(MsgStatus1, Q1) };
- true -> State1 #vqstate { q4 = queue:in(MsgStatus1, Q4) }
+ false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
@@ -1131,7 +1141,7 @@ limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) ->
false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
{MsgStatus1, IndexStateN1} =
maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
- {true, MsgStatus1, {N-1, IndexStateN1}}
+ {true, m(MsgStatus1), {N-1, IndexStateN1}}
end, {Quota, IndexState}, Q).
permitted_ram_index_count(#vqstate { len = 0 }) ->
@@ -1166,7 +1176,7 @@ fetch_from_q3_to_q4(State = #vqstate {
is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message {}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
- Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
+ Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4),
RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
State1 = State #vqstate { q3 = Q3a,
@@ -1288,7 +1298,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State1 = #vqstate { ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = MsgStatus1 #msg_status { msg = undefined },
+ MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }),
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1 },