diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 50 |
1 files changed, 21 insertions, 29 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 031ed882e6..88b765d6b6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -421,11 +421,10 @@ delete_and_terminate(State) -> msg_store_clients = undefined }). purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> - {Q4Count, IndexState1} = - remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState), - {Len, State1} = - purge1(Q4Count, State #vqstate { q4 = queue:new(), - index_state = IndexState1 }), + IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, + IndexState), + State1 = purge1(State #vqstate { q4 = queue:new(), + index_state = IndexState1 }), {Len, a(State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0, @@ -976,46 +975,39 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) -> [] -> IndexState1; _ -> {Q, IndexState3} = betas_from_segment_entries( List, TransientThreshold, IndexState1), - {_Count, IndexState4} = - remove_queue_entries( - fun beta_fold_no_index_on_disk/3, Q, IndexState3), - IndexState4 + remove_queue_entries(fun beta_fold_no_index_on_disk/3, Q, + IndexState3) end, delete1(TransientThreshold, NextSeqId, Again, IndexState2). -purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> +purge1(State = #vqstate { q1 = Q1, q3 = Q3, index_state = IndexState }) -> case bpqueue:is_empty(Q3) of - true -> {Q1Count, IndexState1} = - remove_queue_entries(fun rabbit_misc:queue_fold/3, - State #vqstate.q1, IndexState), - {Count + Q1Count, - State #vqstate { q1 = queue:new(), - index_state = IndexState1 }}; - false -> {Q3Count, IndexState1} = - remove_queue_entries(fun beta_fold_no_index_on_disk/3, - Q3, IndexState), - purge1(Count + Q3Count, - maybe_deltas_to_betas( + true -> IndexState1 = + remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1, + IndexState), + State #vqstate { q1 = queue:new(), + index_state = IndexState1 }; + false -> IndexState1 = + remove_queue_entries(fun beta_fold_no_index_on_disk/3, Q3, + IndexState), + purge1(maybe_deltas_to_betas( State #vqstate { q3 = bpqueue:new(), index_state = IndexState1 })) end. remove_queue_entries(Fold, Q, IndexState) -> - {Count, GuidsByStore, SeqIds, IndexState1} = - Fold(fun remove_queue_entries1/2, {0, dict:new(), [], IndexState}, Q), + {GuidsByStore, SeqIds, IndexState1} = + Fold(fun remove_queue_entries1/2, {dict:new(), [], IndexState}, Q), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - {Count, case SeqIds of - [] -> IndexState1; - _ -> rabbit_queue_index:ack(SeqIds, IndexState1) - end}. + rabbit_queue_index:ack(SeqIds, IndexState1). remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {Count, GuidsByStore, SeqIdsAcc, IndexState}) -> + {GuidsByStore, SeqIdsAcc, IndexState}) -> GuidsByStore1 = case MsgOnDisk of true -> rabbit_misc:dict_cons( find_msg_store(IsPersistent), @@ -1029,7 +1021,7 @@ remove_queue_entries1( IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, SeqId, IndexState), - {Count + 1, GuidsByStore1, SeqIdsAcc1, IndexState1}. + {GuidsByStore1, SeqIdsAcc1, IndexState1}. fetch_from_q3_to_q4(State = #vqstate { q1 = Q1, |
