diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-04 00:06:44 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-04 00:06:44 +0000 |
| commit | b51ba757d4e71fb86259c69f3693747d1d122e38 (patch) | |
| tree | 3272857a75e4452bc2a37e1783179424bba56630 | |
| parent | 2b3c37e243009a5261a7710b5ca21e906bb48fc7 (diff) | |
| download | rabbitmq-server-git-b51ba757d4e71fb86259c69f3693747d1d122e38.tar.gz | |
combined steps 3 and 4 of init, and made segment_find have the same type as dict:find. Dropped the Dir from the Segments, and added find_segment_or_new/3
| -rw-r--r-- | src/rabbit_queue_index.erl | 153 |
1 files changed, 78 insertions, 75 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 136ff82995..6345428e8d 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -116,7 +116,7 @@ })). -type(msg_id() :: binary()). -type(seq_id() :: integer()). --type(seg_dict() :: {dict(), [segment()], file_path()}). +-type(seg_dict() :: {dict(), [segment()]}). -type(qistate() :: #qistate { dir :: file_path(), segments :: seg_dict(), journal_handle :: hdl(), @@ -160,48 +160,45 @@ init(Name) -> %% 3. Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we - %% lose them. Also mark delivered if not clean shutdown. + %% lose them. Also mark delivered if not clean shutdown. Also + %% find the number of unacked messages. AllSegs = all_segment_nums(State2), CleanShutdown = detect_clean_shutdown(Dir), %% We know the journal is empty here, so we don't need to combine %% with the journal, and we don't need to worry about messages %% that have been acked. - State3 = #qistate { segments = Segments } = + {State3 = #qistate { segments = Segments }, Count} = lists:foldl( - fun (Seg, StateN = #qistate { segments = SegmentsN }) -> - Segment = segment_find(Seg, SegmentsN), + fun (Seg, {StateN = #qistate { segments = SegmentsN }, CountAcc}) -> + Segment = segment_find_or_new(Seg, Dir, SegmentsN), {SegDict, _PubCount, _AckCount, Segment1} = load_segment(false, Segment), SegmentsN1 = segment_store(Segment1, SegmentsN), - dict:fold( - fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, - StateM) -> - SeqId = reconstruct_seq_id(Seg, RelSeq), - InMsgStore = rabbit_msg_store:contains(MsgId), - case {InMsgStore, CleanShutdown} of - {true, true} -> - StateM; - {true, false} when Del == del -> - StateM; - {true, false} -> - add_to_journal(SeqId, del, StateM); - {false, _} when Del == del -> - add_to_journal(SeqId, ack, StateM); - {false, _} -> - add_to_journal( - SeqId, ack, - add_to_journal(SeqId, del, StateM)) - end - end, StateN #qistate { segments = SegmentsN1 }, SegDict) - end, State2, AllSegs), - %% 4. Go through all segments and calculate the number of unacked - %% messages we have. - Count = lists:foldl( - fun (Seg, CountAcc) -> - #segment { pubs = PubCount, acks = AckCount } = - segment_find(Seg, Segments), - CountAcc + PubCount - AckCount - end, 0, AllSegs), + StateN1 = #qistate { segments = SegmentsN2 } = + dict:fold( + fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, + StateM) -> + SeqId = reconstruct_seq_id(Seg, RelSeq), + InMsgStore = rabbit_msg_store:contains(MsgId), + case {InMsgStore, CleanShutdown} of + {true, true} -> + StateM; + {true, false} when Del == del -> + StateM; + {true, false} -> + add_to_journal(SeqId, del, StateM); + {false, _} when Del == del -> + add_to_journal(SeqId, ack, StateM); + {false, _} -> + add_to_journal( + SeqId, ack, + add_to_journal(SeqId, del, StateM)) + end + end, StateN #qistate { segments=SegmentsN1 }, SegDict), + {ok, #segment { pubs = PubCount, acks = AckCount }} = + segment_find(Seg, SegmentsN2), + {StateN1, CountAcc + PubCount - AckCount} + end, {State2, 0}, AllSegs), {Count, State3}. terminate(State) -> @@ -249,7 +246,7 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> flush_journal(State = #qistate { dirty_count = 0 }) -> State; -flush_journal(State = #qistate { segments = Segments, dir = Dir }) -> +flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, @@ -273,15 +270,16 @@ flush_journal(State = #qistate { segments = Segments, dir = Dir }) -> end, segment_store(Segment1, SegmentsN) end - end, segment_new(Dir), Segments), + end, segments_new(), Segments), {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), State1 #qistate { dirty_count = 0 }. -read_segment_entries(InitSeqId, State = #qistate { segments = Segments }) -> +read_segment_entries(InitSeqId, State = #qistate { segments = Segments, + dir = Dir }) -> {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - Segment = segment_find(Seg, Segments), + Segment = segment_find_or_new(Seg, Dir, Segments), {SegDict, _PubCount, _AckCount, Segment1 = #segment { journal_entries = JEntries }} = load_segment(false, Segment), @@ -412,7 +410,7 @@ blank_state(QueueName) -> Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), #qistate { dir = Dir, - segments = segment_new(Dir), + segments = segments_new(), journal_handle = undefined, dirty_count = 0 }. @@ -475,58 +473,62 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> get_segment_handle(Segment = #segment { handle = Hdl }) -> {Hdl, Segment}. -segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_], _Dir}) -> - Segment; %% 1 or (2, matches head) -segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }], _Dir}) -> - Segment; %% 2, matches tail -segment_find(Seg, {Segments, _, Dir}) -> %% no match - case dict:find(Seg, Segments) of - {ok, Segment = #segment{}} -> Segment; - error -> #segment { pubs = 0, - acks = 0, - handle = undefined, - journal_entries = journal_new(), - path = seg_num_to_path(Dir, Seg), - num = Seg - } +segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) -> + {ok, Segment}; %% 1 or (2, matches head) +segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) -> + {ok, Segment}; %% 2, matches tail +segment_find(Seg, {Segments, _}) -> %% no match + dict:find(Seg, Segments). + +segment_new(Seg, Dir) -> + #segment { pubs = 0, + acks = 0, + handle = undefined, + journal_entries = journal_new(), + path = seg_num_to_path(Dir, Seg), + num = Seg + }. + +segment_find_or_new(Seg, Dir, Segments) -> + case segment_find(Seg, Segments) of + error -> segment_new(Seg, Dir); + {ok, Segment} -> Segment end. segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head) - {Segments, [#segment { num = Seg } | Tail], Dir}) -> - {Segments, [Segment | Tail], Dir}; + {Segments, [#segment { num = Seg } | Tail]}) -> + {Segments, [Segment | Tail]}; segment_store(Segment = #segment { num = Seg }, %% 2, matches tail - {Segments, [SegmentA, #segment { num = Seg }], Dir}) -> - {Segments, [SegmentA, Segment], Dir}; -segment_store(Segment = #segment { num = Seg }, - {Segments, [], Dir}) -> - {dict:erase(Seg, Segments), [Segment], Dir}; -segment_store(Segment = #segment { num = Seg }, - {Segments, [SegmentA], Dir}) -> - {dict:erase(Seg, Segments), [Segment, SegmentA], Dir}; + {Segments, [SegmentA, #segment { num = Seg }]}) -> + {Segments, [SegmentA, Segment]}; +segment_store(Segment = #segment { num = Seg }, {Segments, []}) -> + {dict:erase(Seg, Segments), [Segment]}; +segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) -> + {dict:erase(Seg, Segments), [Segment, SegmentA]}; segment_store(Segment = #segment { num = Seg }, - {Segments, [SegmentA, SegmentB], Dir}) -> + {Segments, [SegmentA, SegmentB]}) -> {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)), - [Segment, SegmentA], Dir}. + [Segment, SegmentA]}. -segment_fold(Fun, Acc, {Segments, [], _Dir}) -> +segment_fold(Fun, Acc, {Segments, []}) -> dict:fold(Fun, Acc, Segments); -segment_fold(Fun, Acc, {Segments, CachedSegments, _Dir}) -> +segment_fold(Fun, Acc, {Segments, CachedSegments}) -> Acc1 = lists:foldl(fun (Segment = #segment { num = Num }, AccN) -> Fun(Num, Segment, AccN) end, Acc, CachedSegments), dict:fold(Fun, Acc1, Segments). -segment_map(Fun, {Segments, CachedSegments, Dir}) -> +segment_map(Fun, {Segments, CachedSegments}) -> {dict:map(Fun, Segments), lists:map(fun (Segment = #segment { num = Num }) -> Fun(Num, Segment) end, - CachedSegments), Dir}. + CachedSegments)}. -segment_fetch_keys({Segments, CachedSegments, _Dir}) -> +segment_fetch_keys({Segments, CachedSegments}) -> lists:map(fun (Segment) -> Segment#segment.num end, CachedSegments) ++ dict:fetch_keys(Segments). -segment_new(Dir) -> - {dict:new(), [], Dir}. +segments_new() -> + {dict:new(), []}. get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> @@ -583,7 +585,7 @@ terminate(StoreShutdown, State = true -> store_clean_shutdown(Dir); false -> ok end, - State #qistate { journal_handle = undefined, segments = segment_new(Dir) }. + State #qistate { journal_handle = undefined, segments = segments_new() }. %%---------------------------------------------------------------------------- %% Majors @@ -712,11 +714,12 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> end. add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, - segments = Segments }) -> + segments = Segments, + dir = Dir }) -> {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), Segment = #segment { journal_entries = SegJDict, pubs = PubCount, acks = AckCount } = - segment_find(Seg, Segments), + segment_find_or_new(Seg, Dir, Segments), SegJDict1 = add_to_journal(RelSeq, Action, SegJDict), Segment1 = Segment #segment { journal_entries = SegJDict1 }, Segment2 = |
