diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-22 18:01:48 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-22 18:01:48 +0100 |
| commit | ffba4bc3a76efa5f102db4d639403e2f0921e8f4 (patch) | |
| tree | 2717e606f3c37f01c4cd4d820aaf302017e3397c | |
| parent | 36e2dee055754e7aa3bd56660e312385172ef903 (diff) | |
| download | rabbitmq-server-git-ffba4bc3a76efa5f102db4d639403e2f0921e8f4.tar.gz | |
some refactorings. Not all done yet.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 101 |
1 files changed, 51 insertions, 50 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 7ee02f99e8..b32fffe4d1 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -260,8 +260,14 @@ stop_and_obliterate() -> %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> - %% gen_server does not trap by default. Without this, terminate/2 - %% won't be called + %% If the gen_server is part of a supervision tree and is ordered + %% by its supervisor to terminate, terminate will be called with + %% Reason=shutdown if the following conditions apply: + %% * the gen_server has been set to trap exit signals, and + %% * the shutdown strategy as defined in the supervisor's + %% child specification is an integer timeout value, not + %% brutal_kill. + %% Otherwise, the gen_server will be immediately terminated. process_flag(trap_exit, true), ok = filelib:ensure_dir(form_filename("nothing")), InitName = "0" ++ ?FILE_EXTENSION, @@ -370,9 +376,7 @@ base_directory() -> internal_deliver(Q, State = #dqstate { msg_location = MsgLocation, - sequences = Sequences, - read_file_handles_limit = ReadFileHandlesLimit, - read_file_handles = {ReadHdls, ReadHdlsAge} + sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, empty, State}; @@ -383,31 +387,7 @@ internal_deliver(Q, State = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), - Now = now(), - {FileHdl, ReadHdls1, ReadHdlsAge1} = - case dict:find(File, ReadHdls) of - error -> - {ok, Hdl} = file:open(form_filename(File), - [read, raw, binary, - read_ahead]), - case dict:size(ReadHdls) < ReadFileHandlesLimit of - true -> - {Hdl, ReadHdls, ReadHdlsAge}; - _False -> - {Then, OldFile, ReadHdlsAge3} = - gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, Then}} = - dict:find(OldFile, ReadHdls), - ok = file:close(OldHdl), - {Hdl, dict:erase(OldFile, ReadHdls), - ReadHdlsAge3} - end; - {ok, {Hdl, Then}} -> - {Hdl, ReadHdls, - gb_trees:delete(Then, ReadHdlsAge)} - end, - ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1), - ReadHdlsAge2 = gb_trees:enter(Now, File, ReadHdlsAge1), + {FileHdl, State1} = getReadHandle(File, State), %% read the message {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), @@ -417,10 +397,38 @@ internal_deliver(Q, State = end, true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, - State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge2} }} + State1} end end. +getReadHandle(File, State = + #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, + read_file_handles_limit = ReadFileHandlesLimit }) -> + Now = now(), + {FileHdl, ReadHdls1, ReadHdlsAge1} = + case dict:find(File, ReadHdls) of + error -> + {ok, Hdl} = file:open(form_filename(File), + [read, raw, binary, + read_ahead]), + case dict:size(ReadHdls) < ReadFileHandlesLimit of + true -> + {Hdl, ReadHdls, ReadHdlsAge}; + _False -> + {Then, OldFile, ReadHdlsAge2} = + gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, Then}} = + dict:find(OldFile, ReadHdls), + ok = file:close(OldHdl), + {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} + end; + {ok, {Hdl, Then}} -> + {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} + end, + ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1), + ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), + {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}. + internal_ack(Q, MsgIds, State) -> remove_messages(Q, MsgIds, true, State). @@ -664,6 +672,14 @@ sortMsgLocationsByOffset(Asc, List) -> Comp(OffA, OffB) end, List). +truncateAndExtendFile(FileHdl, Lowpoint, Highpoint) -> + {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), + ok = file:truncate(FileHdl), + {ok, Highpoint} = file:position(FileHdl, {bof, Highpoint}), + ok = file:truncate(FileHdl), + {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), + ok. + combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, {Destination, DestinationValid, DestinationContiguousTop, @@ -683,14 +699,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% then truncate, copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source if DestinationContiguousTop =:= DestinationValid -> - {ok, DestinationValid} = file:position(DestinationHdl, - {bof, DestinationValid}), - ok = file:truncate(DestinationHdl), - {ok, ExpectedSize} = file:position(DestinationHdl, - {cur, SourceValid}), - ok = file:truncate(DestinationHdl), - {ok, DestinationValid} = file:position(DestinationHdl, - {bof, DestinationValid}); + ok = truncateAndExtendFile(DestinationHdl, + DestinationValid, ExpectedSize); true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = @@ -755,17 +765,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), - {ok, DestinationContiguousTop} = - file:position(DestinationHdl, - {bof, DestinationContiguousTop}), - ok = file:truncate(DestinationHdl), - {ok, ExpectedSize} = - file:position(DestinationHdl, - {bof, ExpectedSize}), - ok = file:truncate(DestinationHdl), - {ok, DestinationContiguousTop} = - file:position(DestinationHdl, - {bof, DestinationContiguousTop}), + ok = truncateAndExtendFile(DestinationHdl, + DestinationContiguousTop, ExpectedSize), {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be %% DestinationValid |
