summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-03 16:58:17 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-03 16:58:17 +0000
commit3d8a846b69581cb4372903b9dd27361077fb8ad5 (patch)
treef2546865e82a1b78840bbdeba37ed9f037aa114d /src
parentb10788b74928824690cddee5cfc8da930c7282ef (diff)
downloadrabbitmq-server-git-3d8a846b69581cb4372903b9dd27361077fb8ad5.tar.gz
Fixed a few bugs in fhc, pushed fhc through to msg_store and msg_file. API change to fhc:position to match file, also extended fhc with copy. msg_store must also trap exits so that it will shut down cleanly - especially important given that data to be written is now cached more aggressively. Removal of stop from msg_store as it's part of a supervisor and so the correct way to stop it is via the supervisor.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl72
-rw-r--r--src/rabbit_file_handle_cache.erl128
-rw-r--r--src/rabbit_msg_file.erl17
-rw-r--r--src/rabbit_msg_store.erl226
-rw-r--r--src/rabbit_queue_index.erl4
5 files changed, 180 insertions, 267 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index fe4e90774d..38aa482040 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -32,7 +32,7 @@
-module(file_handle_cache).
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
- last_sync_offset/1, append_write_buffer/1]).
+ last_sync_offset/1, current_offset/1, append_write_buffer/1, copy/3]).
%%----------------------------------------------------------------------------
@@ -76,10 +76,14 @@
({'ok', ([char()]|binary())} | eof | error())).
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
-spec(sync/1 :: (ref()) -> ok_or_error()).
--spec(position/2 :: (ref(), position()) -> ok_or_error()).
+-spec(position/2 :: (ref(), position()) ->
+ ({'ok', non_neg_integer()} | error())).
-spec(truncate/1 :: (ref()) -> ok_or_error()).
-spec(last_sync_offset/1 :: (ref()) -> ({'ok', integer()} | error())).
--spec(append_write_buffer/1 :: (ref()) -> ok_or_error()).
+-spec(current_offset/1 :: (ref()) -> ({'ok', integer()} | error())).
+-spec(append_write_buffer/1 :: (ref()) -> ok_or_error()).
+-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
+ ({'ok', integer()} | error())).
-endif.
@@ -191,7 +195,7 @@ append(Ref, Data) ->
{ok, Handle} ->
{Result, Handle1} =
case maybe_seek(eof, Handle) of
- {ok, Handle2 = #handle { at_eof = true }} ->
+ {{ok, _Offset}, Handle2 = #handle { at_eof = true }} ->
write_to_buffer(Data, Handle2);
{{error, _} = Error, Handle2} ->
{Error, Handle2}
@@ -266,8 +270,17 @@ truncate(Ref) ->
last_sync_offset(Ref) ->
case get_or_reopen(Ref) of
- {ok, #handle { trusted_offset = TrustedOffset }} ->
- {ok, TrustedOffset};
+ {ok, #handle { trusted_offset = TrustedOffset }} -> {ok, TrustedOffset};
+ Error -> Error
+ end.
+
+current_offset(Ref) ->
+ case get_or_reopen(Ref) of
+ {ok, #handle { at_eof = true, is_write = true, offset = Offset,
+ write_buffer_size = Size }} ->
+ {ok, Offset + Size};
+ {ok, #handle { offset = Offset }} ->
+ {ok, Offset};
Error -> Error
end.
@@ -280,6 +293,45 @@ append_write_buffer(Ref) ->
Error -> Error
end.
+copy(Src, Dest, Count) ->
+ case get_or_reopen(Src) of
+ {ok, SHandle = #handle { is_read = true }} ->
+ case get_or_reopen(Dest) of
+ {ok, DHandle = #handle { is_write = true }} ->
+ {Result, SHandle1, DHandle1} =
+ case write_buffer(SHandle) of
+ {ok, SHandle2 = #handle { hdl = SHdl,
+ offset = SOffset }} ->
+ case write_buffer(DHandle) of
+ {ok,
+ DHandle2 = #handle { hdl = DHdl,
+ offset = DOffset }} ->
+ Result1 = file:copy(SHdl, DHdl, Count),
+ case Result1 of
+ {ok, Count1} ->
+ {Result1,
+ SHandle2 #handle {
+ offset = SOffset + Count1 },
+ DHandle2 #handle {
+ offset = DOffset + Count1 }};
+ Error ->
+ {Error, SHandle2, DHandle2}
+ end;
+ Error -> {Error, SHandle2, DHandle}
+ end;
+ Error -> {Error, SHandle, DHandle}
+ end,
+ put({Src, fhc_handle}, SHandle1),
+ put({Dest, fhc_handle}, DHandle1),
+ Result;
+ {ok, _} -> {error, destination_not_open_for_writing};
+ Error -> Error
+ end;
+ {ok, _} -> {error, source_not_open_for_reading};
+ Error -> Error
+ end.
+
+
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
@@ -326,13 +378,15 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, at_eof = AtEoF,
end,
case Result of
{ok, Offset1} ->
- {ok, Handle #handle { at_eof = AtEoF1, offset = Offset1 }};
+ {Result, Handle #handle { at_eof = AtEoF1, offset = Offset1 }};
{error, _} = Error -> {Error, Handle}
end.
-write_to_buffer(Data, Handle = #handle { hdl = Hdl,
+write_to_buffer(Data, Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer_size_limit = 0 }) ->
- {file:write(Hdl, Data), Handle #handle { is_dirty = true }};
+ Offset1 = Offset + iolist_size(Data),
+ {file:write(Hdl, Data),
+ Handle #handle { is_dirty = true, offset = Offset1 }};
write_to_buffer(Data, Handle =
#handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
diff --git a/src/rabbit_file_handle_cache.erl b/src/rabbit_file_handle_cache.erl
deleted file mode 100644
index 85a5d6e942..0000000000
--- a/src/rabbit_file_handle_cache.erl
+++ /dev/null
@@ -1,128 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_file_handle_cache).
-
--export([init/2, close_all/1, close_file/2, with_file_handle_at/4]).
-
-%%----------------------------------------------------------------------------
-
--include("rabbit.hrl").
-
--record(hcstate,
- { limit, %% how many file handles can we open?
- handles, %% dict of the files to their handles, age and offset
- ages, %% gb_tree of the files, keyed by age
- mode %% the mode to open the files as
- }).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(hcstate() :: #hcstate { limit :: non_neg_integer(),
- handles :: dict(),
- ages :: gb_tree(),
- mode :: [file_open_mode()]
- }).
-
--spec(init/2 :: (non_neg_integer(), [file_open_mode()]) -> hcstate()).
--spec(close_all/1 :: (hcstate()) -> hcstate()).
--spec(close_file/2 :: (file_path(), hcstate()) -> hcstate()).
--spec(with_file_handle_at/4 :: (file_path(), non_neg_integer(),
- fun ((io_device()) -> {non_neg_integer(), A}),
- hcstate()) ->
- {A, hcstate()}).
--endif.
-
-%%----------------------------------------------------------------------------
-
-init(Limit, OpenMode) ->
- #hcstate { limit = Limit,
- handles = dict:new(),
- ages = gb_trees:empty(),
- mode = OpenMode
- }.
-
-close_all(State = #hcstate { handles = Handles }) ->
- dict:fold(fun (_File, {Hdl, _Offset, _Then}, _Acc) ->
- file:close(Hdl)
- end, ok, Handles),
- State #hcstate { handles = dict:new(), ages = gb_trees:empty() }.
-
-close_file(File, State = #hcstate { handles = Handles,
- ages = Ages }) ->
- case dict:find(File, Handles) of
- error ->
- State;
- {ok, {Hdl, _Offset, Then}} ->
- ok = file:close(Hdl),
- State #hcstate { handles = dict:erase(File, Handles),
- ages = gb_trees:delete(Then, Ages)
- }
- end.
-
-with_file_handle_at(File, Offset, Fun, State = #hcstate { handles = Handles,
- ages = Ages,
- limit = Limit,
- mode = Mode }) ->
- {FileHdl, OldOffset, Handles1, Ages1} =
- case dict:find(File, Handles) of
- error ->
- {ok, Hdl} = file:open(File, Mode),
- case dict:size(Handles) < Limit of
- true ->
- {Hdl, 0, Handles, Ages};
- false ->
- {Then, OldFile, Ages2} = gb_trees:take_smallest(Ages),
- {ok, {OldHdl, _Offset, Then}} =
- dict:find(OldFile, Handles),
- ok = file:close(OldHdl),
- {Hdl, 0, dict:erase(OldFile, Handles), Ages2}
- end;
- {ok, {Hdl, OldOffset1, Then}} ->
- {Hdl, OldOffset1, Handles, gb_trees:delete(Then, Ages)}
- end,
- SeekRes = case Offset == OldOffset of
- true -> ok;
- false -> case file:position(FileHdl, {bof, Offset}) of
- {ok, Offset} -> ok;
- KO -> KO
- end
- end,
- {NewOffset, Result} = case SeekRes of
- ok -> Fun(FileHdl);
- KO1 -> {OldOffset, KO1}
- end,
- Now = now(),
- Handles2 = dict:store(File, {FileHdl, NewOffset, Now}, Handles1),
- Ages3 = gb_trees:enter(Now, File, Ages1),
- {Result, State #hcstate { handles = Handles2, ages = Ages3 }}.
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 84dce90e8c..c08261591d 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -70,10 +70,11 @@ append(FileHdl, MsgId, MsgBody)
MsgBodyBin = term_to_binary(MsgBody),
MsgBodyBinSize = size(MsgBodyBin),
Size = MsgBodyBinSize + ?MSG_ID_SIZE_BYTES,
- case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
- MsgId:?MSG_ID_SIZE_BYTES/binary,
- MsgBodyBin:MsgBodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
+ case file_handle_cache:append(FileHdl,
+ <<Size:?INTEGER_SIZE_BITS,
+ MsgId:?MSG_ID_SIZE_BYTES/binary,
+ MsgBodyBin:MsgBodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
@@ -81,7 +82,7 @@ append(FileHdl, MsgId, MsgBody)
read(FileHdl, TotalSize) ->
Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
BodyBinSize = Size - ?MSG_ID_SIZE_BYTES,
- case file:read(FileHdl, TotalSize) of
+ case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
MsgId:?MSG_ID_SIZE_BYTES/binary,
MsgBodyBin:BodyBinSize/binary,
@@ -105,7 +106,7 @@ scan(FileHdl, Offset, Acc) ->
end.
read_next(FileHdl, Offset) ->
- case file:read(FileHdl, ?SIZE_AND_MSG_ID_BYTES) of
+ case file_handle_cache:read(FileHdl, ?SIZE_AND_MSG_ID_BYTES) of
%% Here we take option 5 from
%% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which
%% we read the MsgId as a number, and then convert it back to
@@ -116,11 +117,11 @@ read_next(FileHdl, Offset) ->
_ ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
ExpectedAbsPos = Offset + TotalSize - 1,
- case file:position(
+ case file_handle_cache:position(
FileHdl, {cur, Size - ?MSG_ID_SIZE_BYTES}) of
{ok, ExpectedAbsPos} ->
NextOffset = ExpectedAbsPos + 1,
- case file:read(FileHdl, 1) of
+ case file_handle_cache:read(FileHdl, 1) of
{ok,
<<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} ->
<<MsgId:?MSG_ID_SIZE_BYTES/binary>> =
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index a492a0248d..89f13c6fd0 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/3, write/2, read/1, peruse/2, contains/1, remove/1,
- release/1, sync/2, stop/0]).
+ release/1, sync/2]).
-export([sync/0]). %% internal
@@ -46,6 +46,7 @@
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
+-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
%%----------------------------------------------------------------------------
@@ -67,7 +68,6 @@
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
--spec(stop/0 :: () -> 'ok').
-endif.
@@ -79,12 +79,9 @@
file_summary, %% what's in the files?
current_file, %% current file name as number
current_file_handle, %% current file handle
- current_offset, %% current offset within current file
- current_dirty, %% has the current file been written to
%% since the last fsync?
file_size_limit, %% how big can our files get?
- read_file_handle_cache, %% file handle cache for reading
- last_sync_offset, %% current_offset at the last time we sync'd
+ file_handle_cache, %% file handle cache
on_sync, %% pending sync requests
sync_timer_ref, %% TRef for our interval timer
message_cache %% ets message cache
@@ -241,7 +238,6 @@ contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
-stop() -> gen_server2:call(?SERVER, stop, infinity).
sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
%%----------------------------------------------------------------------------
@@ -249,15 +245,14 @@ sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
%%----------------------------------------------------------------------------
init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
-
+ process_flag(trap_exit, true),
+
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
MsgLocations = ets:new(?MSG_LOC_NAME,
[set, private, {keypos, #msg_location.msg_id}]),
InitFile = 0,
- HandleCache = rabbit_file_handle_cache:init(?MAX_READ_FILE_HANDLES,
- ?BINARY_MODE ++ [read]),
FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME,
[set, private, {keypos, #file_summary.file}]),
MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]),
@@ -267,11 +262,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
file_summary = FileSummary,
current_file = InitFile,
current_file_handle = undefined,
- current_offset = 0,
- current_dirty = false,
file_size_limit = ?FILE_SIZE_LIMIT,
- read_file_handle_cache = HandleCache,
- last_sync_offset = 0,
+ file_handle_cache = dict:new(),
on_sync = [],
sync_timer_ref = undefined,
message_cache = MessageCache
@@ -286,13 +278,14 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
Files = [filename_to_num(FileName) || FileName <- FileNames],
- State1 = #msstate { current_file = CurFile, current_offset = Offset } =
+ {Offset, State1 = #msstate { current_file = CurFile }} =
build_index(Files, State),
%% read is only needed so that we can seek
{ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile),
- ?WRITE_MODE ++ [read]),
- {ok, Offset} = file:position(FileHdl, Offset),
+ [read | ?WRITE_MODE]),
+ {ok, Offset} = file_handle_cache:position(FileHdl, Offset),
+ ok = file_handle_cache:truncate(FileHdl),
{ok, State1 #msstate { current_file_handle = FileHdl }}.
@@ -304,19 +297,16 @@ handle_call({contains, MsgId}, _From, State) ->
reply(case index_lookup(MsgId, State) of
not_found -> false;
#msg_location {} -> true
- end, State);
-
-handle_call(stop, _From, State) ->
- {stop, normal, ok, State}.
+ end, State).
handle_cast({write, MsgId, Msg},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
- current_offset = CurOffset,
file_summary = FileSummary }) ->
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
+ {ok, CurOffset} = file_handle_cache:current_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
msg_id = MsgId, ref_count = 1, file = CurFile,
@@ -336,10 +326,7 @@ handle_cast({write, MsgId, Msg},
valid_total_size = ValidTotalSize1,
contiguous_top = ContiguousTop1 }),
NextOffset = CurOffset + TotalSize,
- noreply(
- maybe_roll_to_new_file(
- NextOffset, State #msstate {current_offset = NextOffset,
- current_dirty = true}));
+ noreply(maybe_roll_to_new_file(NextOffset, State));
StoreEntry = #msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter
ok = index_update(StoreEntry #msg_location {
@@ -371,15 +358,11 @@ handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
noreply(State);
-handle_cast({sync, _MsgIds, K},
- State = #msstate { current_dirty = false }) ->
- K(),
- noreply(State);
-
handle_cast({sync, MsgIds, K},
- State = #msstate { current_file = CurFile,
- last_sync_offset = SyncOffset,
- on_sync = Syncs }) ->
+ State = #msstate { current_file = CurFile,
+ current_file_handle = CurHdl,
+ on_sync = Syncs }) ->
+ {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
case lists:any(fun (MsgId) ->
#msg_location { file = File, offset = Offset } =
index_lookup(MsgId, State),
@@ -398,22 +381,19 @@ handle_info(timeout, State) ->
terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
file_summary = FileSummary,
- current_file_handle = FileHdl,
- read_file_handle_cache = HC }) ->
+ current_file_handle = FileHdl }) ->
State1 = case FileHdl of
undefined -> State;
_ -> State2 = sync(State),
- file:close(FileHdl),
+ file_handle_cache:close(FileHdl),
State2
end,
- HC1 = rabbit_file_handle_cache:close_all(HC),
+ State3 = close_all_handles(State1),
ets:delete(MsgLocations),
ets:delete(FileSummary),
- State1 #msstate { msg_locations = undefined,
- file_summary = undefined,
- current_file_handle = undefined,
- current_dirty = false,
- read_file_handle_cache = HC1 }.
+ State3 #msstate { msg_locations = undefined,
+ file_summary = undefined,
+ current_file_handle = undefined }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -455,50 +435,28 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-open_file(Dir, FileName, Mode) ->
- file:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode).
-
sort_file_names(FileNames) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
FileNames).
preallocate(Hdl, FileSizeLimit, FinalPos) ->
- {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit),
- ok = file:truncate(Hdl),
- {ok, FinalPos} = file:position(Hdl, FinalPos),
+ {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
+ ok = file_handle_cache:truncate(Hdl),
+ {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
ok.
truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
- {ok, Lowpoint} = file:position(FileHdl, Lowpoint),
- ok = file:truncate(FileHdl),
+ {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
+ ok = file_handle_cache:truncate(FileHdl),
ok = preallocate(FileHdl, Highpoint, Lowpoint).
-sync(State = #msstate { current_dirty = false }) ->
- State;
sync(State = #msstate { current_file_handle = CurHdl,
- current_offset = CurOffset,
on_sync = Syncs }) ->
State1 = stop_sync_timer(State),
- ok = file:sync(CurHdl),
+ %% we depend on this really calling sync, even if [] == Syncs
+ ok = file_handle_cache:sync(CurHdl),
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- State1 #msstate { current_dirty = false,
- last_sync_offset = CurOffset,
- on_sync = [] }.
-
-with_read_handle_at(File, Offset, Fun,
- State = #msstate { dir = Dir,
- read_file_handle_cache = HC,
- current_file = CurFile,
- current_dirty = IsDirty,
- last_sync_offset = SyncOffset }) ->
- State1 = if CurFile == File andalso IsDirty andalso Offset >= SyncOffset ->
- sync(State);
- true -> State
- end,
- FilePath = form_filename(Dir, filenum_to_name(File)),
- {Result, HC1} =
- rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
- {Result, State1 #msstate { read_file_handle_cache = HC1 }}.
+ State1 #msstate { on_sync = [] }.
remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
StoreEntry = #msg_location { ref_count = RefCount, file = File,
@@ -524,7 +482,9 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
no_compact
end.
-internal_read_message(MsgId, State) ->
+internal_read_message(MsgId,
+ State = #msstate { current_file = CurFile,
+ current_file_handle = CurHdl }) ->
case index_lookup(MsgId, State) of
not_found -> {not_found, State};
#msg_location { ref_count = RefCount,
@@ -533,37 +493,70 @@ internal_read_message(MsgId, State) ->
total_size = TotalSize } ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {{ok, {MsgId, Msg}}, State1} =
- with_read_handle_at(
- File, Offset,
- fun(Hdl) ->
- Res = case rabbit_msg_file:read(
- Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- {ok, Rest} ->
- throw({error,
- {misread,
- [{old_state, State},
+ {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
+ State1 =
+ case CurFile =:= File andalso Offset >= SyncOffset of
+ true -> sync(State);
+ false -> State
+ end,
+ {Hdl, State2} = get_read_handle(File, State1),
+ {ok, Offset} = file_handle_cache:position(Hdl, Offset),
+ {ok, {MsgId, Msg}} =
+ case rabbit_msg_file:read(Hdl, TotalSize) of
+ {ok, {MsgId, _}} = Obj -> Obj;
+ Rest ->
+ throw({error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
{read, Rest}]}})
- end,
- {Offset + TotalSize, Res}
- end, State),
+ end,
ok = if RefCount > 1 ->
- insert_into_cache(MsgId, Msg, State1);
+ insert_into_cache(MsgId, Msg, State2);
true -> ok
%% it's not in the cache and we
%% only have one reference to the
%% message. So don't bother
%% putting it in the cache.
end,
- {{ok, Msg}, State1};
+ {{ok, Msg}, State2};
{Msg, _RefCount} ->
{{ok, Msg}, State}
end
end.
+close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
+ case dict:find(Key, FHC) of
+ {ok, Hdl} ->
+ ok = close_file(Hdl),
+ State #msstate { file_handle_cache = dict:erase(Key, FHC) };
+ error -> State
+ end.
+
+close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
+ ok = dict:fold(fun (_Key, Hdl, ok) ->
+ file_handle_cache:close(Hdl)
+ end, ok, FHC),
+ State #msstate { file_handle_cache = dict:new() }.
+
+get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) ->
+ case dict:find(FileNum, FHC) of
+ {ok, Hdl} -> {Hdl, State};
+ error -> new_handle(FileNum, filenum_to_name(FileNum),
+ [read | ?BINARY_MODE], State)
+ end.
+
+new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC,
+ dir = Dir }) ->
+ {ok, Hdl} = open_file(Dir, FileName, Mode),
+ {Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}.
+
+open_file(Dir, FileName, Mode) ->
+ file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+
+close_file(Hdl) ->
+ file_handle_cache:close(Hdl).
+
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
@@ -732,7 +725,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
true = is_disjoint(MsgIds1, MsgIdsTmp),
%% must open with read flag, otherwise will stomp over contents
{ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- ?WRITE_MODE ++ [read]),
+ [read | ?WRITE_MODE]),
%% Wipe out any rubbish at the end of the file. Remember
%% the head of the list will be the highest entry in the
%% file.
@@ -743,10 +736,9 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% fail, but we still aren't risking losing data
ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
{ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
- {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
- ok = file:sync(MainHdl),
- ok = file:close(MainHdl),
- ok = file:close(TmpHdl),
+ {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
+ ok = file_handle_cache:close(MainHdl),
+ ok = file_handle_cache:close(TmpHdl),
ok = file:delete(TmpPath),
{ok, _MainMessages, MsgIdsMain} =
@@ -775,7 +767,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
Valid = rabbit_msg_file:scan(Hdl),
%% if something really bad's happened, the close could fail,
%% but ignore
- file:close(Hdl),
+ file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, []};
{error, Reason} -> throw({error,
@@ -812,8 +804,8 @@ build_index(Left, [], FilesToCompact, State) ->
total_size = TotalSize } | _] ->
MaxOffset + TotalSize
end,
- compact(FilesToCompact, %% this never includes the current file
- State #msstate { current_file = Left, current_offset = Offset });
+ {Offset, compact(FilesToCompact, %% this never includes the current file
+ State #msstate { current_file = Left })};
build_index(Left, [File|Files], FilesToCompact,
State = #msstate { dir = Dir, file_summary = FileSummary }) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -860,7 +852,7 @@ maybe_roll_to_new_file(Offset,
file_summary = FileSummary })
when Offset >= FileSizeLimit ->
State1 = sync(State),
- ok = file:close(CurHdl),
+ ok = close_file(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
true = ets:update_element(FileSummary, CurFile,
@@ -870,9 +862,7 @@ maybe_roll_to_new_file(Offset,
file = NextFile, valid_total_size = 0, contiguous_top = 0,
left = CurFile, right = undefined }),
State2 = State1 #msstate { current_file_handle = NextHdl,
- current_file = NextFile,
- current_offset = 0,
- last_sync_offset = 0 },
+ current_file = NextFile },
compact([CurFile], State2);
maybe_roll_to_new_file(_, State) ->
State.
@@ -957,9 +947,9 @@ combine_files(#file_summary { file = Source,
contiguous_top = DestinationContiguousTop,
right = Source },
State = #msstate { dir = Dir }) ->
+ State1 = close_handle(Source, close_handle(Destination, State)),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
- State1 = close_file(SourceName, close_file(DestinationName, State)),
{ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE),
{ok, DestinationHdl} = open_file(Dir, DestinationName,
?READ_AHEAD_MODE ++ ?WRITE_MODE),
@@ -998,21 +988,22 @@ combine_files(#file_summary { file = Source,
%% Destination, and MsgLocationDets has been updated to
%% reflect compaction of Destination so truncate
%% Destination and copy from Tmp back to the end
- {ok, 0} = file:position(TmpHdl, 0),
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
ok = truncate_and_extend_file(
DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
%% position in DestinationHdl should now be DestinationValid
- ok = file:sync(DestinationHdl),
- ok = file:close(TmpHdl),
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = close_file(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
SourceWorkList = index_search_by_file(Source, State1),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State1),
%% tidy up
- ok = file:close(SourceHdl),
- ok = file:close(DestinationHdl),
+ ok = close_file(SourceHdl),
+ ok = close_file(DestinationHdl),
ok = file:delete(form_filename(Dir, SourceName)),
State1.
@@ -1042,24 +1033,19 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
%% the previous block
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
- file:position(SourceHdl, BlockStart),
- {ok, BSize} =
- file:copy(SourceHdl, DestinationHdl, BSize),
+ file_handle_cache:position(SourceHdl, BlockStart),
+ {ok, BSize} = file_handle_cache:copy(
+ SourceHdl, DestinationHdl, BSize),
{NextOffset, Offset, Offset + TotalSize}
end
end, {InitOffset, undefined, undefined}, WorkList),
%% do the last remaining block
BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file:position(SourceHdl, BlockStart1),
- {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file:sync(DestinationHdl),
+ {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} = file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl),
ok.
-close_file(FileName,
- State = #msstate { dir = Dir, read_file_handle_cache = HC }) ->
- HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, FileName), HC),
- State #msstate { read_file_handle_cache = HC1 }.
-
delete_file_if_empty(File,
#msstate { dir = Dir, file_summary = FileSummary }) ->
[#file_summary { valid_total_size = ValidData,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4b48df82dc..febf3217bd 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -251,7 +251,7 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict,
if
JCount1 == 0 ->
{Hdl, State4} = get_journal_handle(State3),
- ok = file_handle_cache:position(Hdl, bof),
+ {ok, 0} = file_handle_cache:position(Hdl, bof),
ok = file_handle_cache:truncate(Hdl),
ok = file_handle_cache:sync(Hdl),
State4;
@@ -705,7 +705,7 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
{Hdl, State1 = #qistate { journal_del_dict = JDelDict,
journal_ack_dict = JAckDict }} =
get_seg_handle(SegNum, State),
- ok = file_handle_cache:position(Hdl, bof),
+ {ok, 0} = file_handle_cache:position(Hdl, bof),
{SDict, PubCount, AckCount, HighRelSeq} =
load_segment_entries(Hdl, dict:new(), 0, 0, 0),
%% delete ack'd msgs first