diff options
| -rw-r--r-- | src/file_handle_cache.erl | 226 |
1 files changed, 106 insertions, 120 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 933f8a833c..e86344a087 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -31,7 +31,7 @@ -module(file_handle_cache). --export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2, +-export([init/0, open/4, close/2, release/2, read/4, append/3, sync/2, position/3, truncate/2, last_sync_offset/2]). -record(file, @@ -48,7 +48,6 @@ write_buffer_size_limit, write_buffer, at_eof, - is_append, is_write, is_read, mode, @@ -56,33 +55,41 @@ last_used_at }). +init() -> empty_state. + open(Path, Mode, Options, State) -> - Path1 = filename:absname(Path), - case get({Path1, fhc_path}) of - {gref, GRef} -> - #file { reader_count = RCount, has_writer = HasWriter } - = File = get({GRef, fhc_file}), - Mode1 = lists:usort(Mode), - IsWriter = is_writer(Mode1) orelse is_appender(Mode), - case IsWriter andalso HasWriter of - true -> - {{error, writer_exists}, State}; - false -> - RCount1 = case is_reader(Mode1) of - true -> RCount + 1; - false -> RCount - end, - put({Path1, fhc_file}, - File #file { reader_count = RCount1, - has_writer = HasWriter orelse IsWriter }), - open1(Path1, Mode1, Options, GRef, State) - end; - undefined -> - GRef = make_ref(), - put({Path1, fhc_path}, {gref, GRef}), - put({GRef, fhc_file}, - #file { reader_count = 0, has_writer = false, path = Path1 }), - open(Path, Mode, Options, State) + case is_appender(Mode) of + true -> {{error, append_not_supported}, State}; + false -> + Path1 = filename:absname(Path), + case get({Path1, fhc_path}) of + {gref, GRef} -> + #file { reader_count = RCount, has_writer = HasWriter } + = File = get({GRef, fhc_file}), + Mode1 = lists:usort(Mode), + IsWriter = is_writer(Mode1), + case IsWriter andalso HasWriter of + true -> + {{error, writer_exists}, State}; + false -> + RCount1 = case is_reader(Mode1) of + true -> RCount + 1; + false -> RCount + end, + put({Path1, fhc_file}, + File #file { + reader_count = RCount1, + has_writer = HasWriter orelse IsWriter }), + open1(Path1, Mode1, Options, GRef, State) + end; + undefined -> + GRef = make_ref(), + put({Path1, fhc_path}, {gref, GRef}), + put({GRef, fhc_file}, + #file { reader_count = 0, has_writer = false, + path = Path1 }), + open(Path, Mode, Options, State) + end end. close(Ref, State) -> @@ -90,11 +97,10 @@ close(Ref, State) -> undefined -> {ok, State}; Handle -> case write_buffer(Handle) of - {ok, #handle { hdl = Hdl, mode = Mode, global_key = GRef }} -> + {ok, #handle { hdl = Hdl, global_key = GRef, + is_read = IsReader, is_write = IsWriter }} -> ok = file:sync(Hdl), ok = file:close(Hdl), - IsReader = is_reader(Mode), - IsWriter = is_writer(Mode), #file { reader_count = RCount, has_writer = HasWriter, path = Path } = File = get({GRef, fhc_file}), RCount1 = case IsReader of @@ -124,81 +130,61 @@ read(Ref, NewOffset, Count, State) -> undefined -> {{error, not_open}, State}; #handle { is_read = false } -> {{error, not_open_for_reading}, State}; Handle -> - case write_buffer(Handle) of - {ok, Handle1 = #handle { hdl = Hdl, at_eof = AtEoF, - offset = Offset }} -> - {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), - {ok, Offset1} = case NeedsSeek of - true -> file:position(Hdl, NewOffset); - false -> {ok, Offset} - end, - Handle2 = Handle1 #handle { at_eof = AtEoF1, - offset = Offset1, - last_used_at = now() }, - {Handle3, Result} = - case file:read(Hdl, Count) of - {ok, Data} -> {Handle2, {ok, Data}}; - eof -> {Handle2 #handle { at_eof = true }, eof}; - {error, Reason} -> {Handle2, {error, Reason}} - end, - put({Ref, fhc_handle}, Handle3), - {Result, State}; - {Error, Handle1} -> - put({Ref, fhc_handle}, Handle1), - {Error, State} - end + {Result, Handle1} = + case write_buffer(Handle) of + {ok, Handle2} -> + case maybe_seek(NewOffset, Handle2) of + {ok, Handle3 = #handle { hdl = Hdl }} -> + case file:read(Hdl, Count) of + {ok, _} = Obj -> {Obj, Handle3}; + eof -> {eof, + Handle3 #handle { at_eof = true }}; + {error, _} = Error -> {Error, Handle3} + end; + {Error, Handle3} -> {Error, Handle3} + end; + {Error, Handle2} -> {Error, Handle2} + end, + put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), + {Result, State} end. -write(Ref, NewOffset, Data, State) -> +append(Ref, Data, State) -> case get({Ref, fhc_handle}) of undefined -> {{error, not_open}, State}; - Handle = #handle { is_append = true, is_write = false } -> - {Result, Handle1} = write_to_buffer(Data, Handle), - put({Ref, fhc_handle}, Handle1), - {Result, State}; - Handle = #handle { is_append = false, is_write = true, at_eof = AtEoF, - offset = Offset, write_buffer_size = BufSize } -> - %% If we wrote the buffer out now, where would we end up? - %% Note that if AtEoF == true then it would still be true - %% after writing the buffer out, but if AtEoF == false, - %% it's possible it should be true after writing the - %% buffer out, but we won't know about it. - VirtualOffset = Offset + BufSize, - %% AtEoF1 says "after writing the buffer out, we will be - %% at VirtualOffset. At that point, we travel to - %% NewOffset. When we get there, will we be at eof?" - {AtEoF1, NeedsSeek} = needs_seek(AtEoF, VirtualOffset, NewOffset), - {Error, Handle1} = - case NeedsSeek of - %% Now if we don't seek, we don't write the buffer - %% out. This means we'll still be at Offset, and - %% AtEoF still applies. We need to add the data to - %% the buffer and leave it at that. - false -> {ok, Handle}; - %% If we do seek, then we write the buffer out, - %% which means that AtEoF1 applies, because after - %% writing the buffer out, we'll be at - %% VirtualOffset, and then want to get to - %% NewOffset. - true -> - case write_buffer(Handle) of - {ok, Handle2 = #handle { hdl = Hdl }} -> - {ok, Offset2} = file:position(Hdl, NewOffset), - {ok, Handle2 #handle { offset = Offset2, - at_eof = AtEoF1 }}; - {Error1, Handle2} -> {Error1, Offset, Handle2} - end + Handle = #handle { is_write = true } -> + {Result, Handle1} = + case maybe_seek(eof, Handle) of + {ok, Handle2 = #handle { at_eof = true }} -> + write_to_buffer(Data, Handle2); + {{error, _} = Error, Handle2} -> + {Error, Handle2} end, - case Error of - ok -> {Result, Handle3} = write_to_buffer(Data, Handle1), - put({Ref, fhc_handle}, Handle3), - {Result, State}; - _ -> put({Ref, fhc_handle}, Handle1), - {Error, State} - end; + put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), + {Result, State}; _ -> {{error, not_open_for_writing}, State} end. +last_sync_offset(Ref, State) -> + case get({Ref, fhc_handle}) of + undefined -> {{error, not_open}, State}; + #handle { trusted_offset = TrustedOffset } -> + {{ok, TrustedOffset}, State} + end. + +position(Ref, NewOffset, State) -> + case get({Ref, fhc_handle}) of + undefined -> {{error, not_open}, State}; + Handle -> + {Result, Handle1} = + case write_buffer(Handle) of + {ok, Handle2} -> maybe_seek(NewOffset, Handle2); + {Error, Handle2} -> {Error, Handle2} + end, + put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), + {Result, State} + end. + open1(Path, Mode, Options, GRef, State) -> case file:open(Path, Mode) of {ok, Hdl} -> @@ -213,8 +199,7 @@ open1(Path, Mode, Options, GRef, State) -> #handle { hdl = Hdl, offset = 0, trusted_offset = 0, write_buffer_size = 0, write_buffer_size_limit = WriteBufferSize, - write_buffer = [], at_eof = false, - is_append = is_appender(Mode), mode = Mode, + write_buffer = [], at_eof = false, mode = Mode, is_write = is_writer(Mode), is_read = is_reader(Mode), global_key = GRef, last_used_at = now() }), {{ok, Ref}, State}; @@ -222,25 +207,30 @@ open1(Path, Mode, Options, GRef, State) -> {{error, Reason}, State} end. +maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, at_eof = AtEoF, + offset = Offset }) -> + {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), + Result = case NeedsSeek of + true -> file:position(Hdl, NewOffset); + false -> {ok, Offset} + end, + case Result of + {ok, Offset1} -> + {ok, Handle #handle { at_eof = AtEoF1, offset = Offset1 }}; + {error, _} = Error -> {Error, Handle} + end. + write_to_buffer(Data, Handle = #handle { hdl = Hdl, write_buffer_size_limit = 0 }) -> - {file:write(Hdl, Data), Handle #handle { last_used_at = now() }}; -write_to_buffer(Data, Handle = - #handle { write_buffer = WriteBuffer, - write_buffer_size = Size, - write_buffer_size_limit = infinity }) -> - {ok, Handle #handle { write_buffer_size = Size + size_of_write_data(Data), - write_buffer = [ Data | WriteBuffer ], - last_used_at = now() }}; + {file:write(Hdl, Data), Handle}; write_to_buffer(Data, Handle = #handle { write_buffer = WriteBuffer, write_buffer_size = Size, write_buffer_size_limit = Limit }) -> Size1 = Size + size_of_write_data(Data), Handle1 = Handle #handle { write_buffer = [ Data | WriteBuffer ], - write_buffer_size = Size1, - last_used_at = now() }, - case Size1 > Limit of + write_buffer_size = Size1 }, + case Limit /= infinity andalso Size1 > Limit of true -> write_buffer(Handle1); false -> {ok, Handle1} end. @@ -250,18 +240,14 @@ write_buffer(Handle = #handle { write_buffer = [] }) -> write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, - is_append = IsAppend, at_eof = AtEoF }) -> + at_eof = true }) -> case file:write(Hdl, lists:reverse(WriteBuffer)) of ok -> - Offset1 = case IsAppend of - true -> Offset; - false -> Offset + DataSize - end, - AtEoF1 = AtEoF andalso not IsAppend, + Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, write_buffer = [], - write_buffer_size = 0, at_eof = AtEoF1 }}; - {error, Reason} -> - {{error, Reason}, Handle} + write_buffer_size = 0 }}; + {error, _} = Error -> + {Error, Handle} end. size_of_write_data(Data) -> |
