summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-14 15:51:50 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-14 15:51:50 +0100
commit68ffe14519589afcd689141855602c3c94c16366 (patch)
treeffeffd2dc86b4e771d108f6448adb956bb177011
parent9dfa3c6b67c7352407122f05f257adcaee842965 (diff)
downloadrabbitmq-server-git-68ffe14519589afcd689141855602c3c94c16366.tar.gz
the deletion of transient msgs via the scattering of the journal at startup was wrong because there is no guarantee that the journal will touch all the segments. Thus now in the formation of the ack counts, add deliveries at that point where necessary for transient msgs. Acks for these msgs are not added at this point because they need to go via the journal scattering mechanism so that full segments can be removed.
-rw-r--r--src/rabbit_queue_index.erl121
1 files changed, 74 insertions, 47 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 10773c0cef..a38732bdba 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -161,17 +161,20 @@ init(Name) ->
StrName = queue_name_to_dir_name(Name),
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- {AckCounts, TotalMsgCount} = scatter_journal(Dir, find_ack_counts(Dir)),
+ {TotalMsgCount, AckCounts, TransientADict} =
+ find_ack_counts_and_deliver_transient_msgs(Dir),
+ {TotalMsgCount1, AckCounts1} =
+ scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict),
{ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME),
[raw, binary, delayed_write, write, read]),
- {TotalMsgCount, #qistate { dir = Dir,
- cur_seg_num = undefined,
- cur_seg_hdl = undefined,
- journal_ack_count = 0,
- journal_ack_dict = dict:new(),
- journal_handle = JournalHdl,
- seg_ack_counts = AckCounts
- }}.
+ {TotalMsgCount1, #qistate { dir = Dir,
+ cur_seg_num = undefined,
+ cur_seg_hdl = undefined,
+ journal_ack_count = 0,
+ journal_ack_dict = dict:new(),
+ journal_handle = JournalHdl,
+ seg_ack_counts = AckCounts1
+ }}.
terminate(State = #qistate { journal_handle = undefined }) ->
State;
@@ -425,31 +428,45 @@ all_segment_nums_paths(Dir) ->
SegName)), filename:join(Dir, SegName)}
|| SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
-find_ack_counts(Dir) ->
+find_ack_counts_and_deliver_transient_msgs(Dir) ->
SegNumsPaths = all_segment_nums_paths(Dir),
lists:foldl(
- fun ({SegNum, SegPath}, {AccCount, AccDict}) ->
+ fun ({SegNum, SegPath}, {TotalMsgCount, AckCounts, TransientADict}) ->
{SDict, AckCount, _HighRelSeq} =
load_segment(SegNum, SegPath, dict:new()),
- {dict:size(SDict) + AccCount,
- case AckCount of
- 0 -> AccDict;
- _ -> dict:store(SegNum, AckCount, AccDict)
- end}
- end, {0, dict:new()}, SegNumsPaths).
-
-scatter_journal(Dir, {TotalMsgCount, AckCounts}) ->
+ TransientMsgsAcks = deliver_transient(SegPath, SDict),
+ %% ignore TransientMsgsAcks in AckCounts1 and
+ %% TotalMsgCount1 because the TransientMsgsAcks fall
+ %% through into scatter_journal at which point the
+ %% AckCounts and TotalMsgCount will be correctly
+ %% adjusted.
+ TotalMsgCount1 = TotalMsgCount + dict:size(SDict),
+ AckCounts1 = case AckCount of
+ 0 -> AckCounts;
+ N -> dict:store(SegNum, N, AckCounts)
+ end,
+ TransientADict1 =
+ case TransientMsgsAcks of
+ [] -> TransientADict;
+ _ -> dict:store(SegNum, TransientMsgsAcks, TransientADict)
+ end,
+ {TotalMsgCount1, AckCounts1, TransientADict1}
+ end, {0, dict:new(), dict:new()}, SegNumsPaths).
+
+scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict) ->
JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
case file:open(JournalPath, [read, read_ahead, raw, binary]) of
{error, enoent} -> AckCounts;
{ok, Hdl} ->
- ADict = load_journal(Hdl, dict:new()),
+ %% ADict may well contain duplicates. However, this is ok,
+ %% due to the use of sets in replay_journal_acks_to_segment
+ ADict = load_journal(Hdl, TransientADict),
ok = file:close(Hdl),
- {AckCounts1, TotalMsgCount1, _Dir} =
+ {TotalMsgCount1, AckCounts1, _Dir} =
dict:fold(fun replay_journal_acks_to_segment/3,
- {AckCounts, TotalMsgCount, Dir}, ADict),
+ {TotalMsgCount, AckCounts, Dir}, ADict),
ok = file:delete(JournalPath),
- {AckCounts1, TotalMsgCount1}
+ {TotalMsgCount1, AckCounts1}
end.
load_journal(Hdl, ADict) ->
@@ -459,27 +476,37 @@ load_journal(Hdl, ADict) ->
_ErrOrEoF -> ADict
end.
-replay_journal_acks_to_segment(SegNum, Acks, {AckCounts, TotalMsgCount, Dir}) ->
+replay_journal_acks_to_segment(_, [], Acc) ->
+ Acc;
+replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, AckCounts, Dir}) ->
SegPath = seg_num_to_path(Dir, SegNum),
+ %% supply empty dict so that we get all msgs in SDict that have
+ %% not been acked in the segment file itself
{SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()),
ValidRelSeqIds = dict:fetch_keys(SDict),
- ValidAcks = sets:intersection(sets:from_list(ValidRelSeqIds),
- sets:from_list(Acks)),
- AcksToAppend = deliver_and_ack_transient(SDict, sets:to_list(ValidAcks)),
- {append_acks_to_segment(SegPath, SegNum, AckCounts, AcksToAppend),
- TotalMsgCount - sets:size(ValidAcks), Dir}.
-
-deliver_and_ack_transient(SDict, Acks) ->
- %% because an Ack entry and a Delivered entry are identical, we
- %% simply add the RelSeq twice to the accumulator for transient
- %% msgs that have not yet been delivered.
- dict:fold(fun (_RelSeq, {_MsgId, _IsDelivered, true }, Acc) ->
- Acc;
- (RelSeq, {_MsgId, true, false}, Acc) ->
- [RelSeq | Acc];
- (RelSeq, {_MsgId, false, false}, Acc) ->
- [RelSeq, RelSeq | Acc]
- end, Acks, SDict).
+ ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds),
+ sets:from_list(Acks))),
+ %% ValidAcks will not contain any duplicates at this point.
+ {TotalMsgCount - length(ValidAcks),
+ append_acks_to_segment(SegPath, SegNum, AckCounts, ValidAcks), Dir}.
+
+deliver_transient(SegPath, SDict) ->
+ {AckMe, DeliverMe} =
+ dict:fold(
+ fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) ->
+ Acc;
+ (RelSeq, {_MsgId, false, false}, {AckMeAcc, DeliverMeAcc}) ->
+ {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]};
+ (RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) ->
+ {[RelSeq | AckMeAcc], DeliverMeAcc}
+ end, {[], []}, SDict),
+ {ok, Hdl} = file:open(SegPath, [binary, raw, write, delayed_write, read]),
+ {ok, _} = file:position(Hdl, {eof, 0}),
+ ok = file:write(Hdl, [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]),
+ ok = file:sync(Hdl),
+ ok = file:close(Hdl),
+ AckMe.
%%----------------------------------------------------------------------------
%% Loading Segments
@@ -490,7 +517,7 @@ load_segment(SegNum, SegPath, JAckDict) ->
{error, enoent} -> {dict:new(), 0, 0};
{ok, Hdl} ->
{SDict, AckCount, HighRelSeq} =
- load_segment_entries(SegNum, Hdl, {dict:new(), 0, 0}),
+ load_segment_entries(Hdl, dict:new(), 0, 0),
ok = file:close(Hdl),
RelSeqs = case dict:find(SegNum, JAckDict) of
{ok, RelSeqs1} -> RelSeqs1;
@@ -503,14 +530,14 @@ load_segment(SegNum, SegPath, JAckDict) ->
{SDict1, AckCount1, HighRelSeq}
end.
-load_segment_entries(SegNum, Hdl, {SDict, AckCount, HighRelSeq}) ->
+load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) ->
case file:read(Hdl, 1) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
{ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
{SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
- load_segment_entries(SegNum, Hdl, {SDict1, AckCount1, HighRelSeq});
+ load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
%% because we specify /binary, and binaries are complete
@@ -520,9 +547,9 @@ load_segment_entries(SegNum, Hdl, {SDict, AckCount, HighRelSeq}) ->
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
load_segment_entries(
- SegNum, Hdl, {dict:store(RelSeq, {MsgId, false,
- 1 == IsPersistentNum},
- SDict), AckCount, HighRelSeq1});
+ Hdl, dict:store(RelSeq, {MsgId, false,
+ 1 == IsPersistentNum},
+ SDict), AckCount, HighRelSeq1);
_ErrOrEoF -> {SDict, AckCount, HighRelSeq}
end.