summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl68
1 files changed, 48 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 41ad77917e..261478c564 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -261,27 +261,36 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) ->
ok = rabbit_msg_store:remove(MsgIds),
State #vqstate { index_state = IndexState1 }.
-purge(State = #vqstate { q3 = Q3, prefetcher = undefined,
+purge(State = #vqstate { prefetcher = undefined, q4 = Q4,
index_state = IndexState }) ->
- case queue:is_empty(Q3) of
- true -> State #vqstate { q1 = queue:new(), q4 = queue:new() };
- false -> IndexState1 = remove_betas(Q3, IndexState),
- purge(maybe_load_next_segment(
- State #vqstate { index_state = IndexState1 }))
- end;
+ {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState),
+ purge1(Q4Count, State #vqstate { index_state = IndexState1,
+ q4 = queue:new() });
purge(State) ->
purge(drain_prefetcher(stop, State)).
%%----------------------------------------------------------------------------
-remove_betas(Q, IndexState) ->
- {MsgIds, SeqIds, IndexState1} =
+purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
+ case queue:is_empty(Q3) of
+ true ->
+ {Q1Count, IndexState1} =
+ remove_queue_entries(State #vqstate.q1, IndexState),
+ {Count + Q1Count, State #vqstate { q1 = queue:new(),
+ index_state = IndexState1 }};
+ false ->
+ {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
+ purge1(Count + Q3Count,
+ maybe_load_next_segment(
+ State #vqstate { index_state = IndexState1 }))
+ end.
+
+remove_queue_entries(Q, IndexState) ->
+ {Count, MsgIds, SeqIds, IndexState1} =
lists:foldl(
- fun (#beta { msg_id = MsgId,
- seq_id = SeqId,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk },
- {MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
+ fun (Entry, {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
+ {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk} =
+ entry_salient_details(Entry),
IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
true -> rabbit_queue_index:write_delivered(
SeqId, IndexStateN);
@@ -291,16 +300,35 @@ remove_betas(Q, IndexState) ->
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end,
- {[MsgId | MsgIdsAcc], SeqIdsAcc1, IndexStateN1}
- end, {[], [], IndexState}, lists:reverse(queue:to_list(Q))),
+ MsgIdsAcc1 = case MsgOnDisk of
+ true -> [MsgId | MsgIdsAcc];
+ false -> MsgIdsAcc
+ end,
+ {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}
+ %% the foldl is going to reverse the result lists, so start
+ %% by reversing so that we maintain doing things in
+ %% ascending seqid order
+ end, {0, [], [], IndexState}, lists:reverse(queue:to_list(Q))),
ok = case MsgIds of
[] -> ok;
_ -> rabbit_msg_store:remove(MsgIds)
end,
- case SeqIds of
- [] -> IndexState1;
- _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1)
- end.
+ IndexState2 =
+ case SeqIds of
+ [] -> IndexState1;
+ _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1)
+ end,
+ {Count, IndexState2}.
+
+entry_salient_details(#alpha { msg = #basic_message { guid = MsgId },
+ seq_id = SeqId, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk }) ->
+ {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk};
+entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk }) ->
+ {MsgId, SeqId, IsDelivered, true, IndexOnDisk}.
publish(msg, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },