summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-24 12:56:29 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-24 12:56:29 +0100
commit07fd4cc4321ddb0fc21b50738ce7d3a2b77557b0 (patch)
treee743bafbd7e96e4d9398f0d09e30e8a004f5b910 /src
parent2940ef9035d139458c1f6a184ebd11f8cbc437a5 (diff)
downloadrabbitmq-server-git-07fd4cc4321ddb0fc21b50738ce7d3a2b77557b0.tar.gz
Abstracted out all the read handles stuff to a separate module, and refined the API as discussed.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl128
-rw-r--r--src/rabbit_handle_cache.erl103
2 files changed, 147 insertions, 84 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 04c8a82579..362d1e42e9 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -92,8 +92,7 @@
current_dirty, %% has the current file been written to
%% since the last fsync?
file_size_limit, %% how big can our files get?
- read_file_handles, %% file handles for reading (LRU)
- read_file_handles_limit, %% how many file handles can we open?
+ read_file_hc_cache, %% file handle cache for reading
on_sync_txns, %% list of commiters to run on sync (reversed)
commit_timer_ref, %% TRef for our interval timer
last_sync_offset, %% current_offset at the last time we sync'd
@@ -420,8 +419,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
current_offset = 0,
current_dirty = false,
file_size_limit = FileSizeLimit,
- read_file_handles = {dict:new(), gb_trees:empty()},
- read_file_handles_limit = ReadFileHandlesLimit,
+ read_file_hc_cache = rabbit_handle_cache:init(
+ ReadFileHandlesLimit,
+ [read, raw, binary, read_ahead]),
on_sync_txns = [],
commit_timer_ref = undefined,
last_sync_offset = 0,
@@ -491,8 +491,7 @@ handle_call(stop_vaporise, _From, State) ->
true = ets:delete(Sequences),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok,
- State1 #dqstate { current_file_handle = undefined,
- read_file_handles = {dict:new(), gb_trees:empty()}}};
+ State1 #dqstate { current_file_handle = undefined }};
%% gen_server now calls terminate, which then calls shutdown
handle_call(to_disk_only_mode, _From, State) ->
reply(ok, to_disk_only_mode(State));
@@ -576,7 +575,7 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge}
+ read_file_hc_cache = HC
}) ->
%% deliberately ignoring return codes here
State1 = stop_commit_timer(stop_memory_timer(State)),
@@ -589,12 +588,10 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
_ -> sync_current_file_handle(State),
file:close(FileHdl)
end,
- dict:fold(fun (_File, Hdl, _Acc) ->
- file:close(Hdl)
- end, ok, ReadHdls),
+ HC1 = rabbit_handle_cache:close_all(HC),
State1 #dqstate { current_file_handle = undefined,
current_dirty = false,
- read_file_handles = {dict:new(), gb_trees:empty()},
+ read_file_hc_cache = HC1,
memory_report_timer_ref = undefined
}.
@@ -754,42 +751,20 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
-get_read_handle(File, Offset, TotalSize, State =
- #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
- read_file_handles_limit = ReadFileHandlesLimit,
- current_file_name = CurName,
- current_dirty = IsDirty,
- last_sync_offset = SyncOffset
- }) ->
- NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
- State1 = if CurName =:= File andalso IsDirty andalso NewOffset > SyncOffset ->
+with_read_handle_at(File, Offset, Fun, State =
+ #dqstate { read_file_hc_cache = HC,
+ current_file_name = CurName,
+ current_dirty = IsDirty,
+ last_sync_offset = SyncOffset
+ }) ->
+ State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
sync_current_file_handle(State);
true -> State
end,
- Now = now(),
- {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} =
- case dict:find(File, ReadHdls) of
- error ->
- {ok, Hdl} = file:open(form_filename(File),
- [read, raw, binary]),
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {Hdl, 0, ReadHdls, ReadHdlsAge};
- false ->
- {Then, OldFile, ReadHdlsAge2} =
- gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, _Offset, Then}} =
- dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
- end;
- {ok, {Hdl, OldOffset1, Then}} ->
- {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
- end,
- ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1),
- ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
- {FileHdl, Offset /= OldOffset,
- State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}.
+ FilePath = form_filename(File),
+ {Result, HC1} =
+ rabbit_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
+ {Result, State1 #dqstate { read_file_hc_cache = HC1 }}.
sequence_lookup(Sequences, Q) ->
case ets:lookup(Sequences, Q) of
@@ -913,10 +888,14 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
total_size = TotalSize }, State) ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {FileHdl, SeekReq, State1} =
- get_read_handle(File, Offset, TotalSize, State),
- {ok, {MsgBody, _IsPersistent, EncodedBodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq),
+ {{ok, {MsgBody, _IsPersistent, EncodedBodySize}}, State1} =
+ with_read_handle_at(
+ File, Offset,
+ fun(Hdl) ->
+ {ok, _} = Res =
+ read_message_from_disk(Hdl, TotalSize),
+ {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res}
+ end, State),
Message = #basic_message {} = bin_to_msg(MsgBody),
ok = if RefCount > 1 ->
insert_into_cache(Message, EncodedBodySize, State1);
@@ -1474,17 +1453,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1),
ok.
-close_file(File, State = #dqstate { read_file_handles =
- {ReadHdls, ReadHdlsAge} }) ->
- case dict:find(File, ReadHdls) of
- error ->
- State;
- {ok, {Hdl, _Offset, Then}} ->
- ok = file:close(Hdl),
- State #dqstate { read_file_handles =
- { dict:erase(File, ReadHdls),
- gb_trees:delete(Then, ReadHdlsAge) } }
- end.
+close_file(File, State = #dqstate { read_file_hc_cache = HC }) ->
+ HC1 = rabbit_handle_cache:close_file(form_filename(File), HC),
+ State #dqstate { read_file_hc_cache = HC1 }.
delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
[{File, ValidData, _ContiguousTop, Left, Right}] =
@@ -1883,33 +1854,22 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
KO -> KO
end.
-read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) ->
+read_message_from_disk(FileHdl, TotalSize) ->
TotalSizeWriteOkBytes = TotalSize + 1,
- SeekRes = case SeekReq of
- true -> case file:position(FileHdl, {bof, Offset}) of
- {ok, Offset} -> ok;
- KO -> KO
- end;
- false -> ok
- end,
- case SeekRes of
- ok ->
- case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
- {ok, <<TotalSize:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
- Rest:TotalSizeWriteOkBytes/binary>>} ->
- BodySize = TotalSize - MsgIdBinSize,
- case Rest of
- <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
- ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> ->
- {ok, {MsgBody, false, BodySize}};
- <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
- ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
- {ok, {MsgBody, true, BodySize}}
- end;
- KO1 -> KO1
+ case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
+ {ok, <<TotalSize:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ Rest:TotalSizeWriteOkBytes/binary>>} ->
+ BodySize = TotalSize - MsgIdBinSize,
+ case Rest of
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, false, BodySize}};
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, true, BodySize}}
end;
- KO2 -> KO2
+ KO -> KO
end.
scan_file_for_valid_messages(File) ->
diff --git a/src/rabbit_handle_cache.erl b/src/rabbit_handle_cache.erl
new file mode 100644
index 0000000000..3509429646
--- /dev/null
+++ b/src/rabbit_handle_cache.erl
@@ -0,0 +1,103 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_handle_cache).
+
+-export([init/2, close_all/1, close_file/2, with_file_handle_at/4]).
+
+-record(hcstate,
+ { limit, %% how many file handles can we open?
+ handles, %% dict of the files to their handles, age and offset
+ ages, %% gb_tree of the files, keyed by age
+ mode %% the mode to open the files as
+ }).
+
+init(Limit, OpenMode) ->
+ #hcstate { limit = Limit,
+ handles = dict:new(),
+ ages = gb_trees:empty(),
+ mode = OpenMode
+ }.
+
+close_all(State = #hcstate { handles = Handles }) ->
+ dict:fold(fun (_File, {Hdl, _Offset, _Then}, _Acc) ->
+ file:close(Hdl)
+ end, ok, Handles),
+ State #hcstate { handles = dict:new(), ages = gb_trees:empty() }.
+
+close_file(File, State = #hcstate { handles = Handles,
+ ages = Ages }) ->
+ case dict:find(File, Handles) of
+ error ->
+ State;
+ {ok, {Hdl, _Offset, Then}} ->
+ ok = file:close(Hdl),
+ State #hcstate { handles = dict:erase(File, Handles),
+ ages = gb_trees:delete(Then, Ages)
+ }
+ end.
+
+with_file_handle_at(File, Offset, Fun, State = #hcstate { handles = Handles,
+ ages = Ages,
+ limit = Limit,
+ mode = Mode }) ->
+ {FileHdl, OldOffset, Handles1, Ages1} =
+ case dict:find(File, Handles) of
+ error ->
+ {ok, Hdl} = file:open(File, Mode),
+ case dict:size(Handles) < Limit of
+ true ->
+ {Hdl, 0, Handles, Ages};
+ false ->
+ {Then, OldFile, Ages2} = gb_trees:take_smallest(Ages),
+ {ok, {OldHdl, _Offset, Then}} =
+ dict:find(OldFile, Handles),
+ ok = file:close(OldHdl),
+ {Hdl, 0, dict:erase(OldFile, Handles), Ages2}
+ end;
+ {ok, {Hdl, OldOffset1, Then}} ->
+ {Hdl, OldOffset1, Handles, gb_trees:delete(Then, Ages)}
+ end,
+ SeekRes = case Offset == OldOffset of
+ true -> ok;
+ false -> case file:position(FileHdl, {bof, Offset}) of
+ {ok, Offset} -> ok;
+ KO -> KO
+ end
+ end,
+ {NewOffset, Result} = case SeekRes of
+ ok -> Fun(FileHdl);
+ KO1 -> {Offset, KO1}
+ end,
+ Now = now(),
+ Handles2 = dict:store(File, {FileHdl, NewOffset, Now}, Handles1),
+ Ages3 = gb_trees:enter(Now, File, Ages1),
+ {Result, State #hcstate { handles = Handles2, ages = Ages3 }}.