diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ae9ca375d9..b0bfd8cdf6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_variable_queue). -export([init/1, publish/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1]). + fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1]). %%---------------------------------------------------------------------------- @@ -261,8 +261,47 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) -> ok = rabbit_msg_store:remove(MsgIds), State #vqstate { index_state = IndexState1 }. +purge(State = #vqstate { q3 = Q3, prefetcher = undefined, + index_state = IndexState }) -> + case queue:is_empty(Q3) of + true -> State #vqstate { q1 = queue:new(), q4 = queue:new() }; + false -> IndexState1 = remove_betas(Q3, IndexState), + purge(maybe_load_next_segment( + State #vqstate { index_state = IndexState1 })) + end; +purge(State) -> + purge(drain_prefetcher(stop, State)). + %%---------------------------------------------------------------------------- +remove_betas(Q, IndexState) -> + {MsgIds, SeqIds, IndexState1} = + lists:foldl( + fun (#beta { msg_id = MsgId, + seq_id = SeqId, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk }, + {MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + IndexStateN1 = case IndexOnDisk andalso not IsDelivered of + true -> rabbit_queue_index:write_delivered( + SeqId, IndexStateN); + false -> IndexStateN + end, + SeqIdsAcc1 = case IndexOnDisk of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, + {[MsgId | MsgIdsAcc], SeqIdsAcc1, IndexStateN1} + end, {[], [], IndexState}, lists:reverse(queue:to_list(Q))), + ok = case MsgIds of + [] -> ok; + _ -> rabbit_msg_store:remove(MsgIds) + end, + case SeqIds of + [] -> IndexState1; + _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1) + end. + publish(msg, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, SeqId, IsDelivered, State = #vqstate { index_state = IndexState, |
