summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-22 11:49:47 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-22 11:49:47 +0100
commit283cd95333fb49eef6a9c60c38f561971b1b85b5 (patch)
tree05cb0fdce5c5198babdce2847f6b2821491b4076 /src
parent4cb3ed5008d54c6c61016ac0ff175550074e7245 (diff)
downloadrabbitmq-server-git-283cd95333fb49eef6a9c60c38f561971b1b85b5.tar.gz
More work on the real fhc
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl153
1 files changed, 124 insertions, 29 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index c5efa00d1e..933f8a833c 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -32,7 +32,7 @@
-module(file_handle_cache).
-export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2,
- position/3, truncate/2, with_file_handle_at/4, sync_to_offset/3]).
+ position/3, truncate/2, last_sync_offset/2]).
-record(file,
{ reader_count,
@@ -45,11 +45,15 @@
offset,
trusted_offset,
write_buffer_size,
+ write_buffer_size_limit,
write_buffer,
at_eof,
is_append,
+ is_write,
+ is_read,
mode,
- global_key
+ global_key,
+ last_used_at
}).
open(Path, Mode, Options, State) ->
@@ -59,7 +63,7 @@ open(Path, Mode, Options, State) ->
#file { reader_count = RCount, has_writer = HasWriter }
= File = get({GRef, fhc_file}),
Mode1 = lists:usort(Mode),
- IsWriter = is_writer(Mode1),
+ IsWriter = is_writer(Mode1) orelse is_appender(Mode),
case IsWriter andalso HasWriter of
true ->
{{error, writer_exists}, State};
@@ -118,14 +122,19 @@ release(_Ref, State) -> %% noop just for now
read(Ref, NewOffset, Count, State) ->
case get({Ref, fhc_handle}) of
undefined -> {{error, not_open}, State};
+ #handle { is_read = false } -> {{error, not_open_for_reading}, State};
Handle ->
case write_buffer(Handle) of
{ok, Handle1 = #handle { hdl = Hdl, at_eof = AtEoF,
offset = Offset }} ->
- {AtEoF1, Offset1} =
- maybe_position(Hdl, AtEoF, Offset, NewOffset),
+ {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
+ {ok, Offset1} = case NeedsSeek of
+ true -> file:position(Hdl, NewOffset);
+ false -> {ok, Offset}
+ end,
Handle2 = Handle1 #handle { at_eof = AtEoF1,
- offset = Offset1 },
+ offset = Offset1,
+ last_used_at = now() },
{Handle3, Result} =
case file:read(Hdl, Count) of
{ok, Data} -> {Handle2, {ok, Data}};
@@ -140,6 +149,56 @@ read(Ref, NewOffset, Count, State) ->
end
end.
+write(Ref, NewOffset, Data, State) ->
+ case get({Ref, fhc_handle}) of
+ undefined -> {{error, not_open}, State};
+ Handle = #handle { is_append = true, is_write = false } ->
+ {Result, Handle1} = write_to_buffer(Data, Handle),
+ put({Ref, fhc_handle}, Handle1),
+ {Result, State};
+ Handle = #handle { is_append = false, is_write = true, at_eof = AtEoF,
+ offset = Offset, write_buffer_size = BufSize } ->
+ %% If we wrote the buffer out now, where would we end up?
+ %% Note that if AtEoF == true then it would still be true
+ %% after writing the buffer out, but if AtEoF == false,
+ %% it's possible it should be true after writing the
+ %% buffer out, but we won't know about it.
+ VirtualOffset = Offset + BufSize,
+ %% AtEoF1 says "after writing the buffer out, we will be
+ %% at VirtualOffset. At that point, we travel to
+ %% NewOffset. When we get there, will we be at eof?"
+ {AtEoF1, NeedsSeek} = needs_seek(AtEoF, VirtualOffset, NewOffset),
+ {Error, Handle1} =
+ case NeedsSeek of
+ %% Now if we don't seek, we don't write the buffer
+ %% out. This means we'll still be at Offset, and
+ %% AtEoF still applies. We need to add the data to
+ %% the buffer and leave it at that.
+ false -> {ok, Handle};
+ %% If we do seek, then we write the buffer out,
+ %% which means that AtEoF1 applies, because after
+ %% writing the buffer out, we'll be at
+ %% VirtualOffset, and then want to get to
+ %% NewOffset.
+ true ->
+ case write_buffer(Handle) of
+ {ok, Handle2 = #handle { hdl = Hdl }} ->
+ {ok, Offset2} = file:position(Hdl, NewOffset),
+ {ok, Handle2 #handle { offset = Offset2,
+ at_eof = AtEoF1 }};
+ {Error1, Handle2} -> {Error1, Offset, Handle2}
+ end
+ end,
+ case Error of
+ ok -> {Result, Handle3} = write_to_buffer(Data, Handle1),
+ put({Ref, fhc_handle}, Handle3),
+ {Result, State};
+ _ -> put({Ref, fhc_handle}, Handle1),
+ {Error, State}
+ end;
+ _ -> {{error, not_open_for_writing}, State}
+ end.
+
open1(Path, Mode, Options, GRef, State) ->
case file:open(Path, Mode) of
{ok, Hdl} ->
@@ -152,21 +211,46 @@ open1(Path, Mode, Options, GRef, State) ->
Ref = make_ref(),
put({Ref, fhc_handle},
#handle { hdl = Hdl, offset = 0, trusted_offset = 0,
- write_buffer_size = WriteBufferSize,
+ write_buffer_size = 0,
+ write_buffer_size_limit = WriteBufferSize,
write_buffer = [], at_eof = false,
- is_append = lists:member(append, Mode), mode = Mode,
- global_key = GRef }),
+ is_append = is_appender(Mode), mode = Mode,
+ is_write = is_writer(Mode), is_read = is_reader(Mode),
+ global_key = GRef, last_used_at = now() }),
{{ok, Ref}, State};
{error, Reason} ->
{{error, Reason}, State}
end.
+write_to_buffer(Data, Handle = #handle { hdl = Hdl,
+ write_buffer_size_limit = 0 }) ->
+ {file:write(Hdl, Data), Handle #handle { last_used_at = now() }};
+write_to_buffer(Data, Handle =
+ #handle { write_buffer = WriteBuffer,
+ write_buffer_size = Size,
+ write_buffer_size_limit = infinity }) ->
+ {ok, Handle #handle { write_buffer_size = Size + size_of_write_data(Data),
+ write_buffer = [ Data | WriteBuffer ],
+ last_used_at = now() }};
+write_to_buffer(Data, Handle =
+ #handle { write_buffer = WriteBuffer,
+ write_buffer_size = Size,
+ write_buffer_size_limit = Limit }) ->
+ Size1 = Size + size_of_write_data(Data),
+ Handle1 = Handle #handle { write_buffer = [ Data | WriteBuffer ],
+ write_buffer_size = Size1,
+ last_used_at = now() },
+ case Size1 > Limit of
+ true -> write_buffer(Handle1);
+ false -> {ok, Handle1}
+ end.
+
write_buffer(Handle = #handle { write_buffer = [] }) ->
- Handle;
+ {ok, Handle};
write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
+ write_buffer_size = DataSize,
is_append = IsAppend, at_eof = AtEoF }) ->
- DataSize = size_of_write_data(WriteBuffer),
case file:write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = case IsAppend of
@@ -175,7 +259,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
end,
AtEoF1 = AtEoF andalso not IsAppend,
{ok, Handle #handle { offset = Offset1, write_buffer = [],
- at_eof = AtEoF1 }};
+ write_buffer_size = 0, at_eof = AtEoF1 }};
{error, Reason} ->
{{error, Reason}, Handle}
end.
@@ -190,24 +274,35 @@ size_of_write_data([A|B], Acc) ->
size_of_write_data(Bin, Acc) when is_binary(Bin) ->
size(Bin) + Acc.
-is_reader(Mode) ->
- lists:member(read, Mode).
+is_reader(Mode) -> lists:member(read, Mode).
+
+is_writer(Mode) -> lists:member(write, Mode).
-is_writer(Mode) ->
- lists:member(write, Mode) orelse lists:member(append, Mode).
+is_appender(Mode) -> lists:member(append, Mode).
-%% maybe_position(Hdl, AtEof, CurOffset, DesiredOffset)
-maybe_position(_Hdl, AtEof, CurOffset, cur) ->
- {AtEof, CurOffset};
-maybe_position(_Hdl, true, CurOffset, eof) ->
- {true, CurOffset};
-maybe_position(_Hdl, AtEof, CurOffset, CurOffset) ->
- {AtEof, CurOffset};
-maybe_position(Hdl, true, CurOffset, DesiredOffset)
+needs_seek(AtEof, _CurOffset, DesiredOffset)
+ when DesiredOffset == cur orelse DesiredOffset == {cur, 0} ->
+ {AtEof, false};
+needs_seek(true, _CurOffset, DesiredOffset)
+ when DesiredOffset == eof orelse DesiredOffset == {eof, 0} ->
+ {true, false};
+needs_seek(false, _CurOffset, DesiredOffset)
+ when DesiredOffset == eof orelse DesiredOffset == {eof, 0} ->
+ {true, true};
+needs_seek(AtEof, 0, DesiredOffset)
+ when DesiredOffset == bof orelse DesiredOffset == {bof, 0} ->
+ {AtEof, false};
+needs_seek(AtEof, CurOffset, CurOffset) ->
+ {AtEof, false};
+needs_seek(true, CurOffset, {bof, DesiredOffset})
when DesiredOffset >= CurOffset ->
- {ok, Offset} = file:position(Hdl, DesiredOffset),
- {true, Offset};
+ {true, true};
+needs_seek(true, _CurOffset, {cur, DesiredOffset})
+ when DesiredOffset > 0 ->
+ {true, true};
+needs_seek(true, CurOffset, DesiredOffset) %% same as {bof, DO}
+ when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
+ {true, true};
%% because we can't really track size, we could well end up at EoF and not know
-maybe_position(Hdl, _AtEoF, _CurOffset, DesiredOffset) ->
- {ok, Offset} = file:position(Hdl, DesiredOffset),
- {false, Offset}.
+needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
+ {false, true}.