diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-14 15:51:50 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-14 15:51:50 +0100 |
| commit | 68ffe14519589afcd689141855602c3c94c16366 (patch) | |
| tree | ffeffd2dc86b4e771d108f6448adb956bb177011 | |
| parent | 9dfa3c6b67c7352407122f05f257adcaee842965 (diff) | |
| download | rabbitmq-server-git-68ffe14519589afcd689141855602c3c94c16366.tar.gz | |
the deletion of transient msgs via the scattering of the journal at startup was wrong because there is no guarantee that the journal will touch all the segments. Thus now in the formation of the ack counts, add deliveries at that point where necessary for transient msgs. Acks for these msgs are not added at this point because they need to go via the journal scattering mechanism so that full segments can be removed.
| -rw-r--r-- | src/rabbit_queue_index.erl | 121 |
1 files changed, 74 insertions, 47 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 10773c0cef..a38732bdba 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -161,17 +161,20 @@ init(Name) -> StrName = queue_name_to_dir_name(Name), Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - {AckCounts, TotalMsgCount} = scatter_journal(Dir, find_ack_counts(Dir)), + {TotalMsgCount, AckCounts, TransientADict} = + find_ack_counts_and_deliver_transient_msgs(Dir), + {TotalMsgCount1, AckCounts1} = + scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict), {ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME), [raw, binary, delayed_write, write, read]), - {TotalMsgCount, #qistate { dir = Dir, - cur_seg_num = undefined, - cur_seg_hdl = undefined, - journal_ack_count = 0, - journal_ack_dict = dict:new(), - journal_handle = JournalHdl, - seg_ack_counts = AckCounts - }}. + {TotalMsgCount1, #qistate { dir = Dir, + cur_seg_num = undefined, + cur_seg_hdl = undefined, + journal_ack_count = 0, + journal_ack_dict = dict:new(), + journal_handle = JournalHdl, + seg_ack_counts = AckCounts1 + }}. terminate(State = #qistate { journal_handle = undefined }) -> State; @@ -425,31 +428,45 @@ all_segment_nums_paths(Dir) -> SegName)), filename:join(Dir, SegName)} || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. -find_ack_counts(Dir) -> +find_ack_counts_and_deliver_transient_msgs(Dir) -> SegNumsPaths = all_segment_nums_paths(Dir), lists:foldl( - fun ({SegNum, SegPath}, {AccCount, AccDict}) -> + fun ({SegNum, SegPath}, {TotalMsgCount, AckCounts, TransientADict}) -> {SDict, AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()), - {dict:size(SDict) + AccCount, - case AckCount of - 0 -> AccDict; - _ -> dict:store(SegNum, AckCount, AccDict) - end} - end, {0, dict:new()}, SegNumsPaths). - -scatter_journal(Dir, {TotalMsgCount, AckCounts}) -> + TransientMsgsAcks = deliver_transient(SegPath, SDict), + %% ignore TransientMsgsAcks in AckCounts1 and + %% TotalMsgCount1 because the TransientMsgsAcks fall + %% through into scatter_journal at which point the + %% AckCounts and TotalMsgCount will be correctly + %% adjusted. + TotalMsgCount1 = TotalMsgCount + dict:size(SDict), + AckCounts1 = case AckCount of + 0 -> AckCounts; + N -> dict:store(SegNum, N, AckCounts) + end, + TransientADict1 = + case TransientMsgsAcks of + [] -> TransientADict; + _ -> dict:store(SegNum, TransientMsgsAcks, TransientADict) + end, + {TotalMsgCount1, AckCounts1, TransientADict1} + end, {0, dict:new(), dict:new()}, SegNumsPaths). + +scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict) -> JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME), case file:open(JournalPath, [read, read_ahead, raw, binary]) of {error, enoent} -> AckCounts; {ok, Hdl} -> - ADict = load_journal(Hdl, dict:new()), + %% ADict may well contain duplicates. However, this is ok, + %% due to the use of sets in replay_journal_acks_to_segment + ADict = load_journal(Hdl, TransientADict), ok = file:close(Hdl), - {AckCounts1, TotalMsgCount1, _Dir} = + {TotalMsgCount1, AckCounts1, _Dir} = dict:fold(fun replay_journal_acks_to_segment/3, - {AckCounts, TotalMsgCount, Dir}, ADict), + {TotalMsgCount, AckCounts, Dir}, ADict), ok = file:delete(JournalPath), - {AckCounts1, TotalMsgCount1} + {TotalMsgCount1, AckCounts1} end. load_journal(Hdl, ADict) -> @@ -459,27 +476,37 @@ load_journal(Hdl, ADict) -> _ErrOrEoF -> ADict end. -replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) -> +replay_journal_acks_to_segment(_, [], Acc) -> + Acc; +replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, AckCounts, Dir}) -> SegPath = seg_num_to_path(Dir, SegNum), + %% supply empty dict so that we get all msgs in SDict that have + %% not been acked in the segment file itself {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()), ValidRelSeqIds = dict:fetch_keys(SDict), - ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds), - sets:from_list(Acks)), - AcksToAppend = deliver_and_ack_transient(SDict, sets:to_list(ValidAcks)), - {append_acks_to_segment(SegPath, SegNum, AckCounts, AcksToAppend), - TotalMsgCount - sets:size(ValidAcks), Dir}. - -deliver_and_ack_transient(SDict, Acks) -> - %% because an Ack entry and a Delivered entry are identical, we - %% simply add the RelSeq twice to the accumulator for transient - %% msgs that have not yet been delivered. - dict:fold(fun (_RelSeq, {_MsgId, _IsDelivered, true }, Acc) -> - Acc; - (RelSeq, {_MsgId, true, false}, Acc) -> - [RelSeq | Acc]; - (RelSeq, {_MsgId, false, false}, Acc) -> - [RelSeq, RelSeq | Acc] - end, Acks, SDict). + ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds), + sets:from_list(Acks))), + %% ValidAcks will not contain any duplicates at this point. + {TotalMsgCount - length(ValidAcks), + append_acks_to_segment(SegPath, SegNum, AckCounts, ValidAcks), Dir}. + +deliver_transient(SegPath, SDict) -> + {AckMe, DeliverMe} = + dict:fold( + fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) -> + Acc; + (RelSeq, {_MsgId, false, false}, {AckMeAcc, DeliverMeAcc}) -> + {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]}; + (RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) -> + {[RelSeq | AckMeAcc], DeliverMeAcc} + end, {[], []}, SDict), + {ok, Hdl} = file:open(SegPath, [binary, raw, write, delayed_write, read]), + {ok, _} = file:position(Hdl, {eof, 0}), + ok = file:write(Hdl, [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]), + ok = file:sync(Hdl), + ok = file:close(Hdl), + AckMe. %%---------------------------------------------------------------------------- %% Loading Segments @@ -490,7 +517,7 @@ load_segment(SegNum, SegPath, JAckDict) -> {error, enoent} -> {dict:new(), 0, 0}; {ok, Hdl} -> {SDict, AckCount, HighRelSeq} = - load_segment_entries(SegNum, Hdl, {dict:new(), 0, 0}), + load_segment_entries(Hdl, dict:new(), 0, 0), ok = file:close(Hdl), RelSeqs = case dict:find(SegNum, JAckDict) of {ok, RelSeqs1} -> RelSeqs1; @@ -503,14 +530,14 @@ load_segment(SegNum, SegPath, JAckDict) -> {SDict1, AckCount1, HighRelSeq} end. -load_segment_entries(SegNum, Hdl, {SDict, AckCount, HighRelSeq}) -> +load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) -> case file:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> {ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), - load_segment_entries(SegNum, Hdl, {SDict1, AckCount1, HighRelSeq}); + load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> %% because we specify /binary, and binaries are complete @@ -520,9 +547,9 @@ load_segment_entries(SegNum, Hdl, {SDict, AckCount, HighRelSeq}) -> <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), load_segment_entries( - SegNum, Hdl, {dict:store(RelSeq, {MsgId, false, - 1 == IsPersistentNum}, - SDict), AckCount, HighRelSeq1}); + Hdl, dict:store(RelSeq, {MsgId, false, + 1 == IsPersistentNum}, + SDict), AckCount, HighRelSeq1); _ErrOrEoF -> {SDict, AckCount, HighRelSeq} end. |
