summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-12 11:44:59 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-12 11:44:59 +0100
commitcbdf3da7c001844eb3cbb1dfb3d512e87172793e (patch)
treea65ef29a337d75c28235055fcc9625436698720b /src
parentab00b22f45feb9643a5efd22b87b488a927bd46f (diff)
downloadrabbitmq-server-git-cbdf3da7c001844eb3cbb1dfb3d512e87172793e.tar.gz
Well, it was beautiful, but it was also wrong. Firstly, purge needs to return the count of the msgs purged. Secondly, it needs to remember to purge msgs and indices on disk that are in q1 or q4.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl68
1 files changed, 48 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 41ad77917e..261478c564 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -261,27 +261,36 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) ->
ok = rabbit_msg_store:remove(MsgIds),
State #vqstate { index_state = IndexState1 }.
-purge(State = #vqstate { q3 = Q3, prefetcher = undefined,
+purge(State = #vqstate { prefetcher = undefined, q4 = Q4,
index_state = IndexState }) ->
- case queue:is_empty(Q3) of
- true -> State #vqstate { q1 = queue:new(), q4 = queue:new() };
- false -> IndexState1 = remove_betas(Q3, IndexState),
- purge(maybe_load_next_segment(
- State #vqstate { index_state = IndexState1 }))
- end;
+ {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState),
+ purge1(Q4Count, State #vqstate { index_state = IndexState1,
+ q4 = queue:new() });
purge(State) ->
purge(drain_prefetcher(stop, State)).
%%----------------------------------------------------------------------------
-remove_betas(Q, IndexState) ->
- {MsgIds, SeqIds, IndexState1} =
+purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
+ case queue:is_empty(Q3) of
+ true ->
+ {Q1Count, IndexState1} =
+ remove_queue_entries(State #vqstate.q1, IndexState),
+ {Count + Q1Count, State #vqstate { q1 = queue:new(),
+ index_state = IndexState1 }};
+ false ->
+ {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
+ purge1(Count + Q3Count,
+ maybe_load_next_segment(
+ State #vqstate { index_state = IndexState1 }))
+ end.
+
+remove_queue_entries(Q, IndexState) ->
+ {Count, MsgIds, SeqIds, IndexState1} =
lists:foldl(
- fun (#beta { msg_id = MsgId,
- seq_id = SeqId,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk },
- {MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
+ fun (Entry, {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
+ {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk} =
+ entry_salient_details(Entry),
IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
true -> rabbit_queue_index:write_delivered(
SeqId, IndexStateN);
@@ -291,16 +300,35 @@ remove_betas(Q, IndexState) ->
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end,
- {[MsgId | MsgIdsAcc], SeqIdsAcc1, IndexStateN1}
- end, {[], [], IndexState}, lists:reverse(queue:to_list(Q))),
+ MsgIdsAcc1 = case MsgOnDisk of
+ true -> [MsgId | MsgIdsAcc];
+ false -> MsgIdsAcc
+ end,
+ {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}
+ %% the foldl is going to reverse the result lists, so start
+ %% by reversing so that we maintain doing things in
+ %% ascending seqid order
+ end, {0, [], [], IndexState}, lists:reverse(queue:to_list(Q))),
ok = case MsgIds of
[] -> ok;
_ -> rabbit_msg_store:remove(MsgIds)
end,
- case SeqIds of
- [] -> IndexState1;
- _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1)
- end.
+ IndexState2 =
+ case SeqIds of
+ [] -> IndexState1;
+ _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1)
+ end,
+ {Count, IndexState2}.
+
+entry_salient_details(#alpha { msg = #basic_message { guid = MsgId },
+ seq_id = SeqId, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk }) ->
+ {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk};
+entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk }) ->
+ {MsgId, SeqId, IsDelivered, true, IndexOnDisk}.
publish(msg, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },