summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl16
1 files changed, 8 insertions, 8 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 582477992e..765380b605 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -113,7 +113,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State),
Path = form_filename(CurrentName),
ok = filelib:ensure_dir(Path),
- {ok, FileHdl} = file:open(Path, [read, write, raw, binary]), %% read only needed so that we can seek
+ {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), %% read only needed so that we can seek
{ok, Offset} = file:position(FileHdl, {bof, Offset}),
{ok, State1 # dqstate { current_file_handle = FileHdl }}.
@@ -172,7 +172,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
{FileHdl, ReadHdls1, ReadHdlsAge1}
= case dict:find(File, ReadHdls) of
error ->
- {ok, Hdl} = file:open(form_filename(File), [read, raw, binary]),
+ {ok, Hdl} = file:open(form_filename(File), [read, raw, binary, read_ahead]),
Now = now(),
case dict:size(ReadHdls) < ReadFileHandlesLimit of
true ->
@@ -320,7 +320,7 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
ok = file:close(CurHdl),
NextNum = CurNum + 1,
NextName = integer_to_list(NextNum) ++ (?FILE_EXTENSION),
- {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary]),
+ {ok, NextHdl} = file:open(form_filename(NextName), [write, raw, binary, delayed_write]),
[{CurName, FileSum = #dqfile {right = undefined}}] = ets:lookup(FileSummary, CurName),
true = ets:insert(FileSummary, {CurName, FileSum #dqfile {right = NextName}}),
true = ets:insert_new(FileSummary, {NextName, #dqfile { valid_data = 0,
@@ -455,7 +455,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
% we should have that none of the messages in the prefix are in the tmp file
true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end, MsgIds),
- {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), [write, raw, binary]),
+ {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), [write, raw, binary, delayed_write]),
{ok, Top} = file:position(MainHdl, Top),
ok = file:truncate(MainHdl), % wipe out any rubbish at the end of the file
% there really could be rubbish at the end of the file - we could have failed after the
@@ -468,7 +468,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
ok = file:truncate(MainHdl), % and now extend the main file as big as necessary in a single move
% if we run out of disk space, this truncate could fail, but we still
% aren't risking losing data
- {ok, TmpHdl} = file:open(form_filename(TmpFile), [read, raw, binary]),
+ {ok, TmpHdl} = file:open(form_filename(TmpFile), [read, raw, binary, read_ahead]),
{ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
ok = file:close(MainHdl),
ok = file:close(TmpHdl),
@@ -523,9 +523,9 @@ append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) ->
TotalSize = BodySize + MsgIdBinSize,
case file:write(FileHdl, <<TotalSize:(?INTEGER_SIZE_BITS),
MsgIdBinSize:(?INTEGER_SIZE_BITS),
- MsgIdBin:MsgIdBinSize/binary, MsgBody:BodySize/binary>>) of
- ok -> ok = file:write(FileHdl, <<(?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>),
- {ok, TotalSize};
+ MsgIdBin:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ (?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>) of
+ ok -> {ok, TotalSize};
KO -> KO
end.