diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-15 13:37:54 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-15 13:37:54 +0100 |
| commit | 73f611ac92bd6c7a9ad9db55816b3bbf0b7ac8e2 (patch) | |
| tree | 3799bd20fd2a9aeea8f8d1487987245c8bc4d9b2 /src | |
| parent | 1a9461e99cda69b05713c62d7ccade6b56cd5394 (diff) | |
| download | rabbitmq-server-git-73f611ac92bd6c7a9ad9db55816b3bbf0b7ac8e2.tar.gz | |
add some msg_status invariants
these are quite important for understanding much of the code
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 46e1aad8e6..4d57304cb1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -451,7 +451,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - PA1 = record_pending_ack(MsgStatus1, PA), + PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, @@ -705,6 +705,16 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, State. +m(MsgStatus = #msg_status { msg = Msg, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }) -> + true = (not IsPersistent) or IndexOnDisk, + true = (not IndexOnDisk) or MsgOnDisk, + true = (Msg =/= undefined) or MsgOnDisk, + + MsgStatus. + one_if(true ) -> 1; one_if(false) -> 0. @@ -765,14 +775,14 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> false -> [SeqId | Delivers1] end, [SeqId | Acks1]}; - false -> {[#msg_status { msg = undefined, - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true - } | Filtered1], + false -> {[m(#msg_status { msg = undefined, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true + }) | Filtered1], Delivers1, Acks1} end @@ -933,8 +943,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of - false -> State1 #vqstate { q1 = queue:in(MsgStatus1, Q1) }; - true -> State1 #vqstate { q4 = queue:in(MsgStatus1, Q4) } + false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, @@ -1131,7 +1141,7 @@ limit_ram_index(MapFoldFilterFun, Q, {Quota, IndexState}) -> false = MsgStatus #msg_status.index_on_disk, %% ASSERTION {MsgStatus1, IndexStateN1} = maybe_write_index_to_disk(true, MsgStatus, IndexStateN), - {true, MsgStatus1, {N-1, IndexStateN1}} + {true, m(MsgStatus1), {N-1, IndexStateN1}} end, {Quota, IndexState}, Q). permitted_ram_index_count(#vqstate { len = 0 }) -> @@ -1166,7 +1176,7 @@ fetch_from_q3_to_q4(State = #vqstate { is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message {}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), - Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), + Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4), RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION State1 = State #vqstate { q3 = Q3a, @@ -1288,7 +1298,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State1 = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }} = maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = MsgStatus1 #msg_status { msg = undefined }, + MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }), RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1 }, |
