summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl47
1 files changed, 38 insertions, 9 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index cd47f669b3..a9a43fd898 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,8 +31,9 @@
-module(rabbit_variable_queue).
--export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
- fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1]).
+-export([init/1, publish/3, set_queue_ram_duration_target/2,
+ remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1,
+ maybe_start_prefetcher/1, purge/1, delete/1]).
%%----------------------------------------------------------------------------
@@ -271,15 +272,44 @@ purge(State = #vqstate { prefetcher = undefined, q4 = Q4,
purge(State) ->
purge(drain_prefetcher(stop, State)).
+%% the only difference between purge and delete is that delete also
+%% needs to delete everything that's been delivered and not ack'd.
+delete(State) ->
+ {PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State),
+ case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState)
+ of
+ {N, N} ->
+ {PurgeCount, State1};
+ {GammaSeqId, NextSeqId} ->
+ {DeleteCount, IndexState1} =
+ delete1(NextSeqId, 0, GammaSeqId, IndexState),
+ {PurgeCount + DeleteCount,
+ State1 #vqstate { index_state = IndexState1 }}
+ end.
+
%%----------------------------------------------------------------------------
+delete1(NextSeqId, Count, GammaSeqId, IndexState)
+ when GammaSeqId >= NextSeqId ->
+ {Count, IndexState};
+delete1(NextSeqId, Count, GammaSeqId, IndexState) ->
+ Gamma1SeqId = GammaSeqId + rabbit_queue_index:segment_size(),
+ case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of
+ {[], IndexState1} ->
+ delete1(NextSeqId, Count, Gamma1SeqId, IndexState1);
+ {List, IndexState1} ->
+ Q = betas_from_segment_entries(List),
+ {QCount, IndexState2} = remove_queue_entries(Q, IndexState1),
+ delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2)
+ end.
+
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
case queue:is_empty(Q3) of
true ->
{Q1Count, IndexState1} =
remove_queue_entries(State #vqstate.q1, IndexState),
{Count + Q1Count, State #vqstate { q1 = queue:new(),
- index_state = IndexState1 }};
+ index_state = IndexState1 }};
false ->
{Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
purge1(Count + Q3Count,
@@ -444,12 +474,11 @@ maybe_load_next_segment(State =
end.
betas_from_segment_entries(List) ->
- queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) ->
- #beta { msg_id = MsgId, seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = true }
- end, List)).
+ queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ index_on_disk = true }
+ || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]).
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),