diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-25 13:07:29 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-25 13:07:29 +0000 |
| commit | 373e95a80cf90c3bb7993ea3e8c53e78171cfcc5 (patch) | |
| tree | df6e5e08ee06fbad4133b91981c26eaaefecb32a /src | |
| parent | 47c5772a5a4e8dc15ac4f392c3c7877c8470025f (diff) | |
| download | rabbitmq-server-git-373e95a80cf90c3bb7993ea3e8c53e78171cfcc5.tar.gz | |
Corrected spec for open and inlined the internal versions. Also used the with_flushed_handles for truncate and position. It breaks sync if you do *both* the flushed form and then the obvious refactoring within it (i.e. pull the Handle1 up) and I can't quite see why
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 248 |
1 files changed, 121 insertions, 127 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index cf27817942..bbc09b8af8 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -90,7 +90,10 @@ -type(position() :: ('bof' | 'eof' | {'bof',integer()} | {'eof',integer()} | {'cur',integer()} | integer())). --spec(open/3 :: (string(), [any()], [any()]) -> ({'ok', ref()} | error())). +-spec(open/3 :: + (string(), [any()], + [{'write_buffer', (non_neg_integer()|'infinity'|'unbuffered')}]) -> + ({'ok', ref()} | error())). -spec(close/1 :: (ref()) -> ('ok' | error())). -spec(read/2 :: (ref(), integer()) -> ({'ok', ([char()]|binary())} | eof | error())). @@ -139,34 +142,135 @@ close(Ref) -> end. read(Ref, Count) -> - with_flushed_handles([Ref], Count, fun internal_read/2). + with_flushed_handles( + [Ref], fun ([#handle { is_read = false }]) -> + {error, not_open_for_reading}; + ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> + case file:read(Hdl, Count) of + {ok, Data} = Obj -> + Size = iolist_size(Data), + {Obj, [Handle #handle { offset = Offset + Size }]}; + eof -> + {eof, [Handle #handle { at_eof = true }]}; + Error -> + {Error, [Handle]} + end + end). append(Ref, Data) -> - with_handles([Ref], Data, fun internal_append/2). + with_handles( + [Ref], + fun ([#handle { is_write = false }]) -> + {error, not_open_for_writing}; + ([Handle]) -> + case maybe_seek(eof, Handle) of + {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset, + write_buffer_size_limit = 0, + at_eof = true } = Handle1} -> + Offset1 = Offset + iolist_size(Data), + {file:write(Hdl, Data), + [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; + {{ok, _Offset}, #handle { write_buffer = WriteBuffer, + write_buffer_size = Size, + write_buffer_size_limit = Limit, + at_eof = true } = Handle1} -> + Size1 = Size + iolist_size(Data), + Handle2 = Handle1 #handle { write_buffer = [ Data | WriteBuffer ], + write_buffer_size = Size1 }, + case Limit /= infinity andalso Size1 > Limit of + true -> {Result, Handle3} = write_buffer(Handle2), + {Result, [Handle3]}; + false -> {ok, [Handle2]} + end; + {{error, _} = Error, Handle1} -> + {Error, [Handle1]} + end + end). sync(Ref) -> - with_handles([Ref], ok, fun internal_sync/2). + with_handles( + [Ref], + fun ([#handle { is_dirty = false, write_buffer = [] }]) -> + ok; + ([Handle]) -> + %% write_buffer will set is_dirty, or leave it set if buffer empty + case write_buffer(Handle) of + {ok, Handle1 = #handle { hdl = Hdl, offset = Offset, + is_dirty = true }} -> + case file:sync(Hdl) of + ok -> + {ok, [Handle1 #handle { trusted_offset = Offset, + is_dirty = false }]}; + Error -> + {Error, [Handle1]} + end; + {Error, Handle1} -> + {Error, [Handle1]} + end + end). position(Ref, NewOffset) -> - with_handles([Ref], NewOffset, fun internal_position/2). + with_flushed_handles( + [Ref], fun ([Handle]) -> + {Result, Handle1} = maybe_seek(NewOffset, Handle), + {Result, [Handle1]} + end). truncate(Ref) -> - with_handles([Ref], ok, fun internal_truncate/2). + with_flushed_handles( + [Ref], + fun ([Handle1 = #handle { hdl = Hdl, offset = Offset, + trusted_offset = TrustedOffset }]) -> + case file:truncate(Hdl) of + ok -> + {ok, [Handle1 #handle { + at_eof = true, + trusted_offset = lists:min([Offset, + TrustedOffset]) + }]}; + Error -> + {Error, [Handle1]} + end + end). last_sync_offset(Ref) -> - with_handles([Ref], ok, fun internal_last_sync_offset/2). + with_handles([Ref], fun ([#handle { trusted_offset = TrustedOffset }]) -> + {ok, TrustedOffset} + end). current_virtual_offset(Ref) -> - with_handles([Ref], ok, fun internal_current_virtual_offset/2). + with_handles([Ref], + fun ([#handle { at_eof = true, is_write = true, + offset = Offset, + write_buffer_size = Size }]) -> + {ok, Offset + Size}; + ([#handle { offset = Offset }]) -> + {ok, Offset} + end). current_raw_offset(Ref) -> - with_handles([Ref], ok, fun internal_current_raw_offset/2). + with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end). append_write_buffer(Ref) -> - with_flushed_handles([Ref], ok, fun internal_append_write_buffer/2). + with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end). copy(Src, Dest, Count) -> - with_flushed_handles([Src, Dest], Count, fun internal_copy/2). + with_flushed_handles( + [Src, Dest], + fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, + DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] + ) -> + case file:copy(SHdl, DHdl, Count) of + {ok, Count1} = Result1 -> + {Result1, + [SHandle #handle { offset = SOffset + Count1 }, + DHandle #handle { offset = DOffset + Count1 }]}; + Error -> + {Error, [SHandle, DHandle]} + end; + (_Handles) -> + {error, incorrect_handle_modes} + end). set_maximum_since_use(MaximumAge) -> Now = now(), @@ -201,116 +305,6 @@ increment() -> %% Internal versions for the above %%---------------------------------------------------------------------------- -internal_read(_Count, [#handle { is_read = false }]) -> - {error, not_open_for_reading}; -internal_read(Count, [Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case file:read(Hdl, Count) of - {ok, Data} = Obj -> - Size = iolist_size(Data), - {Obj, [Handle #handle { offset = Offset + Size }]}; - eof -> - {eof, [Handle #handle { at_eof = true }]}; - Error -> - {Error, [Handle]} - end. - -internal_append(_Data, [#handle { is_write = false }]) -> - {error, not_open_for_writing}; -internal_append(Data, [Handle]) -> - case maybe_seek(eof, Handle) of - {{ok, _Offset}, Handle1 = #handle { hdl = Hdl, offset = Offset, - write_buffer_size_limit = 0, - at_eof = true }} -> - Offset1 = Offset + iolist_size(Data), - {file:write(Hdl, Data), - [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; - {{ok, _Offset}, Handle1 = #handle { write_buffer = WriteBuffer, - write_buffer_size = Size, - write_buffer_size_limit = Limit, - at_eof = true }} -> - Size1 = Size + iolist_size(Data), - Handle2 = Handle1 #handle { write_buffer = [ Data | WriteBuffer ], - write_buffer_size = Size1 }, - case Limit /= infinity andalso Size1 > Limit of - true -> {Result, Handle3} = write_buffer(Handle2), - {Result, [Handle3]}; - false -> {ok, [Handle2]} - end; - {{error, _} = Error, Handle1} -> - {Error, [Handle1]} - end. - -internal_sync(ok, [#handle { is_dirty = false, write_buffer = [] }]) -> - ok; -internal_sync(ok, [Handle]) -> - %% write_buffer will set is_dirty, or leave it set if buffer empty - case write_buffer(Handle) of - {ok, Handle1 = #handle {hdl = Hdl, offset = Offset, is_dirty = true}} -> - case file:sync(Hdl) of - ok -> {ok, [Handle1 #handle { trusted_offset = Offset, - is_dirty = false }]}; - Error -> {Error, [Handle1]} - end; - {Error, Handle1} -> - {Error, [Handle1]} - end. - -internal_position(NewOffset, [Handle]) -> - case write_buffer(Handle) of - {ok, Handle1} -> {Result, Handle2} = maybe_seek(NewOffset, Handle1), - {Result, [Handle2]}; - {Error, Handle1} -> {Error, [Handle1]} - end. - -internal_truncate(ok, [#handle { is_write = false }]) -> - {error, not_open_for_writing}; -internal_truncate(ok, [Handle]) -> - case write_buffer(Handle) of - {ok, Handle1 = #handle { hdl = Hdl, offset = Offset, - trusted_offset = TrustedOffset }} -> - case file:truncate(Hdl) of - ok -> - {ok, [Handle1 #handle { - at_eof = true, - trusted_offset = lists:min([Offset, TrustedOffset]) - }]}; - Error -> - {Error, [Handle1]} - end; - {Error, Handle1} -> - {Error, Handle1} - end. - -internal_last_sync_offset(ok, [#handle { trusted_offset = TrustedOffset }]) -> - {ok, TrustedOffset}. - -internal_current_virtual_offset(ok, [#handle { at_eof = true, is_write = true, - offset = Offset, - write_buffer_size = Size }]) -> - {ok, Offset + Size}; -internal_current_virtual_offset(ok, [#handle { offset = Offset }]) -> - {ok, Offset}. - -internal_current_raw_offset(ok, [#handle { offset = Offset }]) -> - {ok, Offset}. - -internal_append_write_buffer(ok, [Handle]) -> - {ok, [Handle]}. - -internal_copy(Count, [SHandle = #handle { is_read = true, hdl = SHdl, - offset = SOffset }, - DHandle = #handle { is_write = true, hdl = DHdl, - offset = DOffset }]) -> - case file:copy(SHdl, DHdl, Count) of - {ok, Count1} = Result1 -> - {Result1, - [SHandle #handle { offset = SOffset + Count1 }, - DHandle #handle { offset = DOffset + Count1 }]}; - Error -> - {Error, [SHandle, DHandle]} - end; -internal_copy(_Count, _Handles) -> - {error, incorrect_handle_modes}. %%---------------------------------------------------------------------------- %% Internal functions @@ -334,7 +328,7 @@ report_eldest() -> end), ok. -with_handles(Refs, Args, Fun) -> +with_handles(Refs, Fun) -> ResHandles = lists:foldl( fun (Ref, {ok, HandlesAcc}) -> case get_or_reopen(Ref) of @@ -346,7 +340,7 @@ with_handles(Refs, Args, Fun) -> end, {ok, []}, Refs), case ResHandles of {ok, Handles} -> - case erlang:apply(Fun, [Args, lists:reverse(Handles)]) of + case erlang:apply(Fun, [lists:reverse(Handles)]) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), Result; @@ -357,10 +351,10 @@ with_handles(Refs, Args, Fun) -> Error end. -with_flushed_handles(Refs, Args, Fun) -> +with_flushed_handles(Refs, Fun) -> with_handles( - Refs, Args, - fun (Args1, Handles) -> + Refs, + fun (Handles) -> case lists:foldl( fun (Handle, {ok, HandlesAcc}) -> {Res, Handle1} = write_buffer(Handle), @@ -369,7 +363,7 @@ with_flushed_handles(Refs, Args, Fun) -> {Error, [Handle | HandlesAcc]} end, {ok, []}, Handles) of {ok, Handles1} -> - erlang:apply(Fun, [Args1, lists:reverse(Handles1)]); + erlang:apply(Fun, [lists:reverse(Handles1)]); {Error, Handles1} -> {Error, lists:reverse(Handles1)} end |
