summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-06 18:13:51 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-06 18:13:51 +0100
commiteb8add1e061d65ba150935c2733abcb023643d81 (patch)
tree7f6cac524b7327b53bf38f35ec41d80e0927cddd
parent70c11fb19bbc6970950b29ada906eb42171cbe02 (diff)
downloadrabbitmq-server-git-eb8add1e061d65ba150935c2733abcb023643d81.tar.gz
Large accounting bug in the queue index, leading to great confusion and indeed an infinite loop. Fixed.
-rw-r--r--src/rabbit_queue_index.erl72
-rw-r--r--src/rabbit_tests.erl4
2 files changed, 51 insertions, 25 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4887ec2160..ee4f05b4d2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -243,8 +243,8 @@ init(Name, MsgStoreRecovered) ->
State1 = load_journal(State),
%% 2. Flush the journal. This makes life easier for everyone, as
%% it means there won't be any publishes in the journal alone.
- State2 = #qistate { dir = Dir, segments = Segments,
- dirty_count = DCount } = flush_journal(State1),
+ State2 = #qistate { dir = Dir, segments = Segments } =
+ flush_journal(State1),
%% 3. Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
@@ -254,36 +254,59 @@ init(Name, MsgStoreRecovered) ->
%% We know the journal is empty here, so we don't need to combine
%% with the journal, and we don't need to worry about messages
%% that have been acked.
- {Segments1, Count, DCount1} =
+ {Segments1, Count} =
case CleanShutdown andalso MsgStoreRecovered of
false ->
lists:foldl(
- fun (Seg, {Segments2, CountAcc, DCountAcc}) ->
+ fun (Seg, {Segments2, CountAcc}) ->
Segment = segment_find_or_new(Seg, Dir, Segments2),
{SegEntries, PubCount, AckCount, Segment1} =
load_segment(false, Segment),
- {Segment2 = #segment { pubs = PubCount1, acks = AckCount1 },
- DCountAcc1} =
+ Segment2 =
+ #segment { pubs = PubCount1, acks = AckCount1 } =
array:sparse_foldl(
fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
- {Segment3, DCountAcc2}) ->
- {Segment4, DCountDelta} =
+ Segment3) ->
+ {Segment4, _DCountDelta} =
maybe_add_to_journal(
rabbit_msg_store:contains(
?PERSISTENT_MSG_STORE, MsgId),
CleanShutdown, Del, RelSeq, Segment3),
- {Segment4, DCountAcc2 + DCountDelta}
- end, {Segment1 #segment { pubs = PubCount,
- acks = AckCount }, DCountAcc},
+ Segment4
+ end, Segment1 #segment { pubs = PubCount,
+ acks = AckCount },
SegEntries),
{segment_store(Segment2, Segments2),
- CountAcc + PubCount1 - AckCount1, DCountAcc1}
- end, {Segments, 0, DCount}, all_segment_nums(State2));
+ CountAcc + PubCount1 - AckCount1}
+ end, {Segments, 0}, all_segment_nums(State2));
true ->
- {Segments, undefined, DCount}
+ %% At this stage, we will only know about files that
+ %% were loaded during flushing. They *will* have
+ %% correct ack and pub counts, but for all remaining
+ %% segments, if they're not in the Segments store then
+ %% we need to add them and populate with saved data.
+ SegmentDictTerms =
+ dict:from_list(proplists:get_value(segments, Terms, [])),
+ {lists:foldl(
+ fun (Seg, SegmentsN) ->
+ case {segment_find(Seg, SegmentsN),
+ dict:find(Seg, SegmentDictTerms)} of
+ {error, {ok, {PubCount, AckCount}}} ->
+ Segment = segment_new(Seg, Dir),
+ segment_store(
+ Segment #segment { pubs = PubCount,
+ acks = AckCount },
+ SegmentsN);
+ _ ->
+ SegmentsN
+ end
+ end, Segments, all_segment_nums(State2)),
+ undefined}
end,
- {Count, PRef, TRef, Terms,
- State2 #qistate { segments = Segments1, dirty_count = DCount1 }}.
+ %% artificially set the dirty_count non zero and call flush again
+ State3 = flush_journal(State2 #qistate { segments = Segments1,
+ dirty_count = 1 }),
+ {Count, PRef, TRef, Terms, State3}.
maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
{Segment, 0};
@@ -689,14 +712,17 @@ terminate(StoreShutdown, Terms, State =
undefined -> ok;
_ -> file_handle_cache:close(JournalHdl)
end,
- ok = segment_fold(
- fun (_Seg, #segment { handle = undefined }, ok) ->
- ok;
- (_Seg, #segment { handle = Hdl }, ok) ->
- file_handle_cache:close(Hdl)
- end, ok, Segments),
+ SegTerms = segment_fold(
+ fun (Seg, #segment { handle = Hdl, pubs = PubCount,
+ acks = AckCount }, SegTermsAcc) ->
+ ok = case Hdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(Hdl)
+ end,
+ [{Seg, {PubCount, AckCount}} | SegTermsAcc]
+ end, [], Segments),
case StoreShutdown of
- true -> store_clean_shutdown(Terms, Dir);
+ true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir);
false -> ok
end,
State #qistate { journal_handle = undefined, segments = undefined }.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 788aeeddd3..6b8998c2e6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1259,7 +1259,7 @@ test_queue_index() ->
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
{Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
- {0, SegSize, Qi3} =
+ {0, SegmentSize, Qi3} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2),
{ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3),
ok = verify_read_with_published(false, false, ReadA,
@@ -1271,7 +1271,7 @@ test_queue_index() ->
ok = start_transient_msg_store(),
%% should get length back as 0, as all the msgs were transient
{0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false),
- {0, SegSize, Qi7} =
+ {0, 0, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} =