diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-12 13:33:56 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-12 13:33:56 +0100 |
| commit | 95851b9ab2968536138371d79cb01633dde4c404 (patch) | |
| tree | 2f7a716208fa1088e2e41c29235a8acd6bd9afb5 | |
| parent | 2b503a75d5a6dd551174f30a23e68643ef6d55ab (diff) | |
| download | rabbitmq-server-git-95851b9ab2968536138371d79cb01633dde4c404.tar.gz | |
implemented delete. This is slightly less than pretty as, after doing the purge, we have to walk through the index on disk in order to pull up msgs which have been delivered and not acked
| -rw-r--r-- | src/rabbit_variable_queue.erl | 47 |
1 files changed, 38 insertions, 9 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cd47f669b3..a9a43fd898 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,8 +31,9 @@ -module(rabbit_variable_queue). --export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1]). +-export([init/1, publish/3, set_queue_ram_duration_target/2, + remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, + maybe_start_prefetcher/1, purge/1, delete/1]). %%---------------------------------------------------------------------------- @@ -271,15 +272,44 @@ purge(State = #vqstate { prefetcher = undefined, q4 = Q4, purge(State) -> purge(drain_prefetcher(stop, State)). +%% the only difference between purge and delete is that delete also +%% needs to delete everything that's been delivered and not ack'd. +delete(State) -> + {PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State), + case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState) + of + {N, N} -> + {PurgeCount, State1}; + {GammaSeqId, NextSeqId} -> + {DeleteCount, IndexState1} = + delete1(NextSeqId, 0, GammaSeqId, IndexState), + {PurgeCount + DeleteCount, + State1 #vqstate { index_state = IndexState1 }} + end. + %%---------------------------------------------------------------------------- +delete1(NextSeqId, Count, GammaSeqId, IndexState) + when GammaSeqId >= NextSeqId -> + {Count, IndexState}; +delete1(NextSeqId, Count, GammaSeqId, IndexState) -> + Gamma1SeqId = GammaSeqId + rabbit_queue_index:segment_size(), + case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of + {[], IndexState1} -> + delete1(NextSeqId, Count, Gamma1SeqId, IndexState1); + {List, IndexState1} -> + Q = betas_from_segment_entries(List), + {QCount, IndexState2} = remove_queue_entries(Q, IndexState1), + delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2) + end. + purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> case queue:is_empty(Q3) of true -> {Q1Count, IndexState1} = remove_queue_entries(State #vqstate.q1, IndexState), {Count + Q1Count, State #vqstate { q1 = queue:new(), - index_state = IndexState1 }}; + index_state = IndexState1 }}; false -> {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState), purge1(Count + Q3Count, @@ -444,12 +474,11 @@ maybe_load_next_segment(State = end. betas_from_segment_entries(List) -> - queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) -> - #beta { msg_id = MsgId, seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = true } - end, List)). + queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + index_on_disk = true } + || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]). read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), |
