diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 47 |
1 files changed, 38 insertions, 9 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cd47f669b3..a9a43fd898 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,8 +31,9 @@ -module(rabbit_variable_queue). --export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1]). +-export([init/1, publish/3, set_queue_ram_duration_target/2, + remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, + maybe_start_prefetcher/1, purge/1, delete/1]). %%---------------------------------------------------------------------------- @@ -271,15 +272,44 @@ purge(State = #vqstate { prefetcher = undefined, q4 = Q4, purge(State) -> purge(drain_prefetcher(stop, State)). +%% the only difference between purge and delete is that delete also +%% needs to delete everything that's been delivered and not ack'd. +delete(State) -> + {PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State), + case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState) + of + {N, N} -> + {PurgeCount, State1}; + {GammaSeqId, NextSeqId} -> + {DeleteCount, IndexState1} = + delete1(NextSeqId, 0, GammaSeqId, IndexState), + {PurgeCount + DeleteCount, + State1 #vqstate { index_state = IndexState1 }} + end. + %%---------------------------------------------------------------------------- +delete1(NextSeqId, Count, GammaSeqId, IndexState) + when GammaSeqId >= NextSeqId -> + {Count, IndexState}; +delete1(NextSeqId, Count, GammaSeqId, IndexState) -> + Gamma1SeqId = GammaSeqId + rabbit_queue_index:segment_size(), + case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of + {[], IndexState1} -> + delete1(NextSeqId, Count, Gamma1SeqId, IndexState1); + {List, IndexState1} -> + Q = betas_from_segment_entries(List), + {QCount, IndexState2} = remove_queue_entries(Q, IndexState1), + delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2) + end. + purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> case queue:is_empty(Q3) of true -> {Q1Count, IndexState1} = remove_queue_entries(State #vqstate.q1, IndexState), {Count + Q1Count, State #vqstate { q1 = queue:new(), - index_state = IndexState1 }}; + index_state = IndexState1 }}; false -> {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState), purge1(Count + Q3Count, @@ -444,12 +474,11 @@ maybe_load_next_segment(State = end. betas_from_segment_entries(List) -> - queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) -> - #beta { msg_id = MsgId, seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = true } - end, List)). + queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + index_on_disk = true } + || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]). read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), |
