summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl27
1 files changed, 13 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7131714b2a..94c0913dfc 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -321,7 +321,7 @@
q3 :: bpqueue:bpqueue(),
q4 :: queue(),
next_seq_id :: seq_id(),
- pending_ack :: dict(),
+ pending_ack :: gb_tree(),
ram_ack_index :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
@@ -731,7 +731,7 @@ status(#vqstate {
{q3 , bpqueue:len(Q3)},
{q4 , queue:len(Q4)},
{len , Len},
- {pending_acks , dict:size(PA)},
+ {pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
@@ -868,7 +868,7 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case dict:is_key(SeqId, PA) of
+ false -> case gb_trees:is_defined(SeqId, PA) of
false -> {[m(#msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -945,7 +945,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = bpqueue:new(),
q4 = queue:new(),
next_seq_id = NextSeqId,
- pending_ack = dict:new(),
+ pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
@@ -1203,15 +1203,14 @@ record_pending_ack(#msg_status { seq_id = SeqId,
true -> {m(trim_msg_status(MsgStatus)), RAI};
false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
- PA1 = dict:store(SeqId, AckEntry, PA),
- State #vqstate { pending_ack = PA1,
+ State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- {dict:fetch(SeqId, PA),
- State #vqstate { pending_ack = dict:erase(SeqId, PA),
+ {gb_trees:get(SeqId, PA),
+ State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
@@ -1219,10 +1218,10 @@ purge_pending_ack(KeepPersistent,
index_state = IndexState,
msg_store_clients = MSCState }) ->
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- dict:fold(fun (_SeqId, MsgStatus, Acc) ->
- accumulate_ack(MsgStatus, Acc)
- end, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = dict:new(),
+ rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
+ accumulate_ack(MsgStatus, Acc)
+ end, accumulate_ack_init(), PA),
+ State1 = State #vqstate { pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1517,10 +1516,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
false ->
{SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- dict:fetch(SeqId, PA),
+ gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA),
+ PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
limit_ram_acks(Quota - 1,
State1 #vqstate { pending_ack = PA1,
ram_ack_index = RAI1 })