diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-23 14:43:23 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-23 14:43:23 +0000 |
| commit | 5c74690915d038fd4c9041ba9191935cecbb29b4 (patch) | |
| tree | 8f9d617dfacbcae3b952643f8df57d005e9111d6 | |
| parent | feef8a1342821b7d8b0810f23a2b3e4401992de1 (diff) | |
| download | rabbitmq-server-git-5c74690915d038fd4c9041ba9191935cecbb29b4.tar.gz | |
refactor: vq:ram_ack_index doesn't need to be a gb_tree
a gb_set suffices
| -rw-r--r-- | src/rabbit_variable_queue.erl | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7813aa7ba3..298c68b6a1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -348,7 +348,7 @@ q4 :: ?QUEUE:?QUEUE(), next_seq_id :: seq_id(), pending_ack :: gb_tree(), - ram_ack_index :: gb_tree(), + ram_ack_index :: gb_set(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -731,7 +731,7 @@ ram_duration(State = #vqstate { {AvgAckIngressRate, AckIngress1} = update_rate(Now, AckTimestamp, AckInCount, AckIngress), - RamAckCount = gb_trees:size(RamAckIndex), + RamAckCount = gb_sets:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso @@ -810,7 +810,7 @@ status(#vqstate { {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RAI)}, + {ram_ack_count , gb_sets:size(RAI)}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -1015,7 +1015,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q4 = ?QUEUE:new(), next_seq_id = NextSeqId, pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty(), + ram_ack_index = gb_sets:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1233,7 +1233,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %%---------------------------------------------------------------------------- record_pending_ack(#msg_status { seq_id = SeqId, - msg_id = MsgId, msg_on_disk = MsgOnDisk } = MsgStatus, State = #vqstate { pending_ack = PA, ram_ack_index = RAI, @@ -1241,7 +1240,7 @@ record_pending_ack(#msg_status { seq_id = SeqId, {AckEntry, RAI1} = case MsgOnDisk of true -> {m(trim_msg_status(MsgStatus)), RAI}; - false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} + false -> {MsgStatus, gb_sets:insert(SeqId, RAI)} end, State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA), ram_ack_index = RAI1, @@ -1251,7 +1250,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> {gb_trees:get(SeqId, PA), State #vqstate { pending_ack = gb_trees:delete(SeqId, PA), - ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. + ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}. purge_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, @@ -1262,7 +1261,7 @@ purge_pending_ack(KeepPersistent, accumulate_ack(MsgStatus, Acc) end, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty() }, + ram_ack_index = gb_sets:empty() }, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of error -> State1; @@ -1462,7 +1461,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = - case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is @@ -1490,12 +1489,12 @@ limit_ram_acks(0, State) -> {0, State}; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - case gb_trees:is_empty(RAI) of + case gb_sets:is_empty(RAI) of true -> {Quota, State}; false -> - {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), - MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = + {SeqId, RAI1} = gb_sets:take_largest(RAI), + MsgStatus = #msg_status { is_persistent = false} = gb_trees:get(SeqId, PA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), |
