diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-04 12:10:10 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-04 12:10:10 +0000 |
| commit | 41805b590d507f3c9a443d48b3661364e052e9ae (patch) | |
| tree | 61e3a0db9ecf066758a5d6c35ed9022cb573a868 | |
| parent | b51ba757d4e71fb86259c69f3693747d1d122e38 (diff) | |
| download | rabbitmq-server-git-41805b590d507f3c9a443d48b3661364e052e9ae.tar.gz | |
refactoring
| -rw-r--r-- | src/rabbit_queue_index.erl | 73 |
1 files changed, 39 insertions, 34 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6345428e8d..ed469849ff 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -156,7 +156,8 @@ init(Name) -> State1 = load_journal(State), %% 2. Flush the journal. This makes life easier for everyone, as %% it means there won't be any publishes in the journal alone. - State2 = #qistate { dir = Dir } = flush_journal(State1), + State2 = #qistate { dir = Dir, segments = Segments, + dirty_count = DCount } = flush_journal(State1), %% 3. Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we @@ -167,39 +168,41 @@ init(Name) -> %% We know the journal is empty here, so we don't need to combine %% with the journal, and we don't need to worry about messages %% that have been acked. - {State3 = #qistate { segments = Segments }, Count} = + {Segments1, Count, DCount1} = lists:foldl( - fun (Seg, {StateN = #qistate { segments = SegmentsN }, CountAcc}) -> - Segment = segment_find_or_new(Seg, Dir, SegmentsN), + fun (Seg, {Segments2, CountAcc, DCountAcc}) -> + Segment = segment_find_or_new(Seg, Dir, Segments2), {SegDict, _PubCount, _AckCount, Segment1} = load_segment(false, Segment), - SegmentsN1 = segment_store(Segment1, SegmentsN), - StateN1 = #qistate { segments = SegmentsN2 } = + {Segment2 = #segment { pubs = PubCount, acks = AckCount }, + DCountAcc1} = dict:fold( fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, - StateM) -> - SeqId = reconstruct_seq_id(Seg, RelSeq), + {Segment3, DCountAcc2}) -> InMsgStore = rabbit_msg_store:contains(MsgId), case {InMsgStore, CleanShutdown} of {true, true} -> - StateM; + {Segment3, DCountAcc}; {true, false} when Del == del -> - StateM; + {Segment3, DCountAcc}; {true, false} -> - add_to_journal(SeqId, del, StateM); + {add_to_journal(RelSeq, del, Segment3), + DCountAcc2 + 1}; {false, _} when Del == del -> - add_to_journal(SeqId, ack, StateM); + {add_to_journal(RelSeq, ack, Segment3), + DCountAcc2 + 1}; {false, _} -> - add_to_journal( - SeqId, ack, - add_to_journal(SeqId, del, StateM)) + {add_to_journal( + RelSeq, ack, + add_to_journal( + RelSeq, del, Segment3)), + DCountAcc2 + 2} end - end, StateN #qistate { segments=SegmentsN1 }, SegDict), - {ok, #segment { pubs = PubCount, acks = AckCount }} = - segment_find(Seg, SegmentsN2), - {StateN1, CountAcc + PubCount - AckCount} - end, {State2, 0}, AllSegs), - {Count, State3}. + end, {Segment1, DCountAcc}, SegDict), + {segment_store(Segment2, Segments2), + CountAcc + PubCount - AckCount, DCountAcc1} + end, {Segments, 0, DCount}, AllSegs), + {Count, State2 #qistate { segments = Segments1, dirty_count = DCount1 }}. terminate(State) -> terminate(true, State). @@ -491,7 +494,7 @@ segment_new(Seg, Dir) -> segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of - error -> segment_new(Seg, Dir); + error -> segment_new(Seg, Dir); {ok, Segment} -> Segment end. @@ -717,19 +720,21 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, segments = Segments, dir = Dir }) -> {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - Segment = #segment { journal_entries = SegJDict, - pubs = PubCount, acks = AckCount } = - segment_find_or_new(Seg, Dir, Segments), - SegJDict1 = add_to_journal(RelSeq, Action, SegJDict), - Segment1 = Segment #segment { journal_entries = SegJDict1 }, - Segment2 = - case Action of - del -> Segment1; - ack -> Segment1 #segment { acks = AckCount + 1 }; - {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } - end, + Segment = segment_find_or_new(Seg, Dir, Segments), + Segment1 = add_to_journal(RelSeq, Action, Segment), State #qistate { dirty_count = DCount + 1, - segments = segment_store(Segment2, Segments) }; + segments = segment_store(Segment1, Segments) }; + +add_to_journal(RelSeq, Action, Segment = + #segment { journal_entries = SegJournal, + pubs = PubCount, acks = AckCount }) -> + SegJournal1 = add_to_journal(RelSeq, Action, SegJournal), + Segment1 = Segment #segment { journal_entries = SegJournal1 }, + case Action of + del -> Segment1; + ack -> Segment1 #segment { acks = AckCount + 1 }; + {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } + end; %% This is a more relaxed version of deliver_or_ack_msg because we can %% have dels or acks in the journal without the corresponding |
