diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-02 15:43:04 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-02 15:43:04 +0000 |
| commit | bf1029db49ed642f270c78f0420e6aec691f1d95 (patch) | |
| tree | 5ea79f1dcb3c53dd5569006a8075980875129acd /src | |
| parent | 4eec29a3d5d1f1d0da31c546b060cd59d327fcc4 (diff) | |
| download | rabbitmq-server-git-bf1029db49ed642f270c78f0420e6aec691f1d95.tar.gz | |
Still haven't discovered any real bugs in the refactoring that happened to qi on Friday, but quite a lot of cleaning, extra code comments and general polish to qi
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 82 |
1 files changed, 41 insertions, 41 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c68a3d0460..4b48df82dc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -292,21 +292,20 @@ find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> %% SegNums is sorted, ascending. LowSeqIdSeg = case SegNums of - [] -> 0; - _ -> reconstruct_seq_id(hd(SegNums), 0) + [] -> 0; + [MinSegNum|_] -> reconstruct_seq_id(MinSegNum, 0) end, {NextSeqId, State1} = case SegNums of [] -> {0, State}; - _ -> SegNum2 = lists:last(SegNums), + _ -> MaxSegNum = lists:last(SegNums), {_SDict, PubCount, _AckCount, HighRelSeq, State2} = - load_segment(SegNum2, State), - NextSeqId1 = reconstruct_seq_id(SegNum2, HighRelSeq), - NextSeqId2 = - case PubCount of - 0 -> NextSeqId1; - _ -> NextSeqId1 + 1 - end, + load_segment(MaxSegNum, State), + NextSeqId1 = reconstruct_seq_id(MaxSegNum, HighRelSeq), + NextSeqId2 = case PubCount of + 0 -> NextSeqId1; + _ -> NextSeqId1 + 1 + end, {NextSeqId2, State2} end, {LowSeqIdSeg, NextSeqId, State1}. @@ -459,8 +458,9 @@ seg_num_to_path(Dir, SegNum) -> delete_queue_directory(Dir) -> {ok, Entries} = file:list_dir(Dir), - lists:foreach(fun file:delete/1, - [ filename:join(Dir, Entry) || Entry <- Entries ]), + ok = lists:foldl(fun (Entry, ok) -> + file:delete(filename:join(Dir, Entry)) + end, ok, Entries), ok = file:del_dir(Dir). add_seqid_to_dict(SeqId, Dict) -> @@ -556,6 +556,10 @@ read_and_prune_segments(State = #qistate { dir = Dir }) -> load_segment(SegNum, StateN), StateL = #qistate { seg_ack_counts = AckCounts } = drop_and_deliver(SegNum, SDict, CleanShutdown, StateM), + %% ignore the effect of drop_and_deliver on + %% TotalMsgCount and AckCounts, as drop_and_deliver + %% will add to the journal dicts, which will then + %% effect TotalMsgCount when we scatter the journal TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict), AckCounts1 = case AckCount of 0 -> AckCounts; @@ -589,12 +593,13 @@ scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) -> {TotalMsgCount1, ADict1, State3} = dict:fold(fun replay_journal_to_segment/3, {TotalMsgCount, ADict, - %% supply empty dict so that when - %% replay_journal_acks_to_segment loads segments, - %% it gets all msgs, and ignores anything we've - %% found in the journal. + %% supply empty dicts so that when + %% replay_journal_to_segment loads segments, it + %% gets all msgs, and ignores anything we've found + %% in the journal. State2 #qistate { journal_del_dict = dict:new(), journal_ack_dict = dict:new() }}, DDict), + %% replay for segments which only had acks, and no deliveries {TotalMsgCount2, State4} = dict:fold(fun replay_journal_acks_to_segment/3, {TotalMsgCount1, State3}, ADict1), @@ -767,41 +772,36 @@ append_acks_to_segment(SegNum, Acks, {ok, AckCount1} -> AckCount1; error -> 0 end, - case append_acks_to_segment(SegNum, AckCount, Acks, State) of - {0, State1} -> State1; - {?SEGMENT_ENTRIES_COUNT, - State1 = #qistate { seg_ack_counts = AckCounts1 }} -> - State1 #qistate { seg_ack_counts = dict:erase(SegNum, AckCounts1) }; - {AckCount2, State1 = #qistate { seg_ack_counts = AckCounts1 }} -> - State1 #qistate { seg_ack_counts = dict:store(SegNum, AckCount2, - AckCounts1) } - end. - -append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir }) - when length(Acks) + AckCount == ?SEGMENT_ENTRIES_COUNT -> + AckCount2 = AckCount + length(Acks), + AckCounts1 = case AckCount2 of + 0 -> AckCounts; + ?SEGMENT_ENTRIES_COUNT -> dict:erase(SegNum, AckCounts); + _ -> dict:store(SegNum, AckCount2, AckCounts) + end, + append_acks_to_segment(SegNum, AckCount2, Acks, + State #qistate { seg_ack_counts = AckCounts1 }). + +append_acks_to_segment(SegNum, AckCount, _Acks, State = #qistate { dir = Dir }) + when AckCount == ?SEGMENT_ENTRIES_COUNT -> State1 = close_handle(SegNum, State), ok = case file:delete(seg_num_to_path(Dir, SegNum)) of ok -> ok; {error, enoent} -> ok end, - {?SEGMENT_ENTRIES_COUNT, State1}; + State1; append_acks_to_segment(SegNum, AckCount, Acks, State) - when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT -> - {Count, Hdl, State1} = append_to_segment(SegNum, Acks, State), + when AckCount < ?SEGMENT_ENTRIES_COUNT -> + {Hdl, State1} = append_to_segment(SegNum, Acks, State), ok = file_handle_cache:sync(Hdl), - {AckCount + Count, State1}. + State1. append_dels_to_segment(SegNum, Dels, State) -> - {_Count, _Hdl, State1} = append_to_segment(SegNum, Dels, State), + {_Hdl, State1} = append_to_segment(SegNum, Dels, State), State1. append_to_segment(SegNum, AcksOrDels, State) -> {Hdl, State1} = get_seg_handle(SegNum, State), - {Count, List} = - lists:foldl(fun (RelSeq, {Count1, Acc}) -> - {Count1 + 1, - [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>> | Acc]} - end, {0, []}, AcksOrDels), - ok = file_handle_cache:append(Hdl, List), - {Count, Hdl, State1}. + ok = file_handle_cache:append( + Hdl, [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>> || RelSeq <- AcksOrDels ]), + {Hdl, State1}. |
