diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 143 |
1 files changed, 79 insertions, 64 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 003088cb42..c2f90bac77 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -368,7 +368,7 @@ init(QueueName, IsDurable, _Recover) -> avg_ingress_rate = 0, rate_timestamp = Now }, - maybe_deltas_to_betas(State). + a(maybe_deltas_to_betas(State)). terminate(State) -> State1 = #vqstate { persistent_count = PCount, @@ -384,9 +384,9 @@ terminate(State) -> Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], - State1 #vqstate { index_state = rabbit_queue_index:terminate( - Terms, IndexState), - msg_store_clients = undefined }. + a(State1 #vqstate { index_state = rabbit_queue_index:terminate( + Terms, IndexState), + msg_store_clients = undefined }). %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. @@ -416,8 +416,8 @@ delete_and_terminate(State) -> end, rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), rabbit_msg_store:client_terminate(MSCStateT), - State2 #vqstate { index_state = IndexState5, - msg_store_clients = undefined }. + a(State2 #vqstate { index_state = IndexState5, + msg_store_clients = undefined }). purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> {Q4Count, IndexState1} = @@ -425,18 +425,18 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> {Len, State1} = purge1(Q4Count, State #vqstate { q4 = queue:new(), index_state = IndexState1 }), - {Len, State1 #vqstate { len = 0, - ram_msg_count = 0, - ram_index_count = 0, - persistent_count = 0 }}. + {Len, a(State1 #vqstate { len = 0, + ram_msg_count = 0, + ram_index_count = 0, + persistent_count = 0 })}. publish(Msg, State) -> State1 = limit_ram_index(State), {_SeqId, State2} = publish(Msg, false, false, State1), - State2. + a(State2). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> - {blank_ack, State}; + {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, State = #vqstate { len = 0, next_seq_id = SeqId, @@ -451,11 +451,11 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(MsgStatus1, PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 }}. + {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 })}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -467,8 +467,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_to_q4(State) of - {empty, _State1} = Result -> Result; - {loaded, State1} -> fetch(AckRequired, State1) + {empty, State1} = Result -> a(State1), Result; + {loaded, State1} -> fetch(AckRequired, State1) end; {{value, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, @@ -505,30 +505,30 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, - State #vqstate { q4 = Q4a, - ram_msg_count = RamMsgCount - 1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 }} + a(State #vqstate { q4 = Q4a, + ram_msg_count = RamMsgCount - 1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1, + pending_ack = PA1 })} end. ack(AckTags, State) -> - ack(fun (_AckEntry, State1) -> State1 end, AckTags, State). + a(ack(fun (_AckEntry, State1) -> State1 end, AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg), - {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), - State #vqstate { msg_store_clients = MSCState1 }; - false -> State - end. + a(case IsPersistent andalso IsDurable of + true -> MsgStatus = msg_status(true, undefined, Msg), + {#msg_status { msg_on_disk = true }, MSCState1} = + maybe_write_msg_to_disk(false, MsgStatus, MSCState), + State #vqstate { msg_store_clients = MSCState1 }; + false -> State + end). tx_ack(Txn, AckTags, State) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), @@ -543,7 +543,7 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> persistent_guids(Pubs)); false -> ok end, - {lists:flatten(AckTags), State}. + {lists:flatten(AckTags), a(State)}. tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> %% If we are a non-durable queue, or we have no persistent pubs, @@ -555,28 +555,28 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> PersistentGuids = persistent_guids(PubsOrdered), IsTransientPubs = [] == PersistentGuids, {AckTags1, - case (not IsDurable) orelse IsTransientPubs of - true -> tx_commit_post_msg_store( - IsTransientPubs, PubsOrdered, AckTags1, Fun, State); - false -> ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids, IsTransientPubs, - PubsOrdered, AckTags1, Fun)), - State - end}. + a(case (not IsDurable) orelse IsTransientPubs of + true -> tx_commit_post_msg_store( + IsTransientPubs, PubsOrdered, AckTags1, Fun, State); + false -> ok = rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, PersistentGuids, + msg_store_callback(PersistentGuids, IsTransientPubs, + PubsOrdered, AckTags1, Fun)), + State + end)}. requeue(AckTags, State) -> - ack(fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), - State2; - ({IsPersistent, Guid}, State1) -> - #vqstate { msg_store_clients = MSCState } = State1, - {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), - State3 - end, AckTags, State). + a(ack(fun (#msg_status { msg = Msg }, State1) -> + {_SeqId, State2} = publish(Msg, true, false, State1), + State2; + ({IsPersistent, Guid}, State1) -> + #vqstate { msg_store_clients = MSCState } = State1, + {{ok, Msg = #basic_message{}}, MSCState1} = + read_from_msg_store(MSCState, IsPersistent, Guid), + State2 = State1 #vqstate { msg_store_clients = MSCState1 }, + {_SeqId, State3} = publish(Msg, true, true, State2), + State3 + end, AckTags, State)). len(#vqstate { len = Len }) -> Len. @@ -596,11 +596,11 @@ set_ram_duration_target(DurationTarget, end, State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, duration_target = DurationTarget }, - case TargetRamMsgCount1 == undefined orelse - TargetRamMsgCount1 >= TargetRamMsgCount of - true -> State1; - false -> reduce_memory_use(State1) - end. + a(case TargetRamMsgCount1 == undefined orelse + TargetRamMsgCount1 >= TargetRamMsgCount of + true -> State1; + false -> reduce_memory_use(State1) + end). ram_duration(State = #vqstate { egress_rate = Egress, ingress_rate = Ingress, @@ -635,7 +635,7 @@ ram_duration(State = #vqstate { egress_rate = Egress, needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; needs_sync(_) -> true. -sync(State) -> tx_commit_index(State). +sync(State) -> a(tx_commit_index(State)). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -667,6 +667,24 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, %% Minor helpers %%---------------------------------------------------------------------------- +a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + len = Len, target_ram_msg_count = TargetRamMsgCount }) -> + E1 = queue:is_empty(Q1), + E2 = bpqueue:is_empty(Q2), + ED = Delta#delta.count == 0, + E3 = bpqueue:is_empty(Q3), + E4 = queue:is_empty(Q4), + TZ = TargetRamMsgCount == 0, + LZ = Len == 0, + + true = E1 or not E3, + true = E2 or not ED, + true = ED or not E3, + true = (E1 and E2 and E4) or not TZ, + true = LZ == (E3 and E4), + + State. + one_if(true ) -> 1; one_if(false) -> 0. @@ -1006,9 +1024,6 @@ fetch_from_q3_to_q4(State = #vqstate { msg_store_clients = MSCState }) -> case bpqueue:out(Q3) of {empty, _Q3} -> - 0 = DeltaCount, %% ASSERTION - true = bpqueue:is_empty(Q2), %% ASSERTION - true = queue:is_empty(Q1), %% ASSERTION {empty, State}; {{value, IndexOnDisk, MsgStatus = #msg_status { msg = undefined, guid = Guid, |
