diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-06 18:13:51 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-06 18:13:51 +0100 |
| commit | eb8add1e061d65ba150935c2733abcb023643d81 (patch) | |
| tree | 7f6cac524b7327b53bf38f35ec41d80e0927cddd | |
| parent | 70c11fb19bbc6970950b29ada906eb42171cbe02 (diff) | |
| download | rabbitmq-server-git-eb8add1e061d65ba150935c2733abcb023643d81.tar.gz | |
Large accounting bug in the queue index, leading to great confusion and indeed an infinite loop. Fixed.
| -rw-r--r-- | src/rabbit_queue_index.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
2 files changed, 51 insertions, 25 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4887ec2160..ee4f05b4d2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -243,8 +243,8 @@ init(Name, MsgStoreRecovered) -> State1 = load_journal(State), %% 2. Flush the journal. This makes life easier for everyone, as %% it means there won't be any publishes in the journal alone. - State2 = #qistate { dir = Dir, segments = Segments, - dirty_count = DCount } = flush_journal(State1), + State2 = #qistate { dir = Dir, segments = Segments } = + flush_journal(State1), %% 3. Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we @@ -254,36 +254,59 @@ init(Name, MsgStoreRecovered) -> %% We know the journal is empty here, so we don't need to combine %% with the journal, and we don't need to worry about messages %% that have been acked. - {Segments1, Count, DCount1} = + {Segments1, Count} = case CleanShutdown andalso MsgStoreRecovered of false -> lists:foldl( - fun (Seg, {Segments2, CountAcc, DCountAcc}) -> + fun (Seg, {Segments2, CountAcc}) -> Segment = segment_find_or_new(Seg, Dir, Segments2), {SegEntries, PubCount, AckCount, Segment1} = load_segment(false, Segment), - {Segment2 = #segment { pubs = PubCount1, acks = AckCount1 }, - DCountAcc1} = + Segment2 = + #segment { pubs = PubCount1, acks = AckCount1 } = array:sparse_foldl( fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, - {Segment3, DCountAcc2}) -> - {Segment4, DCountDelta} = + Segment3) -> + {Segment4, _DCountDelta} = maybe_add_to_journal( rabbit_msg_store:contains( ?PERSISTENT_MSG_STORE, MsgId), CleanShutdown, Del, RelSeq, Segment3), - {Segment4, DCountAcc2 + DCountDelta} - end, {Segment1 #segment { pubs = PubCount, - acks = AckCount }, DCountAcc}, + Segment4 + end, Segment1 #segment { pubs = PubCount, + acks = AckCount }, SegEntries), {segment_store(Segment2, Segments2), - CountAcc + 
PubCount1 - AckCount1, DCountAcc1} - end, {Segments, 0, DCount}, all_segment_nums(State2)); + CountAcc + PubCount1 - AckCount1} + end, {Segments, 0}, all_segment_nums(State2)); true -> - {Segments, undefined, DCount} + %% At this stage, we will only know about files that + %% were loaded during flushing. They *will* have + %% correct ack and pub counts, but for all remaining + %% segments, if they're not in the Segments store then + %% we need to add them and populate with saved data. + SegmentDictTerms = + dict:from_list(proplists:get_value(segments, Terms, [])), + {lists:foldl( + fun (Seg, SegmentsN) -> + case {segment_find(Seg, SegmentsN), + dict:find(Seg, SegmentDictTerms)} of + {error, {ok, {PubCount, AckCount}}} -> + Segment = segment_new(Seg, Dir), + segment_store( + Segment #segment { pubs = PubCount, + acks = AckCount }, + SegmentsN); + _ -> + SegmentsN + end + end, Segments, all_segment_nums(State2)), + undefined} end, - {Count, PRef, TRef, Terms, - State2 #qistate { segments = Segments1, dirty_count = DCount1 }}. + %% artificially set the dirty_count non zero and call flush again + State3 = flush_journal(State2 #qistate { segments = Segments1, + dirty_count = 1 }), + {Count, PRef, TRef, Terms, State3}. 
maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) -> {Segment, 0}; @@ -689,14 +712,17 @@ terminate(StoreShutdown, Terms, State = undefined -> ok; _ -> file_handle_cache:close(JournalHdl) end, - ok = segment_fold( - fun (_Seg, #segment { handle = undefined }, ok) -> - ok; - (_Seg, #segment { handle = Hdl }, ok) -> - file_handle_cache:close(Hdl) - end, ok, Segments), + SegTerms = segment_fold( + fun (Seg, #segment { handle = Hdl, pubs = PubCount, + acks = AckCount }, SegTermsAcc) -> + ok = case Hdl of + undefined -> ok; + _ -> file_handle_cache:close(Hdl) + end, + [{Seg, {PubCount, AckCount}} | SegTermsAcc] + end, [], Segments), case StoreShutdown of - true -> store_clean_shutdown(Terms, Dir); + true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir); false -> ok end, State #qistate { journal_handle = undefined, segments = undefined }. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 788aeeddd3..6b8998c2e6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1259,7 +1259,7 @@ test_queue_index() -> {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), - {0, SegSize, Qi3} = + {0, SegmentSize, Qi3} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2), {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3), ok = verify_read_with_published(false, false, ReadA, @@ -1271,7 +1271,7 @@ test_queue_index() -> ok = start_transient_msg_store(), %% should get length back as 0, as all the msgs were transient {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false), - {0, SegSize, Qi7} = + {0, 0, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = |
