diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b7ed21561a..cbf8d68f39 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -73,6 +73,7 @@ -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read, read_ahead]). -define(WRITE_MODE, [write, delayed_write]). @@ -419,6 +420,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, 2}]), InitName = "0" ++ ?FILE_EXTENSION, + HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit, + ?BINARY_MODE ++ [read]), State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, @@ -433,9 +436,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> current_offset = 0, current_dirty = false, file_size_limit = FileSizeLimit, - read_file_hc_cache = rabbit_file_handle_cache:init( - ReadFileHandlesLimit, - [read, raw, binary]), + read_file_hc_cache = HandleCache, on_sync_txns = [], commit_timer_ref = undefined, last_sync_offset = 0, @@ -450,7 +451,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> current_offset = Offset } } = load_from_disk(State), %% read is only needed so that we can seek - FileHdl = open_file(CurrentName, ?WRITE_MODE ++ [read]), + {ok, FileHdl} = open_file(CurrentName, ?WRITE_MODE ++ [read]), {ok, Offset} = file:position(FileHdl, Offset), State2 = State1 #dqstate { current_file_handle = FileHdl }, %% by reporting a memory use of 0, we guarantee the manager will @@ -807,9 +808,7 @@ base_directory() -> msg_location_dets_file() -> form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS). -open_file(File, Mode) -> - {ok, Hdl} = file:open(form_filename(File), [raw, binary] ++ Mode), - Hdl. +open_file(File, Mode) -> file:open(form_filename(File), ?BINARY_MODE ++ Mode). with_read_handle_at(File, Offset, Fun, State = #dqstate { read_file_hc_cache = HC, @@ -1261,7 +1260,7 @@ maybe_roll_to_new_file(Offset, ok = file:close(CurHdl), NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, - NextHdl = open_file(NextName, ?WRITE_MODE), + {ok, NextHdl} = open_file(NextName, ?WRITE_MODE), true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), State2 = State1 #dqstate { current_file_name = NextName, @@ -1369,8 +1368,8 @@ combine_files({Source, SourceValid, _SourceContiguousTop, _DestinationLeft, _DestinationRight}, State) -> State1 = close_file(Source, close_file(Destination, State)), - SourceHdl = open_file(Source, ?READ_MODE), - DestinationHdl = open_file(Destination, ?READ_MODE ++ ?WRITE_MODE), + {ok, SourceHdl} = open_file(Source, ?READ_MODE), + {ok, DestinationHdl} = open_file(Destination, ?READ_MODE ++ ?WRITE_MODE), ExpectedSize = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't %% need a tmp file @@ -1383,7 +1382,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, DestinationValid, ExpectedSize); true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, - TmpHdl = open_file(Tmp, ?READ_MODE ++ ?WRITE_MODE), + {ok, TmpHdl} = open_file(Tmp, ?READ_MODE ++ ?WRITE_MODE), Worklist = lists:dropwhile( fun (#message_store_entry { offset = Offset }) @@ -1848,7 +1847,7 @@ recover_crashed_compactions1(Files, TmpFile) -> not (lists:member(MsgId, MsgIdsTmp)) end, MsgIds), %% must open with read flag, otherwise will stomp over contents - MainHdl = open_file(NonTmpRelatedFile, ?WRITE_MODE ++ [read]), + {ok, MainHdl} = open_file(NonTmpRelatedFile, ?WRITE_MODE ++ [read]), {ok, Top} = file:position(MainHdl, Top), %% wipe out any rubbish at the end of the file ok = file:truncate(MainHdl), @@ -1864,7 +1863,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% single move if we run out of disk space, this truncate %% could fail, but we still aren't risking losing data ok = file:truncate(MainHdl), - TmpHdl = open_file(TmpFile, ?READ_MODE), + {ok, TmpHdl} = open_file(TmpFile, ?READ_MODE), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), ok = file:sync(MainHdl), ok = file:close(MainHdl), @@ -1950,10 +1949,11 @@ read_message_from_disk(FileHdl, TotalSize) -> end. scan_file_for_valid_messages(File) -> - case file:open(form_filename(File), [raw, binary, read, read_ahead]) of + case open_file(File, ?READ_MODE) of {ok, Hdl} -> Valid = scan_file_for_valid_messages(Hdl, 0, []), - %% if something really bad's happened, the close could fail, but ignore + %% if something really bad's happened, the close could fail, + %% but ignore file:close(Hdl), Valid; {error, enoent} -> {ok, []}; |
