diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 137 |
1 files changed, 63 insertions, 74 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5b1419efab..3e66e000b2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -451,14 +451,13 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, in_counter = InCount + 1 }, {SeqId, case MsgStatus1 #msg_status.msg_on_disk of - true -> - {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(false, MsgStatus1, IndexState), - State1 #vqstate { index_state = IndexState1, - pending_ack = dict:store(SeqId, {true, Guid}, - PA) }; - false -> - State1 #vqstate { pending_ack = dict:store(SeqId, MsgStatus1, PA) } + true -> {#msg_status { index_on_disk = true }, IndexState1} = + maybe_write_index_to_disk(false, MsgStatus1, IndexState), + PA1 = dict:store(SeqId, {true, Guid}, PA), + State1 #vqstate { index_state = IndexState1, + pending_ack = PA1 }; + false -> PA1 = dict:store(SeqId, MsgStatus1, PA), + State1 #vqstate { pending_ack = PA1 } end}. fetch(AckRequired, State = @@ -517,14 +516,13 @@ fetch(AckRequired, State = %% 4. If an ack is required, add something sensible to PA PA1 = case AckRequired of - true -> - Entry = - case MsgOnDisk of - true -> {IsPersistent, Guid}; - false -> MsgStatus #msg_status { - is_delivered = true } - end, - dict:store(SeqId, Entry, PA); + true -> Entry = + case MsgOnDisk of + true -> {IsPersistent, Guid}; + false -> MsgStatus #msg_status { + is_delivered = true } + end, + dict:store(SeqId, Entry, PA); false -> PA end, @@ -611,15 +609,13 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> IsTransientPubs = [] == PersistentGuids, {AckTags1, case (not IsDurable) orelse IsTransientPubs of - true -> - tx_commit_post_msg_store( - IsTransientPubs, PubsOrdered, AckTags1, Fun, State); - false -> - ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids, IsTransientPubs, - PubsOrdered, AckTags1, Fun)), - State + true -> tx_commit_post_msg_store( + IsTransientPubs, PubsOrdered, AckTags1, Fun, State); + false -> ok = rabbit_msg_store:sync( + ?PERSISTENT_MSG_STORE, PersistentGuids, + msg_store_callback(PersistentGuids, IsTransientPubs, + PubsOrdered, AckTags1, Fun)), + State end}. requeue(AckTags, State) -> @@ -681,9 +677,9 @@ set_ram_duration_target( Rate = AvgEgressRate + AvgIngressRate, TargetRamMsgCount1 = case DurationTarget of - infinity -> undefined; + infinity -> undefined; undefined -> undefined; - _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec + _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, duration_target = DurationTarget }, @@ -895,9 +891,7 @@ combine_deltas(#delta { start_seq_id = StartLow, count = CountLow, #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. beta_fold_no_index_on_disk(Fun, Init, Q) -> - bpqueue:foldr(fun (_Prefix, Value, Acc) -> - Fun(Value, Acc) - end, Init, Q). + bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q). permitted_ram_index_count(#vqstate { len = 0 }) -> undefined; @@ -905,13 +899,11 @@ permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3, delta = #delta { count = DeltaCount } }) -> AlphaBetaLen = Len - DeltaCount, case AlphaBetaLen == 0 of - true -> - undefined; - false -> - BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), - %% the fraction of the alphas+betas that are betas - BetaFrac = BetaLen / AlphaBetaLen, - BetaLen - trunc(BetaFrac * BetaLen) + true -> undefined; + false -> BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3), + %% the fraction of the alphas+betas that are betas + BetaFrac = BetaLen / AlphaBetaLen, + BetaLen - trunc(BetaFrac * BetaLen) end. @@ -1008,20 +1000,19 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) -> purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> case bpqueue:is_empty(Q3) of - true -> - {Q1Count, IndexState1} = - remove_queue_entries( - fun rabbit_misc:queue_fold/3, State #vqstate.q1, IndexState), - {Count + Q1Count, State #vqstate { q1 = queue:new(), - index_state = IndexState1 }}; - false -> - {Q3Count, IndexState1} = - remove_queue_entries( - fun beta_fold_no_index_on_disk/3, Q3, IndexState), - purge1(Count + Q3Count, - maybe_deltas_to_betas( - State #vqstate { index_state = IndexState1, - q3 = bpqueue:new() })) + true -> {Q1Count, IndexState1} = + remove_queue_entries(fun rabbit_misc:queue_fold/3, + State #vqstate.q1, IndexState), + {Count + Q1Count, + State #vqstate { q1 = queue:new(), + index_state = IndexState1 }}; + false -> {Q3Count, IndexState1} = + remove_queue_entries(fun beta_fold_no_index_on_disk/3, + Q3, IndexState), + purge1(Count + Q3Count, + maybe_deltas_to_betas( + State #vqstate { index_state = IndexState1, + q3 = bpqueue:new() })) end. remove_queue_entries(Fold, Q, IndexState) -> @@ -1031,12 +1022,10 @@ remove_queue_entries(Fold, Q, IndexState) -> ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - IndexState2 = - case SeqIds of - [] -> IndexState1; - _ -> rabbit_queue_index:ack(SeqIds, IndexState1) - end, - {Count, IndexState2}. + {Count, case SeqIds of + [] -> IndexState1; + _ -> rabbit_queue_index:ack(SeqIds, IndexState1) + end}. remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, @@ -1045,12 +1034,11 @@ remove_queue_entries1( {CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) -> GuidsByStore1 = case {MsgOnDisk, IsPersistent} of - {true, true} -> - rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, Guid, GuidsByStore); - {true, false} -> - rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore); - {false, _} -> - GuidsByStore + {true, true} -> rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, + Guid, GuidsByStore); + {true, false} -> rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, + Guid, GuidsByStore); + {false, _} -> GuidsByStore end, SeqIdsAcc1 = case IndexOnDisk of true -> [SeqId | SeqIdsAcc]; @@ -1236,10 +1224,10 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, q3 = Q3 }) -> MsgStatus1 = MsgStatus #msg_status { msg = undefined }, case DeltaCount == 0 of - true -> - State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1, Q3) }; - false -> - State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } + true -> State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1, + Q3) }; + false -> State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, + Q2) } end. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; @@ -1309,12 +1297,12 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> Reduction = lists:min([RamIndexCount - Permitted, ?RAM_INDEX_BATCH_SIZE]), case Reduction < ?RAM_INDEX_BATCH_SIZE of - true -> - State; - false -> - {Reduction1, State1} = limit_q2_ram_index(Reduction, State), - {_Red2, State2} = limit_q3_ram_index(Reduction1, State1), - State2 + true -> State; + false -> {Reduction1, State1} = + limit_q2_ram_index(Reduction, State), + {_Red2, State2} = + limit_q3_ram_index(Reduction1, State1), + State2 end; _ -> State @@ -1531,7 +1519,8 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> {{value, IndexOnDisk, MsgStatus}, Qa} -> {RamIndexCount1, IndexState1} = case IndexOnDisk of - true -> {RamIndexCount, IndexState}; + true -> + {RamIndexCount, IndexState}; false -> {#msg_status { index_on_disk = true }, IndexState2} = maybe_write_index_to_disk(true, MsgStatus, |
