summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-22 17:04:51 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-22 17:04:51 +0100
commit91edec707b01d130854686ad2de589ec01175f2e (patch)
tree336d941b508806814699b26a510402bd13c0f38e /src
parent7d78971a0eef645b34905cd681e48012ce7cf07d (diff)
downloadrabbitmq-server-git-91edec707b01d130854686ad2de589ec01175f2e.tar.gz
starting to deal with the idea that fhs may disappear because of hitting limits
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl99
1 files changed, 61 insertions, 38 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 33a69ed78f..6959dcc27f 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -51,6 +51,7 @@
is_write,
is_read,
mode,
+ options,
global_key,
last_used_at
}).
@@ -80,7 +81,12 @@ open(Path, Mode, Options, State) ->
File #file {
reader_count = RCount1,
has_writer = HasWriter orelse IsWriter }),
- open1(Path1, Mode1, Options, GRef, State)
+ Ref = make_ref(),
+ case open1(Path1, Mode1, Options, Ref, GRef, State)
+ of
+ {{ok, _Handle}, State} -> {{ok, Ref}, State};
+ {Error, State} -> {Error, State}
+ end
end;
undefined ->
GRef = make_ref(),
@@ -99,8 +105,11 @@ close(Ref, State) ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl, global_key = GRef,
is_read = IsReader, is_write = IsWriter }} ->
- ok = file:sync(Hdl),
- ok = file:close(Hdl),
+ case Hdl of
+ closed -> ok;
+ _ -> ok = file:sync(Hdl),
+ ok = file:close(Hdl)
+ end,
#file { reader_count = RCount, has_writer = HasWriter,
path = Path } = File = get({GRef, fhc_file}),
RCount1 = case IsReader of
@@ -126,10 +135,10 @@ release(_Ref, State) -> %% noop just for now
{ok, State}.
read(Ref, NewOffset, Count, State) ->
- case get({Ref, fhc_handle}) of
- undefined -> {{error, not_open}, State};
- #handle { is_read = false } -> {{error, not_open_for_reading}, State};
- Handle ->
+ case get_or_reopen(Ref, State) of
+ {{ok, #handle { is_read = false }}, State1} ->
+ {{error, not_open_for_reading}, State1};
+ {{ok, Handle}, State1} ->
{Result, Handle1} =
case write_buffer(Handle) of
{ok, Handle2} ->
@@ -151,13 +160,15 @@ read(Ref, NewOffset, Count, State) ->
{Error, Handle2} -> {Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State}
+ {Result, State1};
+ ErrorAndState -> ErrorAndState
end.
append(Ref, Data, State) ->
- case get({Ref, fhc_handle}) of
- undefined -> {{error, not_open}, State};
- Handle = #handle { is_write = true } ->
+ case get_or_reopen(Ref, State) of
+ {{ok, #handle { is_write = false }}, State1} ->
+ {{error, not_open_for_writing}, State1};
+ {{ok, Handle}, State1} ->
{Result, Handle1} =
case maybe_seek(eof, Handle) of
{ok, Handle2 = #handle { at_eof = true }} ->
@@ -166,52 +177,54 @@ append(Ref, Data, State) ->
{Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State};
- _ -> {{error, not_open_for_writing}, State}
+ {Result, State1};
+ ErrorAndState -> ErrorAndState
end.
last_sync_offset(Ref, State) ->
- case get({Ref, fhc_handle}) of
- undefined -> {{error, not_open}, State};
- #handle { trusted_offset = TrustedOffset } ->
- {{ok, TrustedOffset}, State}
+ case get_or_reopen(Ref, State) of
+ {{ok, #handle { trusted_offset = TrustedOffset }}, State1} ->
+ {{ok, TrustedOffset}, State1};
+ ErrorAndState -> ErrorAndState
end.
position(Ref, NewOffset, State) ->
- case get({Ref, fhc_handle}) of
- undefined -> {{error, not_open}, State};
- Handle ->
+ case get_or_reopen(Ref, State) of
+ {{ok, Handle}, State1} ->
{Result, Handle1} =
case write_buffer(Handle) of
{ok, Handle2} -> maybe_seek(NewOffset, Handle2);
{Error, Handle2} -> {Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State}
+ {Result, State1};
+ ErrorAndState -> ErrorAndState
end.
sync(Ref, State) ->
- case get({Ref, fhc_handle}) of
- undefined -> {{error, not_open}, State};
- Handle = #handle { write_buffer = [], hdl = Hdl, offset = Offset } ->
+ case get_or_reopen(Ref, State) of
+ {{ok, Handle = #handle { write_buffer = [], hdl = Hdl,
+ offset = Offset }}, State1} ->
{Result, Handle1} =
case file:sync(Hdl) of
ok -> {ok, Handle #handle { trusted_offset = Offset }};
Error -> {Error, Handle}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State};
- Handle = #handle { at_eof = true } ->
+ {Result, State1};
+ {{ok, Handle = #handle { at_eof = true }}, State1} ->
%% we can't have content in the buffer without being at eof
{Result, Handle1} = write_buffer(Handle),
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State}
+ {Result, State1};
+ ErrorAndState -> ErrorAndState
end.
truncate(Ref, State) ->
- case get({Ref, fhc_handle}) of
- undefined -> {{error, not_open}, State};
- Handle = #handle { is_write = true } ->
+ case get_or_reopen(Ref, State) of
+ {{ok, #handle { is_write = false }}, State1} ->
+ {{error, not_open_for_writing}, State1};
+ {{ok, Handle}, State1} ->
{Result, Handle1} =
case write_buffer(Handle) of
{ok,
@@ -230,11 +243,21 @@ truncate(Ref, State) ->
{Error, Handle2} -> {Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now () }),
- {Result, State};
- _Handle -> {{error, not_open_for_writing}, State}
+ {Result, State1};
+ ErrorAndState -> ErrorAndState
+ end.
+
+get_or_reopen(Ref, State) ->
+ case get({Ref, fhc_handle}) of
+ undefined -> {{error, not_open}, State};
+ #handle { hdl = closed, mode = Mode, global_key = GRef,
+ options = Options } ->
+ #file { path = Path } = get({GRef, fhc_file}),
+ open1(Path, Mode, Options, Ref, GRef, State);
+ Handle -> {{ok, Handle}, State}
end.
-open1(Path, Mode, Options, GRef, State) ->
+open1(Path, Mode, Options, Ref, GRef, State) ->
case file:open(Path, Mode) of
{ok, Hdl} ->
WriteBufferSize =
@@ -243,15 +266,15 @@ open1(Path, Mode, Options, GRef, State) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
- Ref = make_ref(),
- put({Ref, fhc_handle},
+ Handle =
#handle { hdl = Hdl, offset = 0, trusted_offset = 0,
- write_buffer_size = 0,
+ write_buffer_size = 0, options = Options,
write_buffer_size_limit = WriteBufferSize,
write_buffer = [], at_eof = false, mode = Mode,
is_write = is_writer(Mode), is_read = is_reader(Mode),
- global_key = GRef, last_used_at = now() }),
- {{ok, Ref}, State};
+ global_key = GRef, last_used_at = now() },
+ put({Ref, fhc_handle}, Handle),
+ {{ok, Handle}, State};
{error, Reason} ->
{{error, Reason}, State}
end.