summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-15 18:40:20 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-15 18:40:20 +0100
commit90c6ec6c3fa27ad5f1c62db61aa4923b6c5ea5a4 (patch)
tree9ffa729350790964a58700c9b4518ce5ba47c100 /src
parent74f5c9ca871208523454fbd3be5def4cc21a98c9 (diff)
downloadrabbitmq-server-git-90c6ec6c3fa27ad5f1c62db61aa4923b6c5ea5a4.tar.gz
assert vq state invariants
as a post condition on all exported functions
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl143
1 files changed, 79 insertions, 64 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 003088cb42..c2f90bac77 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -368,7 +368,7 @@ init(QueueName, IsDurable, _Recover) ->
avg_ingress_rate = 0,
rate_timestamp = Now
},
- maybe_deltas_to_betas(State).
+ a(maybe_deltas_to_betas(State)).
terminate(State) ->
State1 = #vqstate { persistent_count = PCount,
@@ -384,9 +384,9 @@ terminate(State) ->
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
- State1 #vqstate { index_state = rabbit_queue_index:terminate(
- Terms, IndexState),
- msg_store_clients = undefined }.
+ a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
+ Terms, IndexState),
+ msg_store_clients = undefined }).
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
@@ -416,8 +416,8 @@ delete_and_terminate(State) ->
end,
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
rabbit_msg_store:client_terminate(MSCStateT),
- State2 #vqstate { index_state = IndexState5,
- msg_store_clients = undefined }.
+ a(State2 #vqstate { index_state = IndexState5,
+ msg_store_clients = undefined }).
purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
{Q4Count, IndexState1} =
@@ -425,18 +425,18 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
{Len, State1} =
purge1(Q4Count, State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
- {Len, State1 #vqstate { len = 0,
- ram_msg_count = 0,
- ram_index_count = 0,
- persistent_count = 0 }}.
+ {Len, a(State1 #vqstate { len = 0,
+ ram_msg_count = 0,
+ ram_index_count = 0,
+ persistent_count = 0 })}.
publish(Msg, State) ->
State1 = limit_ram_index(State),
{_SeqId, State2} = publish(Msg, false, false, State1),
- State2.
+ a(State2).
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
- {blank_ack, State};
+ {blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
State = #vqstate { len = 0,
next_seq_id = SeqId,
@@ -451,11 +451,11 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(MsgStatus1, PA),
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, State1 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- pending_ack = PA1 }}.
+ {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
@@ -467,8 +467,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3_to_q4(State) of
- {empty, _State1} = Result -> Result;
- {loaded, State1} -> fetch(AckRequired, State1)
+ {empty, State1} = Result -> a(State1), Result;
+ {loaded, State1} -> fetch(AckRequired, State1)
end;
{{value, MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
@@ -505,30 +505,30 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
- State #vqstate { q4 = Q4a,
- ram_msg_count = RamMsgCount - 1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1,
- pending_ack = PA1 }}
+ a(State #vqstate { q4 = Q4a,
+ ram_msg_count = RamMsgCount - 1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len1,
+ persistent_count = PCount1,
+ pending_ack = PA1 })}
end.
ack(AckTags, State) ->
- ack(fun (_AckEntry, State1) -> State1 end, AckTags, State).
+ a(ack(fun (_AckEntry, State1) -> State1 end, AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg),
- {#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
- State #vqstate { msg_store_clients = MSCState1 };
- false -> State
- end.
+ a(case IsPersistent andalso IsDurable of
+ true -> MsgStatus = msg_status(true, undefined, Msg),
+ {#msg_status { msg_on_disk = true }, MSCState1} =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ State #vqstate { msg_store_clients = MSCState1 };
+ false -> State
+ end).
tx_ack(Txn, AckTags, State) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
@@ -543,7 +543,7 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
persistent_guids(Pubs));
false -> ok
end,
- {lists:flatten(AckTags), State}.
+ {lists:flatten(AckTags), a(State)}.
tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
%% If we are a non-durable queue, or we have no persistent pubs,
@@ -555,28 +555,28 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
PersistentGuids = persistent_guids(PubsOrdered),
IsTransientPubs = [] == PersistentGuids,
{AckTags1,
- case (not IsDurable) orelse IsTransientPubs of
- true -> tx_commit_post_msg_store(
- IsTransientPubs, PubsOrdered, AckTags1, Fun, State);
- false -> ok = rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE, PersistentGuids,
- msg_store_callback(PersistentGuids, IsTransientPubs,
- PubsOrdered, AckTags1, Fun)),
- State
- end}.
+ a(case (not IsDurable) orelse IsTransientPubs of
+ true -> tx_commit_post_msg_store(
+ IsTransientPubs, PubsOrdered, AckTags1, Fun, State);
+ false -> ok = rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE, PersistentGuids,
+ msg_store_callback(PersistentGuids, IsTransientPubs,
+ PubsOrdered, AckTags1, Fun)),
+ State
+ end)}.
requeue(AckTags, State) ->
- ack(fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, State1),
- State2;
- ({IsPersistent, Guid}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, State2),
- State3
- end, AckTags, State).
+ a(ack(fun (#msg_status { msg = Msg }, State1) ->
+ {_SeqId, State2} = publish(Msg, true, false, State1),
+ State2;
+ ({IsPersistent, Guid}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ read_from_msg_store(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, true, true, State2),
+ State3
+ end, AckTags, State)).
len(#vqstate { len = Len }) -> Len.
@@ -596,11 +596,11 @@ set_ram_duration_target(DurationTarget,
end,
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
duration_target = DurationTarget },
- case TargetRamMsgCount1 == undefined orelse
- TargetRamMsgCount1 >= TargetRamMsgCount of
- true -> State1;
- false -> reduce_memory_use(State1)
- end.
+ a(case TargetRamMsgCount1 == undefined orelse
+ TargetRamMsgCount1 >= TargetRamMsgCount of
+ true -> State1;
+ false -> reduce_memory_use(State1)
+ end).
ram_duration(State = #vqstate { egress_rate = Egress,
ingress_rate = Ingress,
@@ -635,7 +635,7 @@ ram_duration(State = #vqstate { egress_rate = Egress,
needs_sync(#vqstate { on_sync = {_, _, []} }) -> false;
needs_sync(_) -> true.
-sync(State) -> tx_commit_index(State).
+sync(State) -> a(tx_commit_index(State)).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -667,6 +667,24 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
%% Minor helpers
%%----------------------------------------------------------------------------
+a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ len = Len, target_ram_msg_count = TargetRamMsgCount }) ->
+ E1 = queue:is_empty(Q1),
+ E2 = bpqueue:is_empty(Q2),
+ ED = Delta#delta.count == 0,
+ E3 = bpqueue:is_empty(Q3),
+ E4 = queue:is_empty(Q4),
+ TZ = TargetRamMsgCount == 0,
+ LZ = Len == 0,
+
+ true = E1 or not E3,
+ true = E2 or not ED,
+ true = ED or not E3,
+ true = (E1 and E2 and E4) or not TZ,
+ true = LZ == (E3 and E4),
+
+ State.
+
one_if(true ) -> 1;
one_if(false) -> 0.
@@ -1006,9 +1024,6 @@ fetch_from_q3_to_q4(State = #vqstate {
msg_store_clients = MSCState }) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
- 0 = DeltaCount, %% ASSERTION
- true = bpqueue:is_empty(Q2), %% ASSERTION
- true = queue:is_empty(Q1), %% ASSERTION
{empty, State};
{{value, IndexOnDisk, MsgStatus = #msg_status {
msg = undefined, guid = Guid,