diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-08-29 07:09:26 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-08-29 07:09:26 +0100 |
| commit | 40cece9560359da9e268c4111d357134520ae1b5 (patch) | |
| tree | 3f25d1752b3f4872d5416883dd20ab7450908c50 | |
| parent | f74e8dfd75f9f4b1859b7de8eb21666ecbd4bf3d (diff) | |
| download | rabbitmq-server-git-40cece9560359da9e268c4111d357134520ae1b5.tar.gz | |
refactoring: extract file opening, and simplify file:position calls
| -rw-r--r-- | src/rabbit_disk_queue.erl | 45 |
1 files changed, 20 insertions, 25 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index f78f413fda..62d53eee7e 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -73,6 +73,8 @@ -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(READ_MODE, [read, read_ahead]). +-define(WRITE_MODE, [write, delayed_write]). -define(SHUTDOWN_MESSAGE_KEY, shutdown_token). -define(SHUTDOWN_MESSAGE, @@ -449,9 +451,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> current_offset = Offset } } = load_from_disk(State), %% read is only needed so that we can seek - {ok, FileHdl} = file:open(form_filename(CurrentName), - [read, write, raw, binary, delayed_write]), - {ok, Offset} = file:position(FileHdl, {bof, Offset}), + FileHdl = open_file(CurrentName, ?WRITE_MODE ++ [read]), + {ok, Offset} = file:position(FileHdl, Offset), State2 = State1 #dqstate { current_file_handle = FileHdl }, %% by reporting a memory use of 0, we guarantee the manager will %% not oppress us. We have to start in ram_disk mode because we @@ -807,6 +808,10 @@ base_directory() -> msg_location_dets_file() -> form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS). +open_file(File, Mode) -> + {ok, Hdl} = file:open(form_filename(File), [raw, binary] ++ Mode), + Hdl. + with_read_handle_at(File, Offset, Fun, State = #dqstate { read_file_hc_cache = HC, current_file_name = CurName, @@ -1259,8 +1264,7 @@ maybe_roll_to_new_file(Offset, ok = file:close(CurHdl), NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, - {ok, NextHdl} = file:open(form_filename(NextName), - [write, raw, binary, delayed_write]), + NextHdl = open_file(NextName, ?WRITE_MODE), true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), State2 = State1 #dqstate { current_file_name = NextName, @@ -1352,13 +1356,13 @@ sort_msg_locations_by_offset(Dir, List) -> end, List). preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), + {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit), ok = file:truncate(Hdl), - {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), + {ok, FinalPos} = file:position(Hdl, FinalPos), ok. truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> - {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), + {ok, Lowpoint} = file:position(FileHdl, Lowpoint), ok = file:truncate(FileHdl), ok = preallocate(FileHdl, Highpoint, Lowpoint). @@ -1368,12 +1372,8 @@ combine_files({Source, SourceValid, _SourceContiguousTop, _DestinationLeft, _DestinationRight}, State) -> State1 = close_file(Source, close_file(Destination, State)), - {ok, SourceHdl} = - file:open(form_filename(Source), - [read, raw, binary, read_ahead]), - {ok, DestinationHdl} = - file:open(form_filename(Destination), - [read, write, raw, binary, read_ahead, delayed_write]), + SourceHdl = open_file(Source, ?READ_MODE), + DestinationHdl = open_file(Destination, ?READ_MODE ++ ?WRITE_MODE), ExpectedSize = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't %% need a tmp file @@ -1386,10 +1386,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, DestinationValid, ExpectedSize); true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = - file:open(form_filename(Tmp), - [read, write, raw, binary, - read_ahead, delayed_write]), + TmpHdl = open_file(Tmp, ?READ_MODE ++ ?WRITE_MODE), Worklist = lists:dropwhile( fun (#message_store_entry { offset = Offset }) @@ -1415,7 +1412,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, %% Destination, and MsgLocationDets has been updated to %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end - {ok, 0} = file:position(TmpHdl, {bof, 0}), + {ok, 0} = file:position(TmpHdl, 0), ok = truncate_and_extend_file( DestinationHdl, DestinationContiguousTop, ExpectedSize), {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), @@ -1464,7 +1461,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, %% the previous block BSize = BlockEnd - BlockStart, {ok, BlockStart} = - file:position(SourceHdl, {bof, BlockStart}), + file:position(SourceHdl, BlockStart), {ok, BSize} = file:copy(SourceHdl, DestinationHdl, BSize), {NextOffset, Offset, Offset + Size} @@ -1472,7 +1469,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, end, {InitOffset, undefined, undefined}, WorkList), %% do the last remaining block BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = file:position(SourceHdl, {bof, BlockStart1}), + {ok, BlockStart1} = file:position(SourceHdl, BlockStart1), {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1), ok. @@ -1854,9 +1851,7 @@ recover_crashed_compactions1(Files, TmpFile) -> not (lists:member(MsgId, MsgIdsTmp)) end, MsgIds), %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = - file:open(form_filename(NonTmpRelatedFile), - [read, write, raw, binary, delayed_write]), + MainHdl = open_file(NonTmpRelatedFile, ?WRITE_MODE ++ [read]), {ok, Top} = file:position(MainHdl, Top), %% wipe out any rubbish at the end of the file ok = file:truncate(MainHdl), @@ -1872,7 +1867,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% single move if we run out of disk space, this truncate %% could fail, but we still aren't risking losing data ok = file:truncate(MainHdl), - {ok, TmpHdl} = file:open(TmpPath, [read, raw, binary, read_ahead]), + TmpHdl = open_file(TmpFile, ?READ_MODE), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), ok = file:sync(MainHdl), ok = file:close(MainHdl), |
