summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-08-29 07:09:26 +0100
committerMatthias Radestock <matthias@lshift.net>2009-08-29 07:09:26 +0100
commit40cece9560359da9e268c4111d357134520ae1b5 (patch)
tree3f25d1752b3f4872d5416883dd20ab7450908c50
parentf74e8dfd75f9f4b1859b7de8eb21666ecbd4bf3d (diff)
downloadrabbitmq-server-git-40cece9560359da9e268c4111d357134520ae1b5.tar.gz
refactoring: extract file opening, and simplify file:position calls
-rw-r--r--src/rabbit_disk_queue.erl45
1 files changed, 20 insertions, 25 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index f78f413fda..62d53eee7e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -73,6 +73,8 @@
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(READ_MODE, [read, read_ahead]).
+-define(WRITE_MODE, [write, delayed_write]).
-define(SHUTDOWN_MESSAGE_KEY, shutdown_token).
-define(SHUTDOWN_MESSAGE,
@@ -449,9 +451,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
current_offset = Offset } } =
load_from_disk(State),
%% read is only needed so that we can seek
- {ok, FileHdl} = file:open(form_filename(CurrentName),
- [read, write, raw, binary, delayed_write]),
- {ok, Offset} = file:position(FileHdl, {bof, Offset}),
+ FileHdl = open_file(CurrentName, ?WRITE_MODE ++ [read]),
+ {ok, Offset} = file:position(FileHdl, Offset),
State2 = State1 #dqstate { current_file_handle = FileHdl },
%% by reporting a memory use of 0, we guarantee the manager will
%% not oppress us. We have to start in ram_disk mode because we
@@ -807,6 +808,10 @@ base_directory() ->
msg_location_dets_file() ->
form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS).
+open_file(File, Mode) ->
+ {ok, Hdl} = file:open(form_filename(File), [raw, binary] ++ Mode),
+ Hdl.
+
with_read_handle_at(File, Offset, Fun, State =
#dqstate { read_file_hc_cache = HC,
current_file_name = CurName,
@@ -1259,8 +1264,7 @@ maybe_roll_to_new_file(Offset,
ok = file:close(CurHdl),
NextNum = CurNum + 1,
NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
- {ok, NextHdl} = file:open(form_filename(NextName),
- [write, raw, binary, delayed_write]),
+ NextHdl = open_file(NextName, ?WRITE_MODE),
true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right
true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
State2 = State1 #dqstate { current_file_name = NextName,
@@ -1352,13 +1356,13 @@ sort_msg_locations_by_offset(Dir, List) ->
end, List).
preallocate(Hdl, FileSizeLimit, FinalPos) ->
- {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}),
+ {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit),
ok = file:truncate(Hdl),
- {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}),
+ {ok, FinalPos} = file:position(Hdl, FinalPos),
ok.
truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
- {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
+ {ok, Lowpoint} = file:position(FileHdl, Lowpoint),
ok = file:truncate(FileHdl),
ok = preallocate(FileHdl, Highpoint, Lowpoint).
@@ -1368,12 +1372,8 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
_DestinationLeft, _DestinationRight},
State) ->
State1 = close_file(Source, close_file(Destination, State)),
- {ok, SourceHdl} =
- file:open(form_filename(Source),
- [read, raw, binary, read_ahead]),
- {ok, DestinationHdl} =
- file:open(form_filename(Destination),
- [read, write, raw, binary, read_ahead, delayed_write]),
+ SourceHdl = open_file(Source, ?READ_MODE),
+ DestinationHdl = open_file(Destination, ?READ_MODE ++ ?WRITE_MODE),
ExpectedSize = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't
%% need a tmp file
@@ -1386,10 +1386,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
DestinationValid, ExpectedSize);
true ->
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} =
- file:open(form_filename(Tmp),
- [read, write, raw, binary,
- read_ahead, delayed_write]),
+ TmpHdl = open_file(Tmp, ?READ_MODE ++ ?WRITE_MODE),
Worklist =
lists:dropwhile(
fun (#message_store_entry { offset = Offset })
@@ -1415,7 +1412,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
%% Destination, and MsgLocationDets has been updated to
%% reflect compaction of Destination so truncate
%% Destination and copy from Tmp back to the end
- {ok, 0} = file:position(TmpHdl, {bof, 0}),
+ {ok, 0} = file:position(TmpHdl, 0),
ok = truncate_and_extend_file(
DestinationHdl, DestinationContiguousTop, ExpectedSize),
{ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
@@ -1464,7 +1461,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
%% the previous block
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
- file:position(SourceHdl, {bof, BlockStart}),
+ file:position(SourceHdl, BlockStart),
{ok, BSize} =
file:copy(SourceHdl, DestinationHdl, BSize),
{NextOffset, Offset, Offset + Size}
@@ -1472,7 +1469,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
end, {InitOffset, undefined, undefined}, WorkList),
%% do the last remaining block
BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file:position(SourceHdl, {bof, BlockStart1}),
+ {ok, BlockStart1} = file:position(SourceHdl, BlockStart1),
{ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1),
ok.
@@ -1854,9 +1851,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
not (lists:member(MsgId, MsgIdsTmp))
end, MsgIds),
%% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} =
- file:open(form_filename(NonTmpRelatedFile),
- [read, write, raw, binary, delayed_write]),
+ MainHdl = open_file(NonTmpRelatedFile, ?WRITE_MODE ++ [read]),
{ok, Top} = file:position(MainHdl, Top),
%% wipe out any rubbish at the end of the file
ok = file:truncate(MainHdl),
@@ -1872,7 +1867,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% single move if we run out of disk space, this truncate
%% could fail, but we still aren't risking losing data
ok = file:truncate(MainHdl),
- {ok, TmpHdl} = file:open(TmpPath, [read, raw, binary, read_ahead]),
+ TmpHdl = open_file(TmpFile, ?READ_MODE),
{ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
ok = file:sync(MainHdl),
ok = file:close(MainHdl),