diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-24 12:56:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-24 12:56:29 +0100 |
| commit | 07fd4cc4321ddb0fc21b50738ce7d3a2b77557b0 (patch) | |
| tree | e743bafbd7e96e4d9398f0d09e30e8a004f5b910 /src | |
| parent | 2940ef9035d139458c1f6a184ebd11f8cbc437a5 (diff) | |
| download | rabbitmq-server-git-07fd4cc4321ddb0fc21b50738ce7d3a2b77557b0.tar.gz | |
Abstracted out all the read handles stuff to a separate module, and refined the API as discussed.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 128 | ||||
| -rw-r--r-- | src/rabbit_handle_cache.erl | 103 |
2 files changed, 147 insertions, 84 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 04c8a82579..362d1e42e9 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -92,8 +92,7 @@ current_dirty, %% has the current file been written to %% since the last fsync? file_size_limit, %% how big can our files get? - read_file_handles, %% file handles for reading (LRU) - read_file_handles_limit, %% how many file handles can we open? + read_file_hc_cache, %% file handle cache for reading on_sync_txns, %% list of commiters to run on sync (reversed) commit_timer_ref, %% TRef for our interval timer last_sync_offset, %% current_offset at the last time we sync'd @@ -420,8 +419,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> current_offset = 0, current_dirty = false, file_size_limit = FileSizeLimit, - read_file_handles = {dict:new(), gb_trees:empty()}, - read_file_handles_limit = ReadFileHandlesLimit, + read_file_hc_cache = rabbit_handle_cache:init( + ReadFileHandlesLimit, + [read, raw, binary, read_ahead]), on_sync_txns = [], commit_timer_ref = undefined, last_sync_offset = 0, @@ -491,8 +491,7 @@ handle_call(stop_vaporise, _From, State) -> true = ets:delete(Sequences), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, - State1 #dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}}; + State1 #dqstate { current_file_handle = undefined }}; %% gen_server now calls terminate, which then calls shutdown handle_call(to_disk_only_mode, _From, State) -> reply(ok, to_disk_only_mode(State)); @@ -576,7 +575,7 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge} + read_file_hc_cache = HC }) -> %% deliberately ignoring return codes here State1 = stop_commit_timer(stop_memory_timer(State)), @@ -589,12 +588,10 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, _ -> sync_current_file_handle(State), file:close(FileHdl) end, - dict:fold(fun (_File, Hdl, _Acc) -> - file:close(Hdl) - end, ok, ReadHdls), + HC1 = rabbit_handle_cache:close_all(HC), State1 #dqstate { current_file_handle = undefined, current_dirty = false, - read_file_handles = {dict:new(), gb_trees:empty()}, + read_file_hc_cache = HC1, memory_report_timer_ref = undefined }. @@ -754,42 +751,20 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, Obj) -> ets:match_object(MsgLocationEts, Obj). -get_read_handle(File, Offset, TotalSize, State = - #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, - read_file_handles_limit = ReadFileHandlesLimit, - current_file_name = CurName, - current_dirty = IsDirty, - last_sync_offset = SyncOffset - }) -> - NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, - State1 = if CurName =:= File andalso IsDirty andalso NewOffset > SyncOffset -> +with_read_handle_at(File, Offset, Fun, State = + #dqstate { read_file_hc_cache = HC, + current_file_name = CurName, + current_dirty = IsDirty, + last_sync_offset = SyncOffset + }) -> + State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset -> sync_current_file_handle(State); true -> State end, - Now = now(), - {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} = - case dict:find(File, ReadHdls) of - error -> - {ok, Hdl} = file:open(form_filename(File), - [read, raw, binary]), - case dict:size(ReadHdls) < ReadFileHandlesLimit of - true -> - {Hdl, 0, ReadHdls, ReadHdlsAge}; - false -> - {Then, OldFile, ReadHdlsAge2} = - gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, _Offset, Then}} = - dict:find(OldFile, ReadHdls), - ok = file:close(OldHdl), - {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} - end; - {ok, {Hdl, OldOffset1, Then}} -> - {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} - end, - ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1), - ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), - {FileHdl, Offset /= OldOffset, - State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}. + FilePath = form_filename(File), + {Result, HC1} = + rabbit_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), + {Result, State1 #dqstate { read_file_hc_cache = HC1 }}. sequence_lookup(Sequences, Q) -> case ets:lookup(Sequences, Q) of @@ -913,10 +888,14 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, total_size = TotalSize }, State) -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {FileHdl, SeekReq, State1} = - get_read_handle(File, Offset, TotalSize, State), - {ok, {MsgBody, _IsPersistent, EncodedBodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq), + {{ok, {MsgBody, _IsPersistent, EncodedBodySize}}, State1} = + with_read_handle_at( + File, Offset, + fun(Hdl) -> + {ok, _} = Res = + read_message_from_disk(Hdl, TotalSize), + {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res} + end, State), Message = #basic_message {} = bin_to_msg(MsgBody), ok = if RefCount > 1 -> insert_into_cache(Message, EncodedBodySize, State1); @@ -1474,17 +1453,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1), ok. -close_file(File, State = #dqstate { read_file_handles = - {ReadHdls, ReadHdlsAge} }) -> - case dict:find(File, ReadHdls) of - error -> - State; - {ok, {Hdl, _Offset, Then}} -> - ok = file:close(Hdl), - State #dqstate { read_file_handles = - { dict:erase(File, ReadHdls), - gb_trees:delete(Then, ReadHdlsAge) } } - end. +close_file(File, State = #dqstate { read_file_hc_cache = HC }) -> + HC1 = rabbit_handle_cache:close_file(form_filename(File), HC), + State #dqstate { read_file_hc_cache = HC1 }. delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> [{File, ValidData, _ContiguousTop, Left, Right}] = @@ -1883,33 +1854,22 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> KO -> KO end. -read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) -> +read_message_from_disk(FileHdl, TotalSize) -> TotalSizeWriteOkBytes = TotalSize + 1, - SeekRes = case SeekReq of - true -> case file:position(FileHdl, {bof, Offset}) of - {ok, Offset} -> ok; - KO -> KO - end; - false -> ok - end, - case SeekRes of - ok -> - case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of - {ok, <<TotalSize:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, - Rest:TotalSizeWriteOkBytes/binary>>} -> - BodySize = TotalSize - MsgIdBinSize, - case Rest of - <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, - ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> -> - {ok, {MsgBody, false, BodySize}}; - <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, - ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> -> - {ok, {MsgBody, true, BodySize}} - end; - KO1 -> KO1 + case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of + {ok, <<TotalSize:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + Rest:TotalSizeWriteOkBytes/binary>>} -> + BodySize = TotalSize - MsgIdBinSize, + case Rest of + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> -> + {ok, {MsgBody, false, BodySize}}; + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> -> + {ok, {MsgBody, true, BodySize}} end; - KO2 -> KO2 + KO -> KO end. scan_file_for_valid_messages(File) -> diff --git a/src/rabbit_handle_cache.erl b/src/rabbit_handle_cache.erl new file mode 100644 index 0000000000..3509429646 --- /dev/null +++ b/src/rabbit_handle_cache.erl @@ -0,0 +1,103 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_handle_cache). + +-export([init/2, close_all/1, close_file/2, with_file_handle_at/4]). + +-record(hcstate, + { limit, %% how many file handles can we open? + handles, %% dict of the files to their handles, age and offset + ages, %% gb_tree of the files, keyed by age + mode %% the mode to open the files as + }). + +init(Limit, OpenMode) -> + #hcstate { limit = Limit, + handles = dict:new(), + ages = gb_trees:empty(), + mode = OpenMode + }. + +close_all(State = #hcstate { handles = Handles }) -> + dict:fold(fun (_File, {Hdl, _Offset, _Then}, _Acc) -> + file:close(Hdl) + end, ok, Handles), + State #hcstate { handles = dict:new(), ages = gb_trees:empty() }. + +close_file(File, State = #hcstate { handles = Handles, + ages = Ages }) -> + case dict:find(File, Handles) of + error -> + State; + {ok, {Hdl, _Offset, Then}} -> + ok = file:close(Hdl), + State #hcstate { handles = dict:erase(File, Handles), + ages = gb_trees:delete(Then, Ages) + } + end. + +with_file_handle_at(File, Offset, Fun, State = #hcstate { handles = Handles, + ages = Ages, + limit = Limit, + mode = Mode }) -> + {FileHdl, OldOffset, Handles1, Ages1} = + case dict:find(File, Handles) of + error -> + {ok, Hdl} = file:open(File, Mode), + case dict:size(Handles) < Limit of + true -> + {Hdl, 0, Handles, Ages}; + false -> + {Then, OldFile, Ages2} = gb_trees:take_smallest(Ages), + {ok, {OldHdl, _Offset, Then}} = + dict:find(OldFile, Handles), + ok = file:close(OldHdl), + {Hdl, 0, dict:erase(OldFile, Handles), Ages2} + end; + {ok, {Hdl, OldOffset1, Then}} -> + {Hdl, OldOffset1, Handles, gb_trees:delete(Then, Ages)} + end, + SeekRes = case Offset == OldOffset of + true -> ok; + false -> case file:position(FileHdl, {bof, Offset}) of + {ok, Offset} -> ok; + KO -> KO + end + end, + {NewOffset, Result} = case SeekRes of + ok -> Fun(FileHdl); + KO1 -> {Offset, KO1} + end, + Now = now(), + Handles2 = dict:store(File, {FileHdl, NewOffset, Now}, Handles1), + Ages3 = gb_trees:enter(Now, File, Ages1), + {Result, State #hcstate { handles = Handles2, ages = Ages3 }}. |
