summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-12 13:33:56 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-12 13:33:56 +0100
commit95851b9ab2968536138371d79cb01633dde4c404 (patch)
tree2f7a716208fa1088e2e41c29235a8acd6bd9afb5
parent2b503a75d5a6dd551174f30a23e68643ef6d55ab (diff)
downloadrabbitmq-server-git-95851b9ab2968536138371d79cb01633dde4c404.tar.gz
implemented delete. This is slightly less than pretty as, after doing the purge, we have to walk through the index on disk in order to pull up msgs which have been delivered and not acked
-rw-r--r--src/rabbit_variable_queue.erl47
1 files changed, 38 insertions, 9 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cd47f669b3..a9a43fd898 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,8 +31,9 @@
-module(rabbit_variable_queue).
--export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
- fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1]).
+-export([init/1, publish/3, set_queue_ram_duration_target/2,
+ remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1,
+ maybe_start_prefetcher/1, purge/1, delete/1]).
%%----------------------------------------------------------------------------
@@ -271,15 +272,44 @@ purge(State = #vqstate { prefetcher = undefined, q4 = Q4,
purge(State) ->
purge(drain_prefetcher(stop, State)).
+%% the only difference between purge and delete is that delete also
+%% needs to delete everything that's been delivered and not ack'd.
+delete(State) ->
+ {PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State),
+ case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState)
+ of
+ {N, N} ->
+ {PurgeCount, State1};
+ {GammaSeqId, NextSeqId} ->
+ {DeleteCount, IndexState1} =
+ delete1(NextSeqId, 0, GammaSeqId, IndexState),
+ {PurgeCount + DeleteCount,
+ State1 #vqstate { index_state = IndexState1 }}
+ end.
+
%%----------------------------------------------------------------------------
+delete1(NextSeqId, Count, GammaSeqId, IndexState)
+ when GammaSeqId >= NextSeqId ->
+ {Count, IndexState};
+delete1(NextSeqId, Count, GammaSeqId, IndexState) ->
+ Gamma1SeqId = GammaSeqId + rabbit_queue_index:segment_size(),
+ case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of
+ {[], IndexState1} ->
+ delete1(NextSeqId, Count, Gamma1SeqId, IndexState1);
+ {List, IndexState1} ->
+ Q = betas_from_segment_entries(List),
+ {QCount, IndexState2} = remove_queue_entries(Q, IndexState1),
+ delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2)
+ end.
+
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
case queue:is_empty(Q3) of
true ->
{Q1Count, IndexState1} =
remove_queue_entries(State #vqstate.q1, IndexState),
{Count + Q1Count, State #vqstate { q1 = queue:new(),
- index_state = IndexState1 }};
+ index_state = IndexState1 }};
false ->
{Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
purge1(Count + Q3Count,
@@ -444,12 +474,11 @@ maybe_load_next_segment(State =
end.
betas_from_segment_entries(List) ->
- queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) ->
- #beta { msg_id = MsgId, seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = true }
- end, List)).
+ queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ index_on_disk = true }
+ || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]).
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),