diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-22 11:49:47 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-22 11:49:47 +0100 |
| commit | 283cd95333fb49eef6a9c60c38f561971b1b85b5 (patch) | |
| tree | 05cb0fdce5c5198babdce2847f6b2821491b4076 /src | |
| parent | 4cb3ed5008d54c6c61016ac0ff175550074e7245 (diff) | |
| download | rabbitmq-server-git-283cd95333fb49eef6a9c60c38f561971b1b85b5.tar.gz | |
More work on the real fhc
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 153 |
1 files changed, 124 insertions, 29 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c5efa00d1e..933f8a833c 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -32,7 +32,7 @@ -module(file_handle_cache). -export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2, - position/3, truncate/2, with_file_handle_at/4, sync_to_offset/3]). + position/3, truncate/2, last_sync_offset/2]). -record(file, { reader_count, @@ -45,11 +45,15 @@ offset, trusted_offset, write_buffer_size, + write_buffer_size_limit, write_buffer, at_eof, is_append, + is_write, + is_read, mode, - global_key + global_key, + last_used_at }). open(Path, Mode, Options, State) -> @@ -59,7 +63,7 @@ open(Path, Mode, Options, State) -> #file { reader_count = RCount, has_writer = HasWriter } = File = get({GRef, fhc_file}), Mode1 = lists:usort(Mode), - IsWriter = is_writer(Mode1), + IsWriter = is_writer(Mode1) orelse is_appender(Mode), case IsWriter andalso HasWriter of true -> {{error, writer_exists}, State}; @@ -118,14 +122,19 @@ release(_Ref, State) -> %% noop just for now read(Ref, NewOffset, Count, State) -> case get({Ref, fhc_handle}) of undefined -> {{error, not_open}, State}; + #handle { is_read = false } -> {{error, not_open_for_reading}, State}; Handle -> case write_buffer(Handle) of {ok, Handle1 = #handle { hdl = Hdl, at_eof = AtEoF, offset = Offset }} -> - {AtEoF1, Offset1} = - maybe_position(Hdl, AtEoF, Offset, NewOffset), + {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), + {ok, Offset1} = case NeedsSeek of + true -> file:position(Hdl, NewOffset); + false -> {ok, Offset} + end, Handle2 = Handle1 #handle { at_eof = AtEoF1, - offset = Offset1 }, + offset = Offset1, + last_used_at = now() }, {Handle3, Result} = case file:read(Hdl, Count) of {ok, Data} -> {Handle2, {ok, Data}}; @@ -140,6 +149,56 @@ read(Ref, NewOffset, Count, State) -> end end. +write(Ref, NewOffset, Data, State) -> + case get({Ref, fhc_handle}) of + undefined -> {{error, not_open}, State}; + Handle = #handle { is_append = true, is_write = false } -> + {Result, Handle1} = write_to_buffer(Data, Handle), + put({Ref, fhc_handle}, Handle1), + {Result, State}; + Handle = #handle { is_append = false, is_write = true, at_eof = AtEoF, + offset = Offset, write_buffer_size = BufSize } -> + %% If we wrote the buffer out now, where would we end up? + %% Note that if AtEoF == true then it would still be true + %% after writing the buffer out, but if AtEoF == false, + %% it's possible it should be true after writing the + %% buffer out, but we won't know about it. + VirtualOffset = Offset + BufSize, + %% AtEoF1 says "after writing the buffer out, we will be + %% at VirtualOffset. At that point, we travel to + %% NewOffset. When we get there, will we be at eof?" + {AtEoF1, NeedsSeek} = needs_seek(AtEoF, VirtualOffset, NewOffset), + {Error, Handle1} = + case NeedsSeek of + %% Now if we don't seek, we don't write the buffer + %% out. This means we'll still be at Offset, and + %% AtEoF still applies. We need to add the data to + %% the buffer and leave it at that. + false -> {ok, Handle}; + %% If we do seek, then we write the buffer out, + %% which means that AtEoF1 applies, because after + %% writing the buffer out, we'll be at + %% VirtualOffset, and then want to get to + %% NewOffset. + true -> + case write_buffer(Handle) of + {ok, Handle2 = #handle { hdl = Hdl }} -> + {ok, Offset2} = file:position(Hdl, NewOffset), + {ok, Handle2 #handle { offset = Offset2, + at_eof = AtEoF1 }}; + {Error1, Handle2} -> {Error1, Offset, Handle2} + end + end, + case Error of + ok -> {Result, Handle3} = write_to_buffer(Data, Handle1), + put({Ref, fhc_handle}, Handle3), + {Result, State}; + _ -> put({Ref, fhc_handle}, Handle1), + {Error, State} + end; + _ -> {{error, not_open_for_writing}, State} + end. + open1(Path, Mode, Options, GRef, State) -> case file:open(Path, Mode) of {ok, Hdl} -> @@ -152,21 +211,46 @@ open1(Path, Mode, Options, GRef, State) -> Ref = make_ref(), put({Ref, fhc_handle}, #handle { hdl = Hdl, offset = 0, trusted_offset = 0, - write_buffer_size = WriteBufferSize, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, write_buffer = [], at_eof = false, - is_append = lists:member(append, Mode), mode = Mode, - global_key = GRef }), + is_append = is_appender(Mode), mode = Mode, + is_write = is_writer(Mode), is_read = is_reader(Mode), + global_key = GRef, last_used_at = now() }), {{ok, Ref}, State}; {error, Reason} -> {{error, Reason}, State} end. +write_to_buffer(Data, Handle = #handle { hdl = Hdl, + write_buffer_size_limit = 0 }) -> + {file:write(Hdl, Data), Handle #handle { last_used_at = now() }}; +write_to_buffer(Data, Handle = + #handle { write_buffer = WriteBuffer, + write_buffer_size = Size, + write_buffer_size_limit = infinity }) -> + {ok, Handle #handle { write_buffer_size = Size + size_of_write_data(Data), + write_buffer = [ Data | WriteBuffer ], + last_used_at = now() }}; +write_to_buffer(Data, Handle = + #handle { write_buffer = WriteBuffer, + write_buffer_size = Size, + write_buffer_size_limit = Limit }) -> + Size1 = Size + size_of_write_data(Data), + Handle1 = Handle #handle { write_buffer = [ Data | WriteBuffer ], + write_buffer_size = Size1, + last_used_at = now() }, + case Size1 > Limit of + true -> write_buffer(Handle1); + false -> {ok, Handle1} + end. + write_buffer(Handle = #handle { write_buffer = [] }) -> - Handle; + {ok, Handle}; write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, + write_buffer_size = DataSize, is_append = IsAppend, at_eof = AtEoF }) -> - DataSize = size_of_write_data(WriteBuffer), case file:write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = case IsAppend of @@ -175,7 +259,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, end, AtEoF1 = AtEoF andalso not IsAppend, {ok, Handle #handle { offset = Offset1, write_buffer = [], - at_eof = AtEoF1 }}; + write_buffer_size = 0, at_eof = AtEoF1 }}; {error, Reason} -> {{error, Reason}, Handle} end. @@ -190,24 +274,35 @@ size_of_write_data([A|B], Acc) -> size_of_write_data(Bin, Acc) when is_binary(Bin) -> size(Bin) + Acc. -is_reader(Mode) -> - lists:member(read, Mode). +is_reader(Mode) -> lists:member(read, Mode). + +is_writer(Mode) -> lists:member(write, Mode). -is_writer(Mode) -> - lists:member(write, Mode) orelse lists:member(append, Mode). +is_appender(Mode) -> lists:member(append, Mode). -%% maybe_position(Hdl, AtEof, CurOffset, DesiredOffset) -maybe_position(_Hdl, AtEof, CurOffset, cur) -> - {AtEof, CurOffset}; -maybe_position(_Hdl, true, CurOffset, eof) -> - {true, CurOffset}; -maybe_position(_Hdl, AtEof, CurOffset, CurOffset) -> - {AtEof, CurOffset}; -maybe_position(Hdl, true, CurOffset, DesiredOffset) +needs_seek(AtEof, _CurOffset, DesiredOffset) + when DesiredOffset == cur orelse DesiredOffset == {cur, 0} -> + {AtEof, false}; +needs_seek(true, _CurOffset, DesiredOffset) + when DesiredOffset == eof orelse DesiredOffset == {eof, 0} -> + {true, false}; +needs_seek(false, _CurOffset, DesiredOffset) + when DesiredOffset == eof orelse DesiredOffset == {eof, 0} -> + {true, true}; +needs_seek(AtEof, 0, DesiredOffset) + when DesiredOffset == bof orelse DesiredOffset == {bof, 0} -> + {AtEof, false}; +needs_seek(AtEof, CurOffset, CurOffset) -> + {AtEof, false}; +needs_seek(true, CurOffset, {bof, DesiredOffset}) when DesiredOffset >= CurOffset -> - {ok, Offset} = file:position(Hdl, DesiredOffset), - {true, Offset}; + {true, true}; +needs_seek(true, _CurOffset, {cur, DesiredOffset}) + when DesiredOffset > 0 -> + {true, true}; +needs_seek(true, CurOffset, DesiredOffset) %% same as {bof, DO} + when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset -> + {true, true}; %% because we can't really track size, we could well end up at EoF and not know -maybe_position(Hdl, _AtEoF, _CurOffset, DesiredOffset) -> - {ok, Offset} = file:position(Hdl, DesiredOffset), - {false, Offset}. +needs_seek(_AtEoF, _CurOffset, _DesiredOffset) -> + {false, true}. |
