diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-22 12:15:38 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-22 12:15:38 +0100 |
| commit | 0056ad665a93ab048f6dce0c065ac7664d57f551 (patch) | |
| tree | 0416c164b4dca5610b373e4dc0e4b5431717b87b | |
| parent | fafc70a12da00e750a2090c7375a4c7170888be6 (diff) | |
| download | rabbitmq-server-git-0056ad665a93ab048f6dce0c065ac7664d57f551.tar.gz | |
Well, it's now much less readable, but at least the lines are shorter.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 747 |
1 file changed, 437 insertions(+), 310 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c56b14d5b8..190c06f00b 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -59,22 +59,23 @@ -define(SERVER, ?MODULE). --record(dqstate, {msg_location, - file_summary, - sequences, - current_file_num, - current_file_name, - current_file_handle, - current_offset, - file_size_limit, - read_file_handles, - read_file_handles_limit +-record(dqstate, {msg_location, % where are messages? + file_summary, %% what's in the files? + sequences, %% next read and write for each q + current_file_num, %% current file name as number + current_file_name, %% current file name + current_file_handle, %% current file handle + current_offset, %% current offset within current file + file_size_limit, %% how big can our files get? + read_file_handles, %% file handles for reading (LRU) + read_file_handles_limit %% how many file handles can we open? }). %% ---- PUBLIC API ---- start_link(FileSizeLimit, ReadFileHandlesLimit) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [FileSizeLimit, ReadFileHandlesLimit], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, + [FileSizeLimit, ReadFileHandlesLimit], []). publish(Q, MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). @@ -106,28 +107,35 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), ok = filelib:ensure_dir(form_filename("nothing")), InitName = "0" ++ ?FILE_EXTENSION, - {ok, MsgLocation} = dets:open_file(?MSG_LOC_DETS_NAME, - [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ ?FILE_EXTENSION_DETS)}, - {min_no_slots, 1024*1024}, - % man says this should be <= 32M. But it works... 
- {max_no_slots, 1024*1024*1024}, - {type, set} - ]), - State = #dqstate { msg_location = MsgLocation, - file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), - sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), - current_file_num = 0, - current_file_name = InitName, - current_file_handle = undefined, - current_offset = 0, - file_size_limit = FileSizeLimit, - read_file_handles = {dict:new(), gb_trees:empty()}, - read_file_handles_limit = ReadFileHandlesLimit - }, + {ok, MsgLocation} + = dets:open_file(?MSG_LOC_DETS_NAME, + [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) + ++ ?FILE_EXTENSION_DETS)}, + {min_no_slots, 1024*1024}, + %% man says this should be <= 32M. But it works... + {max_no_slots, 1024*1024*1024}, + {type, set} + ]), + State + = #dqstate { msg_location = MsgLocation, + file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, + [set, private]), + sequences = ets:new(?SEQUENCE_ETS_NAME, + [set, private]), + current_file_num = 0, + current_file_name = InitName, + current_file_handle = undefined, + current_offset = 0, + file_size_limit = FileSizeLimit, + read_file_handles = {dict:new(), gb_trees:empty()}, + read_file_handles_limit = ReadFileHandlesLimit + }, {ok, State1 = #dqstate { current_file_name = CurrentName, - current_offset = Offset } } = load_from_disk(State), + current_offset = Offset } } + = load_from_disk(State), Path = form_filename(CurrentName), - {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), %% read only needed so that we can seek + %% read is only needed so that we can seek + {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), {ok, Offset} = file:position(FileHdl, {bof, Offset}), {ok, State1 # dqstate { current_file_handle = FileHdl }}. 
@@ -147,8 +155,9 @@ handle_call(clean_stop, _From, State) -> true = ets:delete(FileSummary), true = ets:delete(Sequences), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), - {stop, normal, ok, State1 # dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}}. + {stop, normal, ok, + State1 # dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}}. %% gen_server now calls terminate, which then calls shutdown handle_cast({publish, Q, MsgId, MsgBody}, State) -> @@ -174,9 +183,10 @@ shutdown(State = #dqstate { msg_location = MsgLocation, current_file_handle = FileHdl, read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> - % deliberately ignoring return codes here + %% deliberately ignoring return codes here dets:close(MsgLocation), - file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ ?FILE_EXTENSION_DETS)), + file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) + ++ ?FILE_EXTENSION_DETS)), if FileHdl =:= undefined -> ok; true -> file:sync(FileHdl), file:close(FileHdl) @@ -200,42 +210,56 @@ base_directory() -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, State = #dqstate { msg_location = MsgLocation, - sequences = Sequences, - read_file_handles_limit = ReadFileHandlesLimit, - read_file_handles = {ReadHdls, ReadHdlsAge} - }) -> +internal_deliver(Q, State + = #dqstate { msg_location = MsgLocation, + sequences = Sequences, + read_file_handles_limit = ReadFileHandlesLimit, + read_file_handles = {ReadHdls, ReadHdlsAge} + }) -> case ets:lookup(Sequences, Q) of [] -> {ok, empty, State}; [{Q, ReadSeqId, WriteSeqId}] -> case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of [] -> {ok, empty, State}; - [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> - [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), + [Obj + = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> + [{MsgId, _RefCount, File, 
Offset, TotalSize}] + = dets:lookup(MsgLocation, MsgId), {FileHdl, ReadHdls1, ReadHdlsAge1} = case dict:find(File, ReadHdls) of error -> - {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]), + {ok, Hdl} = file:open(form_filename(File), + [read, raw, binary, + read_ahead]), Now = now(), case dict:size(ReadHdls) < ReadFileHandlesLimit of true -> - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), gb_trees:enter(Now, File, ReadHdlsAge)}; + {Hdl, + dict:store(File, {Hdl, Now}, ReadHdls), + gb_trees:enter(Now, File, ReadHdlsAge)}; _False -> - {_Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, _Then}} = dict:find(OldFile, ReadHdls), + {_Then, OldFile, ReadHdlsAge2} + = gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, _Then}} + = dict:find(OldFile, ReadHdls), ok = file:close(OldHdl), ReadHdls2 = dict:erase(OldFile, ReadHdls), - {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2), gb_trees:enter(Now, File, ReadHdlsAge2)} + {Hdl, + dict:store(File, {Hdl, Now}, ReadHdls2), + gb_trees:enter(Now, File, ReadHdlsAge2)} end; {ok, {Hdl, Then}} -> Now = now(), {Hdl, dict:store(File, {Hdl, Now}, ReadHdls), - gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))} + gb_trees:enter(Now, File, + gb_trees:delete(Then, ReadHdlsAge))} end, - % read the message - {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), + %% read the message + {ok, {MsgBody, BodySize}} + = read_message_at_offset(FileHdl, Offset, TotalSize), if Delivered -> ok; - true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) + true -> ok = mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) end, true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, @@ -249,71 +273,83 @@ internal_ack(Q, MsgIds, State) -> %% Q is only needed if MnesiaDelete = true %% called from tx_cancel with MnesiaDelete = false %% 
called from ack with MnesiaDelete = true -remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, - file_summary = FileSummary, - current_file_name = CurName - }) -> +remove_messages(Q, MsgSeqIds, MnesiaDelete, + State = # dqstate { msg_location = MsgLocation, + file_summary = FileSummary, + current_file_name = CurName + }) -> Files - = lists:foldl(fun ({MsgId, SeqId}, Files2) -> - [{MsgId, RefCount, File, Offset, TotalSize}] - = dets:lookup(MsgLocation, MsgId), - Files3 = - if 1 =:= RefCount -> - ok = dets:delete(MsgLocation, MsgId), - [{File, ValidTotalSize, ContiguousTop, Left, Right}] - = ets:lookup(FileSummary, File), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - true = ets:insert(FileSummary, - {File, (ValidTotalSize - TotalSize - ?FILE_PACKING_ADJUSTMENT), - ContiguousTop1, Left, Right}), - if CurName =:= File -> Files2; - true -> sets:add_element(File, Files2) - end; - 1 < RefCount -> - ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), - Files2 - end, - if MnesiaDelete -> - ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); - true -> ok - end, - Files3 - end, sets:new(), MsgSeqIds), + = lists:foldl( + fun ({MsgId, SeqId}, Files2) -> + [{MsgId, RefCount, File, Offset, TotalSize}] + = dets:lookup(MsgLocation, MsgId), + Files3 + = if 1 =:= RefCount -> + ok = dets:delete(MsgLocation, MsgId), + [{File, ValidTotalSize, ContiguousTop, Left, Right}] + = ets:lookup(FileSummary, File), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + true = ets:insert(FileSummary, + {File, (ValidTotalSize - TotalSize + - ?FILE_PACKING_ADJUSTMENT), + ContiguousTop1, Left, Right}), + if CurName =:= File -> Files2; + true -> sets:add_element(File, Files2) + end; + 1 < RefCount -> + ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, + File, Offset, TotalSize}), + Files2 + end, + if MnesiaDelete -> + ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); + true -> ok + end, + Files3 + end, 
sets:new(), MsgSeqIds), State2 = compact(Files, State), {ok, State2}. -internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation, - current_file_handle = CurHdl, - current_file_name = CurName, - current_offset = CurOffset, - file_summary = FileSummary - }) -> +internal_tx_publish(MsgId, MsgBody, + State = #dqstate { msg_location = MsgLocation, + current_file_handle = CurHdl, + current_file_name = CurName, + current_offset = CurOffset, + file_summary = FileSummary + }) -> case dets:lookup(MsgLocation, MsgId) of [] -> - % New message, lots to do + %% New message, lots to do {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), - true = dets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}), + true = dets:insert_new(MsgLocation, {MsgId, 1, CurName, + CurOffset, TotalSize}), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), - ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT, + ValidTotalSize1 = ValidTotalSize + TotalSize + + ?FILE_PACKING_ADJUSTMENT, ContiguousTop1 = if CurOffset =:= ContiguousTop -> - ValidTotalSize1; % can't be any holes in this file + %% can't be any holes in this file + ValidTotalSize1; true -> ContiguousTop end, - true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}), - maybe_roll_to_new_file(CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, - State # dqstate {current_offset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT}); + true = ets:insert(FileSummary, {CurName, ValidTotalSize1, + ContiguousTop1, Left, undefined}), + NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + maybe_roll_to_new_file(NextOffset, + State # dqstate {current_offset = NextOffset}); [{MsgId, RefCount, File, Offset, TotalSize}] -> - % We already know about it, just update counter - ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}), + %% We already know about it, just update counter 
+ ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File, + Offset, TotalSize}), {ok, State} end. -internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, - current_file_handle = CurHdl, - current_file_name = CurName, - sequences = Sequences - }) -> +internal_tx_commit(Q, MsgIds, + State = #dqstate { msg_location = MsgLocation, + current_file_handle = CurHdl, + current_file_name = CurName, + sequences = Sequences + }) -> {ReadSeqId, InitWriteSeqId} = case ets:lookup(Sequences, Q) of [] -> {0,0}; @@ -322,15 +358,18 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, {atomic, {Sync, WriteSeqId}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl(fun (MsgId, {Acc, NextWriteSeqId}) -> - [{MsgId, _RefCount, File, _Offset, _TotalSize}] = - dets:lookup(MsgLocation, MsgId), - ok = mnesia:write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId}, - msg_id = MsgId, is_delivered = false}, - write), - {Acc or (CurName =:= File), NextWriteSeqId + 1} - end, {false, InitWriteSeqId}, MsgIds) + lists:foldl( + fun (MsgId, {Acc, NextWriteSeqId}) -> + [{MsgId, _RefCount, File, _Offset, _TotalSize}] + = dets:lookup(MsgLocation, MsgId), + ok = mnesia:write(rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id + = {Q, NextWriteSeqId}, + msg_id = MsgId, + is_delivered = false}, + write), + {Acc or (CurName =:= File), NextWriteSeqId + 1} + end, {false, InitWriteSeqId}, MsgIds) end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}), if Sync -> ok = file:sync(CurHdl); @@ -339,41 +378,46 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, {ok, State}. 
internal_publish(Q, MsgId, MsgBody, State) -> - {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State), + {ok, State1 = #dqstate { sequences = Sequences }} + = internal_tx_publish(MsgId, MsgBody, State), WriteSeqId = case ets:lookup(Sequences, Q) of - [] -> % previously unseen queue + [] -> %% previously unseen queue true = ets:insert_new(Sequences, {Q, 0, 1}), 0; [{Q, ReadSeqId, WriteSeqId2}] -> - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2 +1}), + true = ets:insert(Sequences, {Q, ReadSeqId, + WriteSeqId2 +1}), WriteSeqId2 end, - ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, - msg_id = MsgId, - is_delivered = false}), + ok = mnesia:dirty_write(rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, + msg_id = MsgId, + is_delivered = false}), {ok, State1}. internal_tx_cancel(MsgIds, State) -> - % we don't need seq ids because we're not touching mnesia, because seqids were - % never assigned + %% we don't need seq ids because we're not touching mnesia, + %% because seqids were never assigned MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). 
%% ---- ROLLING OVER THE APPEND FILE ---- -maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimit, - current_file_name = CurName, - current_file_handle = CurHdl, - current_file_num = CurNum, - file_summary = FileSummary - } +maybe_roll_to_new_file(Offset, + State = #dqstate { file_size_limit = FileSizeLimit, + current_file_name = CurName, + current_file_handle = CurHdl, + current_file_num = CurNum, + file_summary = FileSummary + } ) when Offset >= FileSizeLimit -> ok = file:sync(CurHdl), ok = file:close(CurHdl), NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, - {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary, delayed_write]), - true = ets:update_element(FileSummary, CurName, {5, NextName}), % 5 is Right + {ok, NextHdl} = file:open(form_filename(NextName), + [write, raw, binary, delayed_write]), + true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), {ok, State # dqstate { current_file_name = NextName, current_file_handle = NextHdl, @@ -386,55 +430,70 @@ maybe_roll_to_new_file(_, State) -> %% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- compact(FilesSet, State) -> - % smallest number, hence eldest, hence left-most, first + %% smallest number, hence eldest, hence left-most, first Files = lists:sort(sets:to_list(FilesSet)), - % foldl reverses, so now youngest/right-most first - RemainingFiles = lists:foldl(fun (File, Acc) -> delete_empty_files(File, Acc, State) end, [], Files), + %% foldl reverses, so now youngest/right-most first + RemainingFiles = lists:foldl(fun (File, Acc) -> + delete_empty_files(File, Acc, State) + end, [], Files), lists:foldl(fun combineFile/2, State, lists:reverse(RemainingFiles)). 
combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, file_summary = FileSummary, current_file_name = CurName }) -> - % the file we're looking at may no longer exist as it may have been deleted - % within the current GC run + %% the file we're looking at may no longer exist as it may have been deleted + %% within the current GC run case ets:lookup(FileSummary, File) of [] -> State; [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] -> - GoRight = fun() -> - case Right of - undefined -> State; - _ when not(CurName =:= Right) -> - [RightObj = {Right, RightValidData, _RightContiguousTop, File, RightRight}] - = ets:lookup(FileSummary, Right), - RightSumData = ValidData + RightValidData, - if FileSizeLimit >= RightSumData -> - % here, Right will be the source and so will be deleted, - % File will be the destination - State1 = combineFiles(RightObj, FileObj, State), - % this could fail if RightRight is undefined - ets:update_element(FileSummary, RightRight, {4, File}), % left is the 4th field - true = ets:insert(FileSummary, {File, RightSumData, RightSumData, Left, RightRight}), - true = ets:delete(FileSummary, Right), - State1; - true -> State - end; - _ -> State - end - end, + GoRight + = fun() -> + case Right of + undefined -> State; + _ when not(CurName =:= Right) -> + [RightObj = {Right, RightValidData, + _RightContiguousTop, File, RightRight}] + = ets:lookup(FileSummary, Right), + RightSumData = ValidData + RightValidData, + if FileSizeLimit >= RightSumData -> + %% here, Right will be the source and so will be deleted, + %% File will be the destination + State1 = combineFiles(RightObj, FileObj, + State), + %% this could fail if RightRight is undefined + %% left is the 4th field + ets:update_element(FileSummary, + RightRight, {4, File}), + true = ets:insert(FileSummary, {File, + RightSumData, + RightSumData, + Left, + RightRight}), + true = ets:delete(FileSummary, Right), + State1; + true -> State + end; + _ -> State + end + end, case Left of 
undefined -> GoRight(); - _ -> [LeftObj = {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] + _ -> [LeftObj + = {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] = ets:lookup(FileSummary, Left), LeftSumData = ValidData + LeftValidData, if FileSizeLimit >= LeftSumData -> - % here, File will be the source and so will be deleted, - % Left will be the destination + %% here, File will be the source and so will be deleted, + %% Left will be the destination State1 = combineFiles(FileObj, LeftObj, State), - % this could fail if Right is undefined - ets:update_element(FileSummary, Right, {4, Left}), % left is the 4th field - true = ets:insert(FileSummary, {Left, LeftSumData, LeftSumData, LeftLeft, Right}), + %% this could fail if Right is undefined + %% left is the 4th field + ets:update_element(FileSummary, Right, {4, Left}), + true = ets:insert(FileSummary, {Left, LeftSumData, + LeftSumData, + LeftLeft, Right}), true = ets:delete(FileSummary, File), State1; true -> @@ -443,113 +502,148 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, end end. 
-combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, - {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, +combineFiles({Source, SourceValid, _SourceContiguousTop, + _SourceLeft, _SourceRight}, + {Destination, DestinationValid, DestinationContiguousTop, + _DestinationLeft, _DestinationRight}, State1) -> (State = #dqstate { msg_location = MsgLocation }) = closeFile(Source, closeFile(Destination, State1)), - {ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]), - {ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, delayed_write, read_ahead]), + {ok, SourceHdl} + = file:open(form_filename(Source), + [read, write, raw, binary, delayed_write, read_ahead]), + {ok, DestinationHdl} + = file:open(form_filename(Destination), + [read, write, raw, binary, delayed_write, read_ahead]), ExpectedSize = SourceValid + DestinationValid, - % if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file - % if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file - % then truncate, copy back in, and then copy over from Source - % otherwise we just truncate straight away and copy over from Source + %% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file + %% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file + %% then truncate, copy back in, and then copy over from Source + %% otherwise we just truncate straight away and copy over from Source if DestinationContiguousTop =:= DestinationValid -> - {ok, DestinationValid} = file:position(DestinationHdl, {bof, DestinationValid}), + {ok, DestinationValid} = file:position(DestinationHdl, + {bof, DestinationValid}), ok = file:truncate(DestinationHdl), - {ok, ExpectedSize} = file:position(DestinationHdl, {cur, SourceValid}), + {ok, 
ExpectedSize} = file:position(DestinationHdl, + {cur, SourceValid}), ok = file:truncate(DestinationHdl), - {ok, DestinationValid} = file:position(DestinationHdl, {bof, DestinationValid}); + {ok, DestinationValid} = file:position(DestinationHdl, + {bof, DestinationValid}); true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = file:open(form_filename(Tmp), [read, write, raw, binary, delayed_write, read_ahead]), + {ok, TmpHdl} + = file:open(form_filename(Tmp), + [read, write, raw, binary, delayed_write, read_ahead]), Worklist - = lists:dropwhile(fun ({_, _, _, Offset, _}) when Offset /= DestinationContiguousTop -> - % it cannot be that Offset == DestinationContiguousTop - % because if it was then DestinationContiguousTop would have been - % extended by TotalSize - Offset < DestinationContiguousTop - % Given expected access patterns, I suspect that the list should be - % naturally sorted as we require, however, we need to enforce it anyway - end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> - OffA < OffB - end, - dets:match_object(MsgLocation, {'_', '_', Destination, '_', '_'}))), + = lists:dropwhile( + fun ({_, _, _, Offset, _}) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == DestinationContiguousTop + %% because if it was then DestinationContiguousTop would have been + %% extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I suspect that the list should be + %% naturally sorted as we require, however, we need to enforce it anyway + end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + OffA < OffB + end, + dets:match_object(MsgLocation, + {'_', '_', Destination, + '_', '_'}))), TmpSize = DestinationValid - DestinationContiguousTop, - {TmpSize, BlockStart1, BlockEnd1} = - lists:foldl(fun ({MsgId, RefCount, _Destination, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> - % CurOffset is in the TmpFile. 
- % Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) - Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - % this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset - FinalOffset = DestinationContiguousTop + CurOffset, - ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, FinalOffset, TotalSize}), - - NextOffset = CurOffset + Size, - if BlockStart =:= undefined -> - % base case, called only for the first list elem - {NextOffset, Offset, Offset + Size}; - Offset =:= BlockEnd -> - % extend the current block because the next msg follows straight on - {NextOffset, BlockStart, BlockEnd + Size}; - true -> - % found a gap, so actually do the work for the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = file:position(DestinationHdl, {bof, BlockStart}), - {ok, BSize} = file:copy(DestinationHdl, TmpHdl, BSize), - {NextOffset, Offset, Offset + Size} - end - end, {0, undefined, undefined}, Worklist), - % do the last remaining block + {TmpSize, BlockStart1, BlockEnd1} + = lists:foldl( + fun ({MsgId, RefCount, _Destination, Offset, TotalSize}, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the TmpFile. + %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) 
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + %% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset + FinalOffset = DestinationContiguousTop + CurOffset, + ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, + FinalOffset, TotalSize}), + + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + %% extend the current block because the next msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + %% found a gap, so actually do the work for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} + = file:position(DestinationHdl, + {bof, BlockStart}), + {ok, BSize} = file:copy(DestinationHdl, + TmpHdl, BSize), + {NextOffset, Offset, Offset + Size} + end + end, {0, undefined, undefined}, Worklist), + %% do the last remaining block BSize1 = BlockEnd1 - BlockStart1, {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), - % so now Tmp contains everything we need to salvage from Destination, - % and MsgLocation has been updated to reflect compaction of Destination - % so truncate Destination and copy from Tmp back to the end + %% so now Tmp contains everything we need to salvage from Destination, + %% and MsgLocation has been updated to reflect compaction of Destination + %% so truncate Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), - {ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}), + {ok, DestinationContiguousTop} + = file:position(DestinationHdl, + {bof, DestinationContiguousTop}), ok = file:truncate(DestinationHdl), - {ok, ExpectedSize} = file:position(DestinationHdl, {bof, ExpectedSize}), + {ok, ExpectedSize} + = file:position(DestinationHdl, + {bof, ExpectedSize}), ok = 
file:truncate(DestinationHdl), - {ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}), + {ok, DestinationContiguousTop} + = file:position(DestinationHdl, + {bof, DestinationContiguousTop}), {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), - % position in DestinationHdl should now be DestinationValid + %% position in DestinationHdl should now be DestinationValid ok = file:sync(DestinationHdl), ok = file:close(TmpHdl), ok = file:delete(form_filename(Tmp)) end, - SourceWorkList = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> - OffA < OffB - end, dets:match_object(MsgLocation, {'_', '_', Source, '_', '_'})), - {ExpectedSize, BlockStart2, BlockEnd2} = - lists:foldl(fun ({MsgId, RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> - % CurOffset is in the DestinationFile. - % Offset, BlockStart and BlockEnd are in the SourceFile - Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - % update MsgLocation to reflect change of file and offset - ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, CurOffset, TotalSize}), - NextOffset = CurOffset + Size, - if BlockStart =:= undefined -> - % base case, called only for the first list elem - {NextOffset, Offset, Offset + Size}; - Offset =:= BlockEnd -> - % extend the current block because the next msg follows straight on - {NextOffset, BlockStart, BlockEnd + Size}; - true -> - % found a gap, so actually do the work for the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = file:position(SourceHdl, {bof, BlockStart}), - {ok, BSize} = file:copy(SourceHdl, DestinationHdl, BSize), - {NextOffset, Offset, Offset + Size} - end - end, {DestinationValid, undefined, undefined}, SourceWorkList), - % do the last remaining block + SourceWorkList + = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + OffA < OffB + end, dets:match_object(MsgLocation, {'_', '_', Source, + '_', '_'})), + {ExpectedSize, BlockStart2, BlockEnd2} + 
= lists:foldl(fun ({MsgId, RefCount, _Source, Offset, TotalSize}, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the DestinationFile. + %% Offset, BlockStart and BlockEnd are in the SourceFile + Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + %% update MsgLocation to reflect change of file and offset + ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, + CurOffset, TotalSize}), + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + %% extend the current block because the next msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + %% found a gap, so actually do the work for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} + = file:position(SourceHdl, + {bof, BlockStart}), + {ok, BSize} + = file:copy(SourceHdl, DestinationHdl, + BSize), + {NextOffset, Offset, Offset + Size} + end + end, {DestinationValid, undefined, undefined}, SourceWorkList), + %% do the last remaining block BSize2 = BlockEnd2 - BlockStart2, {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}), {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2), - % tidy up + %% tidy up ok = file:sync(DestinationHdl), ok = file:close(SourceHdl), ok = file:close(DestinationHdl), @@ -562,20 +656,27 @@ closeFile(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} } State; {ok, {Hdl, Then}} -> ok = file:close(Hdl), - State #dqstate { read_file_handles = { dict:erase(File, ReadHdls), gb_trees:delete(Then, ReadHdlsAge) } } + State #dqstate { read_file_handles + = { dict:erase(File, ReadHdls), + gb_trees:delete(Then, ReadHdlsAge) } } end. 
delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> - [{File, ValidData, _ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), + [{File, ValidData, _ContiguousTop, Left, Right}] + = ets:lookup(FileSummary, File), case ValidData of - % we should NEVER find the current file in here - hence right should always be a file, not undefined + %% we should NEVER find the current file in here + %% hence right should always be a file, not undefined 0 -> case {Left, Right} of {undefined, _} when not(is_atom(Right)) -> - % the eldest file is empty. YAY! - true = ets:update_element(FileSummary, Right, {4, undefined}); % left is the 4th field + %% the eldest file is empty. YAY! + %% left is the 4th field + true = ets:update_element(FileSummary, Right, {4, undefined}); {_, _} when not(is_atom(Right)) -> - true = ets:update_element(FileSummary, Right, {4, Left}), % left is the 4th field - true = ets:update_element(FileSummary, Left, {5, Right}) % right is the 5th field + %% left is the 4th field + true = ets:update_element(FileSummary, Right, {4, Left}), + %% right is the 5th field + true = ets:update_element(FileSummary, Left, {5, Right}) end, true = ets:delete(FileSummary, File), ok = file:delete(form_filename(File)), @@ -586,24 +687,27 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> %% ---- DISK RECOVERY ---- load_from_disk(State) -> - % sorted so that smallest number is first. which also means eldest file (left-most) first + %% sorted so that smallest number is first. 
which also means eldest file (left-most) first {Files, TmpFiles} = get_disk_queue_files(), ok = recover_crashed_compactions(Files, TmpFiles), - % There should be no more tmp files now, so go ahead and load the whole lot - (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), - % Finally, check there is nothing in mnesia which we haven't loaded + %% There should be no more tmp files now, so go ahead and load the whole lot + (State1 = #dqstate{ msg_location = MsgLocation }) + = load_messages(undefined, Files, State), + %% Finally, check there is nothing in mnesia which we haven't loaded {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) -> - true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end, + true = 1 + =:= length(dets:lookup(MsgLocation, MsgId)) + end, true, rabbit_disk_queue) end), State2 = extract_sequence_numbers(State1), {ok, State2}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - % next-seqid-to-read is the lowest seqid which has is_delivered = false + %% next-seqid-to-read is the lowest seqid which has is_delivered = false {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), @@ -612,7 +716,8 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> NextWrite = SeqId + 1, case ets:lookup(Sequences, Q) of [] -> - true = ets:insert_new(Sequences, {Q, SeqId, NextWrite}); + true = ets:insert_new(Sequences, + {Q, SeqId, NextWrite}); [Orig = {Q, Read, Write}] -> Repl = {Q, lists:min([Read, SeqId]), lists:max([Write, NextWrite])}, @@ -634,7 +739,8 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) -> Offset = case dets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of [] -> 0; L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] - = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + = 
lists:sort(fun ({_, _, _, OffA, _}, + {_, _, _, OffB, _}) -> OffB < OffA end, L), MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT @@ -645,24 +751,26 @@ load_messages(Left, [File|Files], State = #dqstate { msg_location = MsgLocation, file_summary = FileSummary }) -> - % [{MsgId, TotalSize, FileOffset}] + %% [{MsgId, TotalSize, FileOffset}] {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_'})) of + case length(mnesia:dirty_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_'})) of 0 -> {VMAcc, VTSAcc}; RefCount -> - true = dets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), + true = dets:insert_new(MsgLocation, {MsgId, RefCount, File, + Offset, TotalSize}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } end end, {[], 0}, Messages), - % foldl reverses lists and find_contiguous_block_prefix needs elems in the same order - % as from scan_file_for_valid_messages + %% foldl reverses lists and find_contiguous_block_prefix needs elems in the same order + %% as from scan_file_for_valid_messages {ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)), Right = case Files of [] -> undefined; @@ -682,17 +790,20 @@ recover_crashed_compactions1(Files, TmpFile) -> GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end, NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFile, Files), - % [{MsgId, TotalSize, FileOffset}] - {ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)), + %% [{MsgId, TotalSize, FileOffset}] + {ok, UncorruptedMessagesTmp} + = 
scan_file_for_valid_messages(form_filename(TmpFile)), MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp), - % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out + %% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_'})) + true = 0 < length(mnesia:dirty_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_'})) end, MsgIdsTmp), - {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + {ok, UncorruptedMessages} + = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), MsgIds = lists:map(GrabMsgId, UncorruptedMessages), %% 1) It's possible that everything in the tmp file is also in the main file %% such that the main file is (prefix ++ tmpfile). This means that compaction @@ -714,66 +825,74 @@ recover_crashed_compactions1(Files, TmpFile) -> %% Plan: Truncate the main file back to before any of the files in the tmp file and copy %% them over again case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of - true -> % we're in case 1, 2 or 3 above. Just delete the tmp file - % note this also catches the case when the tmp file is empty + true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file + %% note this also catches the case when the tmp file is empty ok = file:delete(TmpFile); _False -> - % we're in case 4 above. - % check that everything in the main file is a valid message in mnesia + %% we're in case 4 above. 
+ %% check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - true = 0 < - length(mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = '_', - is_delivered = '_'})) + true = 0 < length(mnesia:dirty_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_'})) end, MsgIds), - % The main file should be contiguous + %% The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), - % we should have that none of the messages in the prefix are in the tmp file - true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end, MsgIds), + %% we should have that none of the messages in the prefix are in the tmp file + true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end, + MsgIds), - {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), [write, raw, binary, delayed_write]), + {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), + [write, raw, binary, delayed_write]), {ok, Top} = file:position(MainHdl, Top), - ok = file:truncate(MainHdl), % wipe out any rubbish at the end of the file - % there really could be rubbish at the end of the file - we could have failed after the - % extending truncate. - % Remember the head of the list will be the highest entry in the file + ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file + %% there really could be rubbish at the end of the file - we could have failed after the + %% extending truncate. 
+ %% Remember the head of the list will be the highest entry in the file [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Top + TmpSize, {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), - ok = file:truncate(MainHdl), % and now extend the main file as big as necessary in a single move - % if we run out of disk space, this truncate could fail, but we still - % aren't risking losing data - {ok, TmpHdl} = file:open(form_filename(TmpFile), [read, raw, binary, read_ahead]), + ok = file:truncate(MainHdl), %% and now extend the main file as big as necessary in a single move + %% if we run out of disk space, this truncate could fail, but we still + %% aren't risking losing data + {ok, TmpHdl} = file:open(form_filename(TmpFile), + [read, raw, binary, read_ahead]), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), ok = file:close(MainHdl), ok = file:close(TmpHdl), ok = file:delete(TmpFile), - {ok, MainMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + {ok, MainMessages} + = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), MsgIdsMain = lists:map(GrabMsgId, MainMessages), - % check that everything in MsgIds is in MsgIdsMain - true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, MsgIds), - % check that everything in MsgIdsTmp is in MsgIdsMain - true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, MsgIdsTmp) + %% check that everything in MsgIds is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIds), + %% check that everything in MsgIdsTmp is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIdsTmp) end, ok. -% this assumes that the messages are ordered such that the highest address is at -% the head of the list. 
-% this matches what scan_file_for_valid_messages produces +%% this assumes that the messages are ordered such that the highest address is at +%% the head of the list. +%% this matches what scan_file_for_valid_messages produces find_contiguous_block_prefix([]) -> {0, []}; find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) -> case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of - {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, lists:reverse(Acc)}; + {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + lists:reverse(Acc)}; Res -> Res end. find_contiguous_block_prefix([], 0, Acc) -> {ok, Acc}; find_contiguous_block_prefix([], _N, _Acc) -> {0, []}; -find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc) +find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], + ExpectedOffset, Acc) when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT -> find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]); find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) -> @@ -800,7 +919,8 @@ append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) -> TotalSize = BodySize + MsgIdBinSize, case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, - MsgIdBin:MsgIdBinSize/binary, MsgBody:BodySize/binary, + MsgIdBin:MsgIdBinSize/binary, + MsgBody:BodySize/binary, ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, TotalSize}; KO -> KO @@ -811,9 +931,12 @@ read_message_at_offset(FileHdl, Offset, TotalSize) -> case file:position(FileHdl, {bof, Offset}) of {ok, Offset} -> case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of - {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, Rest:TotalSizeWriteOkBytes/binary>>} -> + {ok, <<TotalSize:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + Rest:TotalSizeWriteOkBytes/binary>>} -> BodySize = TotalSize - MsgIdBinSize, - <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, 
?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest, + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest, {ok, {MsgBody, BodySize}}; KO -> KO end; @@ -823,7 +946,7 @@ read_message_at_offset(FileHdl, Offset, TotalSize) -> scan_file_for_valid_messages(File) -> {ok, Hdl} = file:open(File, [raw, binary, read]), Valid = scan_file_for_valid_messages(Hdl, 0, []), - _ = file:close(Hdl), % if something really bad's happened, the close could fail, but ignore + _ = file:close(Hdl), %% if something really bad's happened, the close could fail, but ignore Valid. scan_file_for_valid_messages(FileHdl, Offset, Acc) -> @@ -832,7 +955,8 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) -> {ok, {corrupted, NextOffset}} -> scan_file_for_valid_messages(FileHdl, NextOffset, Acc); {ok, {ok, MsgId, TotalSize, NextOffset}} -> - scan_file_for_valid_messages(FileHdl, NextOffset, [{MsgId, TotalSize, Offset}|Acc]); + scan_file_for_valid_messages(FileHdl, NextOffset, + [{MsgId, TotalSize, Offset}|Acc]); _KO -> {ok, Acc} %% bad message, but we may still have recovered some valid messages end. @@ -854,14 +978,17 @@ read_next_file_entry(FileHdl, Offset) -> case file:read(FileHdl, MsgIdBinSize) of {ok, <<MsgId:MsgIdBinSize/binary>>} -> ExpectedAbsPos = Offset + TwoIntegers + TotalSize, - case file:position(FileHdl, {cur, TotalSize - MsgIdBinSize}) of + case file:position(FileHdl, + {cur, TotalSize - MsgIdBinSize}) of {ok, ExpectedAbsPos} -> + NextOffset = Offset + TotalSize + + ?FILE_PACKING_ADJUSTMENT, case file:read(FileHdl, 1) of {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} -> - {ok, {ok, binary_to_term(MsgId), TotalSize, - Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize}}; + {ok, {ok, binary_to_term(MsgId), + TotalSize, NextOffset}}; {ok, _SomeOtherData} -> - {ok, {corrupted, Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize}}; + {ok, {corrupted, NextOffset}}; KO -> KO end; {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up |
