summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-23 14:43:23 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-23 14:43:23 +0000
commit5c74690915d038fd4c9041ba9191935cecbb29b4 (patch)
tree8f9d617dfacbcae3b952643f8df57d005e9111d6
parentfeef8a1342821b7d8b0810f23a2b3e4401992de1 (diff)
downloadrabbitmq-server-git-5c74690915d038fd4c9041ba9191935cecbb29b4.tar.gz
refactor: vq:ram_ack_index doesn't need to be a gb_tree
a gb_set suffices
-rw-r--r--src/rabbit_variable_queue.erl23
1 files changed, 11 insertions, 12 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7813aa7ba3..298c68b6a1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -348,7 +348,7 @@
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_ack_index :: gb_set(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -731,7 +731,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_sets:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -810,7 +810,7 @@ status(#vqstate {
{pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_sets:size(RAI)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -1015,7 +1015,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_ack_index = gb_sets:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1233,7 +1233,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
msg_on_disk = MsgOnDisk } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
@@ -1241,7 +1240,7 @@ record_pending_ack(#msg_status { seq_id = SeqId,
{AckEntry, RAI1} =
case MsgOnDisk of
true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
+ false -> {MsgStatus, gb_sets:insert(SeqId, RAI)}
end,
State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
ram_ack_index = RAI1,
@@ -1251,7 +1250,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
{gb_trees:get(SeqId, PA),
State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+ ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
@@ -1262,7 +1261,7 @@ purge_pending_ack(KeepPersistent,
accumulate_ack(MsgStatus, Acc)
end, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ ram_ack_index = gb_sets:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1462,7 +1461,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
@@ -1490,12 +1489,12 @@ limit_ram_acks(0, State) ->
{0, State};
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+ case gb_sets:is_empty(RAI) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
+ {SeqId, RAI1} = gb_sets:take_largest(RAI),
+ MsgStatus = #msg_status { is_persistent = false} =
gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),