diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-09 18:17:45 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-09 18:17:45 +0100 |
| commit | 4fc3412a0dbd940a6b8f5974fbdfb4e7de974e6b (patch) | |
| tree | 05e6a57add1e8208498af97ba12797a9c4c2deaa | |
| parent | d397c015d90be9f08054259b7b87ffb6184c77a8 (diff) | |
| download | rabbitmq-server-git-4fc3412a0dbd940a6b8f5974fbdfb4e7de974e6b.tar.gz | |
Beautiful!
| -rw-r--r-- | src/rabbit_variable_queue.erl | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ae9ca375d9..b0bfd8cdf6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_variable_queue). -export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1]). + fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1]). %%---------------------------------------------------------------------------- @@ -261,8 +261,47 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) -> ok = rabbit_msg_store:remove(MsgIds), State #vqstate { index_state = IndexState1 }. +purge(State = #vqstate { q3 = Q3, prefetcher = undefined, + index_state = IndexState }) -> + case queue:is_empty(Q3) of + true -> State #vqstate { q1 = queue:new(), q4 = queue:new() }; + false -> IndexState1 = remove_betas(Q3, IndexState), + purge(maybe_load_next_segment( + State #vqstate { index_state = IndexState1 })) + end; +purge(State) -> + purge(drain_prefetcher(stop, State)). + %%---------------------------------------------------------------------------- +remove_betas(Q, IndexState) -> + {MsgIds, SeqIds, IndexState1} = + lists:foldl( + fun (#beta { msg_id = MsgId, + seq_id = SeqId, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk }, + {MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + IndexStateN1 = case IndexOnDisk andalso not IsDelivered of + true -> rabbit_queue_index:write_delivered( + SeqId, IndexStateN); + false -> IndexStateN + end, + SeqIdsAcc1 = case IndexOnDisk of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, + {[MsgId | MsgIdsAcc], SeqIdsAcc1, IndexStateN1} + end, {[], [], IndexState}, lists:reverse(queue:to_list(Q))), + ok = case MsgIds of + [] -> ok; + _ -> rabbit_msg_store:remove(MsgIds) + end, + case SeqIds of + [] -> IndexState1; + _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1) + end. + publish(msg, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, SeqId, IsDelivered, State = #vqstate { index_state = IndexState, |
