summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-09 18:17:45 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-09 18:17:45 +0100
commit4fc3412a0dbd940a6b8f5974fbdfb4e7de974e6b (patch)
tree05e6a57add1e8208498af97ba12797a9c4c2deaa
parentd397c015d90be9f08054259b7b87ffb6184c77a8 (diff)
downloadrabbitmq-server-git-4fc3412a0dbd940a6b8f5974fbdfb4e7de974e6b.tar.gz
Beautiful!
-rw-r--r--src/rabbit_variable_queue.erl41
1 files changed, 40 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ae9ca375d9..b0bfd8cdf6 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,7 +32,7 @@
-module(rabbit_variable_queue).
-export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1,
- fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1]).
+ fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1]).
%%----------------------------------------------------------------------------
@@ -261,8 +261,47 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) ->
ok = rabbit_msg_store:remove(MsgIds),
State #vqstate { index_state = IndexState1 }.
+purge(State = #vqstate { q3 = Q3, prefetcher = undefined,
+ index_state = IndexState }) ->
+ case queue:is_empty(Q3) of
+ true -> State #vqstate { q1 = queue:new(), q4 = queue:new() };
+ false -> IndexState1 = remove_betas(Q3, IndexState),
+ purge(maybe_load_next_segment(
+ State #vqstate { index_state = IndexState1 }))
+ end;
+purge(State) ->
+ purge(drain_prefetcher(stop, State)).
+
%%----------------------------------------------------------------------------
+remove_betas(Q, IndexState) ->
+ {MsgIds, SeqIds, IndexState1} =
+ lists:foldl(
+ fun (#beta { msg_id = MsgId,
+ seq_id = SeqId,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk },
+ {MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
+ IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
+ true -> rabbit_queue_index:write_delivered(
+ SeqId, IndexStateN);
+ false -> IndexStateN
+ end,
+ SeqIdsAcc1 = case IndexOnDisk of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end,
+ {[MsgId | MsgIdsAcc], SeqIdsAcc1, IndexStateN1}
+ end, {[], [], IndexState}, lists:reverse(queue:to_list(Q))),
+ ok = case MsgIds of
+ [] -> ok;
+ _ -> rabbit_msg_store:remove(MsgIds)
+ end,
+ case SeqIds of
+ [] -> IndexState1;
+ _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1)
+ end.
+
publish(msg, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
SeqId, IsDelivered, State = #vqstate { index_state = IndexState,