diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 27 |
1 files changed, 13 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7131714b2a..94c0913dfc 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -321,7 +321,7 @@ q3 :: bpqueue:bpqueue(), q4 :: queue(), next_seq_id :: seq_id(), - pending_ack :: dict(), + pending_ack :: gb_tree(), ram_ack_index :: gb_tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, @@ -731,7 +731,7 @@ status(#vqstate { {q3 , bpqueue:len(Q3)}, {q4 , queue:len(Q4)}, {len , Len}, - {pending_acks , dict:size(PA)}, + {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, @@ -868,7 +868,7 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> case dict:is_key(SeqId, PA) of + false -> case gb_trees:is_defined(SeqId, PA) of false -> {[m(#msg_status { seq_id = SeqId, msg_id = MsgId, @@ -945,7 +945,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q3 = bpqueue:new(), q4 = queue:new(), next_seq_id = NextSeqId, - pending_ack = dict:new(), + pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, @@ -1203,15 +1203,14 @@ record_pending_ack(#msg_status { seq_id = SeqId, true -> {m(trim_msg_status(MsgStatus)), RAI}; false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, - PA1 = dict:store(SeqId, AckEntry, PA), - State #vqstate { pending_ack = PA1, + State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA), ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - {dict:fetch(SeqId, PA), - State #vqstate { pending_ack = dict:erase(SeqId, PA), + {gb_trees:get(SeqId, PA), + State #vqstate { pending_ack = gb_trees:delete(SeqId, PA), ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. purge_pending_ack(KeepPersistent, @@ -1219,10 +1218,10 @@ purge_pending_ack(KeepPersistent, index_state = IndexState, msg_store_clients = MSCState }) -> {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = - dict:fold(fun (_SeqId, MsgStatus, Acc) -> - accumulate_ack(MsgStatus, Acc) - end, accumulate_ack_init(), PA), - State1 = State #vqstate { pending_ack = dict:new(), + rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) -> + accumulate_ack(MsgStatus, Acc) + end, accumulate_ack_init(), PA), + State1 = State #vqstate { pending_ack = gb_trees:empty(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of @@ -1517,10 +1516,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, false -> {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = - dict:fetch(SeqId, PA), + gb_trees:get(SeqId, PA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA), + PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }) |
