summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-22 18:01:48 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-22 18:01:48 +0100
commitffba4bc3a76efa5f102db4d639403e2f0921e8f4 (patch)
tree2717e606f3c37f01c4cd4d820aaf302017e3397c
parent36e2dee055754e7aa3bd56660e312385172ef903 (diff)
downloadrabbitmq-server-git-ffba4bc3a76efa5f102db4d639403e2f0921e8f4.tar.gz
some refactorings. Not all done yet.
-rw-r--r--src/rabbit_disk_queue.erl101
1 files changed, 51 insertions, 50 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 7ee02f99e8..b32fffe4d1 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -260,8 +260,14 @@ stop_and_obliterate() ->
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
- %% gen_server does not trap by default. Without this, terminate/2
- %% won't be called
+ %% If the gen_server is part of a supervision tree and is ordered
+ %% by its supervisor to terminate, terminate will be called with
+ %% Reason=shutdown if the following conditions apply:
+ %% * the gen_server has been set to trap exit signals, and
+ %% * the shutdown strategy as defined in the supervisor's
+ %% child specification is an integer timeout value, not
+ %% brutal_kill.
+ %% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
ok = filelib:ensure_dir(form_filename("nothing")),
InitName = "0" ++ ?FILE_EXTENSION,
@@ -370,9 +376,7 @@ base_directory() ->
internal_deliver(Q, State =
#dqstate { msg_location = MsgLocation,
- sequences = Sequences,
- read_file_handles_limit = ReadFileHandlesLimit,
- read_file_handles = {ReadHdls, ReadHdlsAge}
+ sequences = Sequences
}) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, empty, State};
@@ -383,31 +387,7 @@ internal_deliver(Q, State =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
[{MsgId, _RefCount, File, Offset, TotalSize}] =
dets:lookup(MsgLocation, MsgId),
- Now = now(),
- {FileHdl, ReadHdls1, ReadHdlsAge1} =
- case dict:find(File, ReadHdls) of
- error ->
- {ok, Hdl} = file:open(form_filename(File),
- [read, raw, binary,
- read_ahead]),
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {Hdl, ReadHdls, ReadHdlsAge};
- _False ->
- {Then, OldFile, ReadHdlsAge3} =
- gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, Then}} =
- dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- {Hdl, dict:erase(OldFile, ReadHdls),
- ReadHdlsAge3}
- end;
- {ok, {Hdl, Then}} ->
- {Hdl, ReadHdls,
- gb_trees:delete(Then, ReadHdlsAge)}
- end,
- ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
- ReadHdlsAge2 = gb_trees:enter(Now, File, ReadHdlsAge1),
+ {FileHdl, State1} = getReadHandle(File, State),
%% read the message
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
@@ -417,10 +397,38 @@ internal_deliver(Q, State =
end,
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
{ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
- State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge2} }}
+ State1}
end
end.
+getReadHandle(File, State =
+ #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
+ read_file_handles_limit = ReadFileHandlesLimit }) ->
+ Now = now(),
+ {FileHdl, ReadHdls1, ReadHdlsAge1} =
+ case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File),
+ [read, raw, binary,
+ read_ahead]),
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, ReadHdls, ReadHdlsAge};
+ _False ->
+ {Then, OldFile, ReadHdlsAge2} =
+ gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, Then}} =
+ dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ end;
+ {ok, {Hdl, Then}} ->
+ {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ end,
+ ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
+ {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}.
+
internal_ack(Q, MsgIds, State) ->
remove_messages(Q, MsgIds, true, State).
@@ -664,6 +672,14 @@ sortMsgLocationsByOffset(Asc, List) ->
Comp(OffA, OffB)
end, List).
+truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) ->
+ {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
+ ok = file:truncate(FileHdl),
+ {ok, Highpoint} = file:position(FileHdl, {bof, Highpoint}),
+ ok = file:truncate(FileHdl),
+ {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
+ ok.
+
combineFiles({Source, SourceValid, _SourceContiguousTop,
_SourceLeft, _SourceRight},
{Destination, DestinationValid, DestinationContiguousTop,
@@ -683,14 +699,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% then truncate, copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
if DestinationContiguousTop =:= DestinationValid ->
- {ok, DestinationValid} = file:position(DestinationHdl,
- {bof, DestinationValid}),
- ok = file:truncate(DestinationHdl),
- {ok, ExpectedSize} = file:position(DestinationHdl,
- {cur, SourceValid}),
- ok = file:truncate(DestinationHdl),
- {ok, DestinationValid} = file:position(DestinationHdl,
- {bof, DestinationValid});
+ ok = truncateAndExtendFile(DestinationHdl,
+ DestinationValid, ExpectedSize);
true ->
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} =
@@ -755,17 +765,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
%% reflect compaction of Destination so truncate
%% Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
- {ok, DestinationContiguousTop} =
- file:position(DestinationHdl,
- {bof, DestinationContiguousTop}),
- ok = file:truncate(DestinationHdl),
- {ok, ExpectedSize} =
- file:position(DestinationHdl,
- {bof, ExpectedSize}),
- ok = file:truncate(DestinationHdl),
- {ok, DestinationContiguousTop} =
- file:position(DestinationHdl,
- {bof, DestinationContiguousTop}),
+ ok = truncateAndExtendFile(DestinationHdl,
+ DestinationContiguousTop, ExpectedSize),
{ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
%% position in DestinationHdl should now be
%% DestinationValid