summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-25 13:07:29 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-25 13:07:29 +0000
commit373e95a80cf90c3bb7993ea3e8c53e78171cfcc5 (patch)
treedf6e5e08ee06fbad4133b91981c26eaaefecb32a /src
parent47c5772a5a4e8dc15ac4f392c3c7877c8470025f (diff)
downloadrabbitmq-server-git-373e95a80cf90c3bb7993ea3e8c53e78171cfcc5.tar.gz
Corrected spec for open and inlined the internal versions. Also used the with_flushed_handles for truncate and position. It breaks sync if you do *both* the flushed form and then the obvious refactoring within it (i.e. pull the Handle1 up) and I can't quite see why
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl248
1 files changed, 121 insertions, 127 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index cf27817942..bbc09b8af8 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -90,7 +90,10 @@
-type(position() :: ('bof' | 'eof' | {'bof',integer()} | {'eof',integer()}
| {'cur',integer()} | integer())).
--spec(open/3 :: (string(), [any()], [any()]) -> ({'ok', ref()} | error())).
+-spec(open/3 ::
+ (string(), [any()],
+ [{'write_buffer', (non_neg_integer()|'infinity'|'unbuffered')}]) ->
+ ({'ok', ref()} | error())).
-spec(close/1 :: (ref()) -> ('ok' | error())).
-spec(read/2 :: (ref(), integer()) ->
({'ok', ([char()]|binary())} | eof | error())).
@@ -139,34 +142,135 @@ close(Ref) ->
end.
read(Ref, Count) ->
- with_flushed_handles([Ref], Count, fun internal_read/2).
+ with_flushed_handles(
+ [Ref], fun ([#handle { is_read = false }]) ->
+ {error, not_open_for_reading};
+ ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
+ case file:read(Hdl, Count) of
+ {ok, Data} = Obj ->
+ Size = iolist_size(Data),
+ {Obj, [Handle #handle { offset = Offset + Size }]};
+ eof ->
+ {eof, [Handle #handle { at_eof = true }]};
+ Error ->
+ {Error, [Handle]}
+ end
+ end).
append(Ref, Data) ->
- with_handles([Ref], Data, fun internal_append/2).
+ with_handles(
+ [Ref],
+ fun ([#handle { is_write = false }]) ->
+ {error, not_open_for_writing};
+ ([Handle]) ->
+ case maybe_seek(eof, Handle) of
+ {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset,
+ write_buffer_size_limit = 0,
+ at_eof = true } = Handle1} ->
+ Offset1 = Offset + iolist_size(Data),
+ {file:write(Hdl, Data),
+ [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
+ {{ok, _Offset}, #handle { write_buffer = WriteBuffer,
+ write_buffer_size = Size,
+ write_buffer_size_limit = Limit,
+ at_eof = true } = Handle1} ->
+ Size1 = Size + iolist_size(Data),
+ Handle2 = Handle1 #handle { write_buffer = [ Data | WriteBuffer ],
+ write_buffer_size = Size1 },
+ case Limit /= infinity andalso Size1 > Limit of
+ true -> {Result, Handle3} = write_buffer(Handle2),
+ {Result, [Handle3]};
+ false -> {ok, [Handle2]}
+ end;
+ {{error, _} = Error, Handle1} ->
+ {Error, [Handle1]}
+ end
+ end).
sync(Ref) ->
- with_handles([Ref], ok, fun internal_sync/2).
+ with_handles(
+ [Ref],
+ fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
+ ok;
+ ([Handle]) ->
+ %% write_buffer will set is_dirty, or leave it set if buffer empty
+ case write_buffer(Handle) of
+ {ok, Handle1 = #handle { hdl = Hdl, offset = Offset,
+ is_dirty = true }} ->
+ case file:sync(Hdl) of
+ ok ->
+ {ok, [Handle1 #handle { trusted_offset = Offset,
+ is_dirty = false }]};
+ Error ->
+ {Error, [Handle1]}
+ end;
+ {Error, Handle1} ->
+ {Error, [Handle1]}
+ end
+ end).
position(Ref, NewOffset) ->
- with_handles([Ref], NewOffset, fun internal_position/2).
+ with_flushed_handles(
+ [Ref], fun ([Handle]) ->
+ {Result, Handle1} = maybe_seek(NewOffset, Handle),
+ {Result, [Handle1]}
+ end).
truncate(Ref) ->
- with_handles([Ref], ok, fun internal_truncate/2).
+ with_flushed_handles(
+ [Ref],
+ fun ([Handle1 = #handle { hdl = Hdl, offset = Offset,
+ trusted_offset = TrustedOffset }]) ->
+ case file:truncate(Hdl) of
+ ok ->
+ {ok, [Handle1 #handle {
+ at_eof = true,
+ trusted_offset = lists:min([Offset,
+ TrustedOffset])
+ }]};
+ Error ->
+ {Error, [Handle1]}
+ end
+ end).
last_sync_offset(Ref) ->
- with_handles([Ref], ok, fun internal_last_sync_offset/2).
+ with_handles([Ref], fun ([#handle { trusted_offset = TrustedOffset }]) ->
+ {ok, TrustedOffset}
+ end).
current_virtual_offset(Ref) ->
- with_handles([Ref], ok, fun internal_current_virtual_offset/2).
+ with_handles([Ref],
+ fun ([#handle { at_eof = true, is_write = true,
+ offset = Offset,
+ write_buffer_size = Size }]) ->
+ {ok, Offset + Size};
+ ([#handle { offset = Offset }]) ->
+ {ok, Offset}
+ end).
current_raw_offset(Ref) ->
- with_handles([Ref], ok, fun internal_current_raw_offset/2).
+ with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end).
append_write_buffer(Ref) ->
- with_flushed_handles([Ref], ok, fun internal_append_write_buffer/2).
+ with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).
copy(Src, Dest, Count) ->
- with_flushed_handles([Src, Dest], Count, fun internal_copy/2).
+ with_flushed_handles(
+ [Src, Dest],
+ fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
+ DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
+ ) ->
+ case file:copy(SHdl, DHdl, Count) of
+ {ok, Count1} = Result1 ->
+ {Result1,
+ [SHandle #handle { offset = SOffset + Count1 },
+ DHandle #handle { offset = DOffset + Count1 }]};
+ Error ->
+ {Error, [SHandle, DHandle]}
+ end;
+ (_Handles) ->
+ {error, incorrect_handle_modes}
+ end).
set_maximum_since_use(MaximumAge) ->
Now = now(),
@@ -201,116 +305,6 @@ increment() ->
%% Internal versions for the above
%%----------------------------------------------------------------------------
-internal_read(_Count, [#handle { is_read = false }]) ->
- {error, not_open_for_reading};
-internal_read(Count, [Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case file:read(Hdl, Count) of
- {ok, Data} = Obj ->
- Size = iolist_size(Data),
- {Obj, [Handle #handle { offset = Offset + Size }]};
- eof ->
- {eof, [Handle #handle { at_eof = true }]};
- Error ->
- {Error, [Handle]}
- end.
-
-internal_append(_Data, [#handle { is_write = false }]) ->
- {error, not_open_for_writing};
-internal_append(Data, [Handle]) ->
- case maybe_seek(eof, Handle) of
- {{ok, _Offset}, Handle1 = #handle { hdl = Hdl, offset = Offset,
- write_buffer_size_limit = 0,
- at_eof = true }} ->
- Offset1 = Offset + iolist_size(Data),
- {file:write(Hdl, Data),
- [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
- {{ok, _Offset}, Handle1 = #handle { write_buffer = WriteBuffer,
- write_buffer_size = Size,
- write_buffer_size_limit = Limit,
- at_eof = true }} ->
- Size1 = Size + iolist_size(Data),
- Handle2 = Handle1 #handle { write_buffer = [ Data | WriteBuffer ],
- write_buffer_size = Size1 },
- case Limit /= infinity andalso Size1 > Limit of
- true -> {Result, Handle3} = write_buffer(Handle2),
- {Result, [Handle3]};
- false -> {ok, [Handle2]}
- end;
- {{error, _} = Error, Handle1} ->
- {Error, [Handle1]}
- end.
-
-internal_sync(ok, [#handle { is_dirty = false, write_buffer = [] }]) ->
- ok;
-internal_sync(ok, [Handle]) ->
- %% write_buffer will set is_dirty, or leave it set if buffer empty
- case write_buffer(Handle) of
- {ok, Handle1 = #handle {hdl = Hdl, offset = Offset, is_dirty = true}} ->
- case file:sync(Hdl) of
- ok -> {ok, [Handle1 #handle { trusted_offset = Offset,
- is_dirty = false }]};
- Error -> {Error, [Handle1]}
- end;
- {Error, Handle1} ->
- {Error, [Handle1]}
- end.
-
-internal_position(NewOffset, [Handle]) ->
- case write_buffer(Handle) of
- {ok, Handle1} -> {Result, Handle2} = maybe_seek(NewOffset, Handle1),
- {Result, [Handle2]};
- {Error, Handle1} -> {Error, [Handle1]}
- end.
-
-internal_truncate(ok, [#handle { is_write = false }]) ->
- {error, not_open_for_writing};
-internal_truncate(ok, [Handle]) ->
- case write_buffer(Handle) of
- {ok, Handle1 = #handle { hdl = Hdl, offset = Offset,
- trusted_offset = TrustedOffset }} ->
- case file:truncate(Hdl) of
- ok ->
- {ok, [Handle1 #handle {
- at_eof = true,
- trusted_offset = lists:min([Offset, TrustedOffset])
- }]};
- Error ->
- {Error, [Handle1]}
- end;
- {Error, Handle1} ->
- {Error, Handle1}
- end.
-
-internal_last_sync_offset(ok, [#handle { trusted_offset = TrustedOffset }]) ->
- {ok, TrustedOffset}.
-
-internal_current_virtual_offset(ok, [#handle { at_eof = true, is_write = true,
- offset = Offset,
- write_buffer_size = Size }]) ->
- {ok, Offset + Size};
-internal_current_virtual_offset(ok, [#handle { offset = Offset }]) ->
- {ok, Offset}.
-
-internal_current_raw_offset(ok, [#handle { offset = Offset }]) ->
- {ok, Offset}.
-
-internal_append_write_buffer(ok, [Handle]) ->
- {ok, [Handle]}.
-
-internal_copy(Count, [SHandle = #handle { is_read = true, hdl = SHdl,
- offset = SOffset },
- DHandle = #handle { is_write = true, hdl = DHdl,
- offset = DOffset }]) ->
- case file:copy(SHdl, DHdl, Count) of
- {ok, Count1} = Result1 ->
- {Result1,
- [SHandle #handle { offset = SOffset + Count1 },
- DHandle #handle { offset = DOffset + Count1 }]};
- Error ->
- {Error, [SHandle, DHandle]}
- end;
-internal_copy(_Count, _Handles) ->
- {error, incorrect_handle_modes}.
%%----------------------------------------------------------------------------
%% Internal functions
@@ -334,7 +328,7 @@ report_eldest() ->
end),
ok.
-with_handles(Refs, Args, Fun) ->
+with_handles(Refs, Fun) ->
ResHandles = lists:foldl(
fun (Ref, {ok, HandlesAcc}) ->
case get_or_reopen(Ref) of
@@ -346,7 +340,7 @@ with_handles(Refs, Args, Fun) ->
end, {ok, []}, Refs),
case ResHandles of
{ok, Handles} ->
- case erlang:apply(Fun, [Args, lists:reverse(Handles)]) of
+ case erlang:apply(Fun, [lists:reverse(Handles)]) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
Result;
@@ -357,10 +351,10 @@ with_handles(Refs, Args, Fun) ->
Error
end.
-with_flushed_handles(Refs, Args, Fun) ->
+with_flushed_handles(Refs, Fun) ->
with_handles(
- Refs, Args,
- fun (Args1, Handles) ->
+ Refs,
+ fun (Handles) ->
case lists:foldl(
fun (Handle, {ok, HandlesAcc}) ->
{Res, Handle1} = write_buffer(Handle),
@@ -369,7 +363,7 @@ with_flushed_handles(Refs, Args, Fun) ->
{Error, [Handle | HandlesAcc]}
end, {ok, []}, Handles) of
{ok, Handles1} ->
- erlang:apply(Fun, [Args1, lists:reverse(Handles1)]);
+ erlang:apply(Fun, [lists:reverse(Handles1)]);
{Error, Handles1} ->
{Error, lists:reverse(Handles1)}
end