diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 41ad77917e..261478c564 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -261,27 +261,36 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) -> ok = rabbit_msg_store:remove(MsgIds), State #vqstate { index_state = IndexState1 }. -purge(State = #vqstate { q3 = Q3, prefetcher = undefined, +purge(State = #vqstate { prefetcher = undefined, q4 = Q4, index_state = IndexState }) -> - case queue:is_empty(Q3) of - true -> State #vqstate { q1 = queue:new(), q4 = queue:new() }; - false -> IndexState1 = remove_betas(Q3, IndexState), - purge(maybe_load_next_segment( - State #vqstate { index_state = IndexState1 })) - end; + {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState), + purge1(Q4Count, State #vqstate { index_state = IndexState1, + q4 = queue:new() }); purge(State) -> purge(drain_prefetcher(stop, State)). %%---------------------------------------------------------------------------- -remove_betas(Q, IndexState) -> - {MsgIds, SeqIds, IndexState1} = +purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> + case queue:is_empty(Q3) of + true -> + {Q1Count, IndexState1} = + remove_queue_entries(State #vqstate.q1, IndexState), + {Count + Q1Count, State #vqstate { q1 = queue:new(), + index_state = IndexState1 }}; + false -> + {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState), + purge1(Count + Q3Count, + maybe_load_next_segment( + State #vqstate { index_state = IndexState1 })) + end. + +remove_queue_entries(Q, IndexState) -> + {Count, MsgIds, SeqIds, IndexState1} = lists:foldl( - fun (#beta { msg_id = MsgId, - seq_id = SeqId, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }, - {MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + fun (Entry, {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk} = + entry_salient_details(Entry), IndexStateN1 = case IndexOnDisk andalso not IsDelivered of true -> rabbit_queue_index:write_delivered( SeqId, IndexStateN); @@ -291,16 +300,35 @@ remove_betas(Q, IndexState) -> true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, - {[MsgId | MsgIdsAcc], SeqIdsAcc1, IndexStateN1} - end, {[], [], IndexState}, lists:reverse(queue:to_list(Q))), + MsgIdsAcc1 = case MsgOnDisk of + true -> [MsgId | MsgIdsAcc]; + false -> MsgIdsAcc + end, + {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1} + %% the foldl is going to reverse the result lists, so start + %% by reversing so that we maintain doing things in + %% ascending seqid order + end, {0, [], [], IndexState}, lists:reverse(queue:to_list(Q))), ok = case MsgIds of [] -> ok; _ -> rabbit_msg_store:remove(MsgIds) end, - case SeqIds of - [] -> IndexState1; - _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1) - end. + IndexState2 = + case SeqIds of + [] -> IndexState1; + _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1) + end, + {Count, IndexState2}. + +entry_salient_details(#alpha { msg = #basic_message { guid = MsgId }, + seq_id = SeqId, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }) -> + {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}; +entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk }) -> + {MsgId, SeqId, IsDelivered, true, IndexOnDisk}. publish(msg, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, |
