summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-22 15:23:09 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-22 15:23:09 +0100
commit966a760267e30aa0d0c3a5b2a841fa9350238689 (patch)
treedcf6612dee2fbc6b44a53718fe5ca9b802b356f1 /src
parent283cd95333fb49eef6a9c60c38f561971b1b85b5 (diff)
downloadrabbitmq-server-git-966a760267e30aa0d0c3a5b2a841fa9350238689.tar.gz
not quite complete, but much much simpler, and prettier
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl226
1 files changed, 106 insertions, 120 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 933f8a833c..e86344a087 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -31,7 +31,7 @@
-module(file_handle_cache).
--export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2,
+-export([init/0, open/4, close/2, release/2, read/4, append/3, sync/2,
position/3, truncate/2, last_sync_offset/2]).
-record(file,
@@ -48,7 +48,6 @@
write_buffer_size_limit,
write_buffer,
at_eof,
- is_append,
is_write,
is_read,
mode,
@@ -56,33 +55,41 @@
last_used_at
}).
+init() -> empty_state.
+
open(Path, Mode, Options, State) ->
- Path1 = filename:absname(Path),
- case get({Path1, fhc_path}) of
- {gref, GRef} ->
- #file { reader_count = RCount, has_writer = HasWriter }
- = File = get({GRef, fhc_file}),
- Mode1 = lists:usort(Mode),
- IsWriter = is_writer(Mode1) orelse is_appender(Mode),
- case IsWriter andalso HasWriter of
- true ->
- {{error, writer_exists}, State};
- false ->
- RCount1 = case is_reader(Mode1) of
- true -> RCount + 1;
- false -> RCount
- end,
- put({Path1, fhc_file},
- File #file { reader_count = RCount1,
- has_writer = HasWriter orelse IsWriter }),
- open1(Path1, Mode1, Options, GRef, State)
- end;
- undefined ->
- GRef = make_ref(),
- put({Path1, fhc_path}, {gref, GRef}),
- put({GRef, fhc_file},
- #file { reader_count = 0, has_writer = false, path = Path1 }),
- open(Path, Mode, Options, State)
+ case is_appender(Mode) of
+ true -> {{error, append_not_supported}, State};
+ false ->
+ Path1 = filename:absname(Path),
+ case get({Path1, fhc_path}) of
+ {gref, GRef} ->
+ #file { reader_count = RCount, has_writer = HasWriter }
+ = File = get({GRef, fhc_file}),
+ Mode1 = lists:usort(Mode),
+ IsWriter = is_writer(Mode1),
+ case IsWriter andalso HasWriter of
+ true ->
+ {{error, writer_exists}, State};
+ false ->
+ RCount1 = case is_reader(Mode1) of
+ true -> RCount + 1;
+ false -> RCount
+ end,
+ put({Path1, fhc_file},
+ File #file {
+ reader_count = RCount1,
+ has_writer = HasWriter orelse IsWriter }),
+ open1(Path1, Mode1, Options, GRef, State)
+ end;
+ undefined ->
+ GRef = make_ref(),
+ put({Path1, fhc_path}, {gref, GRef}),
+ put({GRef, fhc_file},
+ #file { reader_count = 0, has_writer = false,
+ path = Path1 }),
+ open(Path, Mode, Options, State)
+ end
end.
close(Ref, State) ->
@@ -90,11 +97,10 @@ close(Ref, State) ->
undefined -> {ok, State};
Handle ->
case write_buffer(Handle) of
- {ok, #handle { hdl = Hdl, mode = Mode, global_key = GRef }} ->
+ {ok, #handle { hdl = Hdl, global_key = GRef,
+ is_read = IsReader, is_write = IsWriter }} ->
ok = file:sync(Hdl),
ok = file:close(Hdl),
- IsReader = is_reader(Mode),
- IsWriter = is_writer(Mode),
#file { reader_count = RCount, has_writer = HasWriter,
path = Path } = File = get({GRef, fhc_file}),
RCount1 = case IsReader of
@@ -124,81 +130,61 @@ read(Ref, NewOffset, Count, State) ->
undefined -> {{error, not_open}, State};
#handle { is_read = false } -> {{error, not_open_for_reading}, State};
Handle ->
- case write_buffer(Handle) of
- {ok, Handle1 = #handle { hdl = Hdl, at_eof = AtEoF,
- offset = Offset }} ->
- {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
- {ok, Offset1} = case NeedsSeek of
- true -> file:position(Hdl, NewOffset);
- false -> {ok, Offset}
- end,
- Handle2 = Handle1 #handle { at_eof = AtEoF1,
- offset = Offset1,
- last_used_at = now() },
- {Handle3, Result} =
- case file:read(Hdl, Count) of
- {ok, Data} -> {Handle2, {ok, Data}};
- eof -> {Handle2 #handle { at_eof = true }, eof};
- {error, Reason} -> {Handle2, {error, Reason}}
- end,
- put({Ref, fhc_handle}, Handle3),
- {Result, State};
- {Error, Handle1} ->
- put({Ref, fhc_handle}, Handle1),
- {Error, State}
- end
+ {Result, Handle1} =
+ case write_buffer(Handle) of
+ {ok, Handle2} ->
+ case maybe_seek(NewOffset, Handle2) of
+ {ok, Handle3 = #handle { hdl = Hdl }} ->
+ case file:read(Hdl, Count) of
+ {ok, _} = Obj -> {Obj, Handle3};
+ eof -> {eof,
+ Handle3 #handle { at_eof = true }};
+ {error, _} = Error -> {Error, Handle3}
+ end;
+ {Error, Handle3} -> {Error, Handle3}
+ end;
+ {Error, Handle2} -> {Error, Handle2}
+ end,
+ put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
+ {Result, State}
end.
-write(Ref, NewOffset, Data, State) ->
+append(Ref, Data, State) ->
case get({Ref, fhc_handle}) of
undefined -> {{error, not_open}, State};
- Handle = #handle { is_append = true, is_write = false } ->
- {Result, Handle1} = write_to_buffer(Data, Handle),
- put({Ref, fhc_handle}, Handle1),
- {Result, State};
- Handle = #handle { is_append = false, is_write = true, at_eof = AtEoF,
- offset = Offset, write_buffer_size = BufSize } ->
- %% If we wrote the buffer out now, where would we end up?
- %% Note that if AtEoF == true then it would still be true
- %% after writing the buffer out, but if AtEoF == false,
- %% it's possible it should be true after writing the
- %% buffer out, but we won't know about it.
- VirtualOffset = Offset + BufSize,
- %% AtEoF1 says "after writing the buffer out, we will be
- %% at VirtualOffset. At that point, we travel to
- %% NewOffset. When we get there, will we be at eof?"
- {AtEoF1, NeedsSeek} = needs_seek(AtEoF, VirtualOffset, NewOffset),
- {Error, Handle1} =
- case NeedsSeek of
- %% Now if we don't seek, we don't write the buffer
- %% out. This means we'll still be at Offset, and
- %% AtEoF still applies. We need to add the data to
- %% the buffer and leave it at that.
- false -> {ok, Handle};
- %% If we do seek, then we write the buffer out,
- %% which means that AtEoF1 applies, because after
- %% writing the buffer out, we'll be at
- %% VirtualOffset, and then want to get to
- %% NewOffset.
- true ->
- case write_buffer(Handle) of
- {ok, Handle2 = #handle { hdl = Hdl }} ->
- {ok, Offset2} = file:position(Hdl, NewOffset),
- {ok, Handle2 #handle { offset = Offset2,
- at_eof = AtEoF1 }};
- {Error1, Handle2} -> {Error1, Offset, Handle2}
- end
+ Handle = #handle { is_write = true } ->
+ {Result, Handle1} =
+ case maybe_seek(eof, Handle) of
+ {ok, Handle2 = #handle { at_eof = true }} ->
+ write_to_buffer(Data, Handle2);
+ {{error, _} = Error, Handle2} ->
+ {Error, Handle2}
end,
- case Error of
- ok -> {Result, Handle3} = write_to_buffer(Data, Handle1),
- put({Ref, fhc_handle}, Handle3),
- {Result, State};
- _ -> put({Ref, fhc_handle}, Handle1),
- {Error, State}
- end;
+ put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
+ {Result, State};
_ -> {{error, not_open_for_writing}, State}
end.
+last_sync_offset(Ref, State) ->
+ case get({Ref, fhc_handle}) of
+ undefined -> {{error, not_open}, State};
+ #handle { trusted_offset = TrustedOffset } ->
+ {{ok, TrustedOffset}, State}
+ end.
+
+position(Ref, NewOffset, State) ->
+ case get({Ref, fhc_handle}) of
+ undefined -> {{error, not_open}, State};
+ Handle ->
+ {Result, Handle1} =
+ case write_buffer(Handle) of
+ {ok, Handle2} -> maybe_seek(NewOffset, Handle2);
+ {Error, Handle2} -> {Error, Handle2}
+ end,
+ put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
+ {Result, State}
+ end.
+
open1(Path, Mode, Options, GRef, State) ->
case file:open(Path, Mode) of
{ok, Hdl} ->
@@ -213,8 +199,7 @@ open1(Path, Mode, Options, GRef, State) ->
#handle { hdl = Hdl, offset = 0, trusted_offset = 0,
write_buffer_size = 0,
write_buffer_size_limit = WriteBufferSize,
- write_buffer = [], at_eof = false,
- is_append = is_appender(Mode), mode = Mode,
+ write_buffer = [], at_eof = false, mode = Mode,
is_write = is_writer(Mode), is_read = is_reader(Mode),
global_key = GRef, last_used_at = now() }),
{{ok, Ref}, State};
@@ -222,25 +207,30 @@ open1(Path, Mode, Options, GRef, State) ->
{{error, Reason}, State}
end.
+maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, at_eof = AtEoF,
+ offset = Offset }) ->
+ {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
+ Result = case NeedsSeek of
+ true -> file:position(Hdl, NewOffset);
+ false -> {ok, Offset}
+ end,
+ case Result of
+ {ok, Offset1} ->
+ {ok, Handle #handle { at_eof = AtEoF1, offset = Offset1 }};
+ {error, _} = Error -> {Error, Handle}
+ end.
+
write_to_buffer(Data, Handle = #handle { hdl = Hdl,
write_buffer_size_limit = 0 }) ->
- {file:write(Hdl, Data), Handle #handle { last_used_at = now() }};
-write_to_buffer(Data, Handle =
- #handle { write_buffer = WriteBuffer,
- write_buffer_size = Size,
- write_buffer_size_limit = infinity }) ->
- {ok, Handle #handle { write_buffer_size = Size + size_of_write_data(Data),
- write_buffer = [ Data | WriteBuffer ],
- last_used_at = now() }};
+ {file:write(Hdl, Data), Handle};
write_to_buffer(Data, Handle =
#handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
write_buffer_size_limit = Limit }) ->
Size1 = Size + size_of_write_data(Data),
Handle1 = Handle #handle { write_buffer = [ Data | WriteBuffer ],
- write_buffer_size = Size1,
- last_used_at = now() },
- case Size1 > Limit of
+ write_buffer_size = Size1 },
+ case Limit /= infinity andalso Size1 > Limit of
true -> write_buffer(Handle1);
false -> {ok, Handle1}
end.
@@ -250,18 +240,14 @@ write_buffer(Handle = #handle { write_buffer = [] }) ->
write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
- is_append = IsAppend, at_eof = AtEoF }) ->
+ at_eof = true }) ->
case file:write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
- Offset1 = case IsAppend of
- true -> Offset;
- false -> Offset + DataSize
- end,
- AtEoF1 = AtEoF andalso not IsAppend,
+ Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, write_buffer = [],
- write_buffer_size = 0, at_eof = AtEoF1 }};
- {error, Reason} ->
- {{error, Reason}, Handle}
+ write_buffer_size = 0 }};
+ {error, _} = Error ->
+ {Error, Handle}
end.
size_of_write_data(Data) ->