diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-12 11:44:59 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-12 11:44:59 +0100 |
| commit | cbdf3da7c001844eb3cbb1dfb3d512e87172793e (patch) | |
| tree | a65ef29a337d75c28235055fcc9625436698720b /src | |
| parent | ab00b22f45feb9643a5efd22b87b488a927bd46f (diff) | |
| download | rabbitmq-server-git-cbdf3da7c001844eb3cbb1dfb3d512e87172793e.tar.gz | |
Well, it was beautiful, but it was also wrong. Firstly, purge needs to return the count of the msgs purged. Secondly, it needs to remember to purge msgs and indices on disk that are in q1 or q4.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 41ad77917e..261478c564 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -261,27 +261,36 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) -> ok = rabbit_msg_store:remove(MsgIds), State #vqstate { index_state = IndexState1 }. -purge(State = #vqstate { q3 = Q3, prefetcher = undefined, +purge(State = #vqstate { prefetcher = undefined, q4 = Q4, index_state = IndexState }) -> - case queue:is_empty(Q3) of - true -> State #vqstate { q1 = queue:new(), q4 = queue:new() }; - false -> IndexState1 = remove_betas(Q3, IndexState), - purge(maybe_load_next_segment( - State #vqstate { index_state = IndexState1 })) - end; + {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState), + purge1(Q4Count, State #vqstate { index_state = IndexState1, + q4 = queue:new() }); purge(State) -> purge(drain_prefetcher(stop, State)). %%---------------------------------------------------------------------------- -remove_betas(Q, IndexState) -> - {MsgIds, SeqIds, IndexState1} = +purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> + case queue:is_empty(Q3) of + true -> + {Q1Count, IndexState1} = + remove_queue_entries(State #vqstate.q1, IndexState), + {Count + Q1Count, State #vqstate { q1 = queue:new(), + index_state = IndexState1 }}; + false -> + {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState), + purge1(Count + Q3Count, + maybe_load_next_segment( + State #vqstate { index_state = IndexState1 })) + end. + +remove_queue_entries(Q, IndexState) -> + {Count, MsgIds, SeqIds, IndexState1} = lists:foldl( - fun (#beta { msg_id = MsgId, - seq_id = SeqId, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }, - {MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + fun (Entry, {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk} = + entry_salient_details(Entry), IndexStateN1 = case IndexOnDisk andalso not IsDelivered of true -> rabbit_queue_index:write_delivered( SeqId, IndexStateN); @@ -291,16 +300,35 @@ remove_betas(Q, IndexState) -> true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, - {[MsgId | MsgIdsAcc], SeqIdsAcc1, IndexStateN1} - end, {[], [], IndexState}, lists:reverse(queue:to_list(Q))), + MsgIdsAcc1 = case MsgOnDisk of + true -> [MsgId | MsgIdsAcc]; + false -> MsgIdsAcc + end, + {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1} + %% the foldl is going to reverse the result lists, so start + %% by reversing so that we maintain doing things in + %% ascending seqid order + end, {0, [], [], IndexState}, lists:reverse(queue:to_list(Q))), ok = case MsgIds of [] -> ok; _ -> rabbit_msg_store:remove(MsgIds) end, - case SeqIds of - [] -> IndexState1; - _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1) - end. + IndexState2 = + case SeqIds of + [] -> IndexState1; + _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1) + end, + {Count, IndexState2}. + +entry_salient_details(#alpha { msg = #basic_message { guid = MsgId }, + seq_id = SeqId, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }) -> + {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}; +entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk }) -> + {MsgId, SeqId, IsDelivered, true, IndexOnDisk}. publish(msg, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, |
