diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-22 17:04:51 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-22 17:04:51 +0100 |
| commit | 91edec707b01d130854686ad2de589ec01175f2e (patch) | |
| tree | 336d941b508806814699b26a510402bd13c0f38e /src | |
| parent | 7d78971a0eef645b34905cd681e48012ce7cf07d (diff) | |
| download | rabbitmq-server-git-91edec707b01d130854686ad2de589ec01175f2e.tar.gz | |
starting to deal with the idea that fhs may disappear because of hitting limits
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 99 |
1 files changed, 61 insertions, 38 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 33a69ed78f..6959dcc27f 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -51,6 +51,7 @@ is_write, is_read, mode, + options, global_key, last_used_at }). @@ -80,7 +81,12 @@ open(Path, Mode, Options, State) -> File #file { reader_count = RCount1, has_writer = HasWriter orelse IsWriter }), - open1(Path1, Mode1, Options, GRef, State) + Ref = make_ref(), + case open1(Path1, Mode1, Options, Ref, GRef, State) + of + {{ok, _Handle}, State} -> {{ok, Ref}, State}; + {Error, State} -> {Error, State} + end end; undefined -> GRef = make_ref(), @@ -99,8 +105,11 @@ close(Ref, State) -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, global_key = GRef, is_read = IsReader, is_write = IsWriter }} -> - ok = file:sync(Hdl), - ok = file:close(Hdl), + case Hdl of + closed -> ok; + _ -> ok = file:sync(Hdl), + ok = file:close(Hdl) + end, #file { reader_count = RCount, has_writer = HasWriter, path = Path } = File = get({GRef, fhc_file}), RCount1 = case IsReader of @@ -126,10 +135,10 @@ release(_Ref, State) -> %% noop just for now {ok, State}. read(Ref, NewOffset, Count, State) -> - case get({Ref, fhc_handle}) of - undefined -> {{error, not_open}, State}; - #handle { is_read = false } -> {{error, not_open_for_reading}, State}; - Handle -> + case get_or_reopen(Ref, State) of + {{ok, #handle { is_read = false }}, State1} -> + {{error, not_open_for_reading}, State1}; + {{ok, Handle}, State1} -> {Result, Handle1} = case write_buffer(Handle) of {ok, Handle2} -> @@ -151,13 +160,15 @@ read(Ref, NewOffset, Count, State) -> {Error, Handle2} -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State} + {Result, State1}; + ErrorAndState -> ErrorAndState end. append(Ref, Data, State) -> - case get({Ref, fhc_handle}) of - undefined -> {{error, not_open}, State}; - Handle = #handle { is_write = true } -> + case get_or_reopen(Ref, State) of + {{ok, #handle { is_write = false }}, State1} -> + {{error, not_open_for_writing}, State1}; + {{ok, Handle}, State1} -> {Result, Handle1} = case maybe_seek(eof, Handle) of {ok, Handle2 = #handle { at_eof = true }} -> @@ -166,52 +177,54 @@ append(Ref, Data, State) -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State}; - _ -> {{error, not_open_for_writing}, State} + {Result, State1}; + ErrorAndState -> ErrorAndState end. last_sync_offset(Ref, State) -> - case get({Ref, fhc_handle}) of - undefined -> {{error, not_open}, State}; - #handle { trusted_offset = TrustedOffset } -> - {{ok, TrustedOffset}, State} + case get_or_reopen(Ref, State) of + {{ok, #handle { trusted_offset = TrustedOffset }}, State1} -> + {{ok, TrustedOffset}, State1}; + ErrorAndState -> ErrorAndState end. position(Ref, NewOffset, State) -> - case get({Ref, fhc_handle}) of - undefined -> {{error, not_open}, State}; - Handle -> + case get_or_reopen(Ref, State) of + {{ok, Handle}, State1} -> {Result, Handle1} = case write_buffer(Handle) of {ok, Handle2} -> maybe_seek(NewOffset, Handle2); {Error, Handle2} -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State} + {Result, State1}; + ErrorAndState -> ErrorAndState end. sync(Ref, State) -> - case get({Ref, fhc_handle}) of - undefined -> {{error, not_open}, State}; - Handle = #handle { write_buffer = [], hdl = Hdl, offset = Offset } -> + case get_or_reopen(Ref, State) of + {{ok, Handle = #handle { write_buffer = [], hdl = Hdl, + offset = Offset }}, State1} -> {Result, Handle1} = case file:sync(Hdl) of ok -> {ok, Handle #handle { trusted_offset = Offset }}; Error -> {Error, Handle} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State}; - Handle = #handle { at_eof = true } -> + {Result, State1}; + {{ok, Handle = #handle { at_eof = true }}, State1} -> %% we can't have content in the buffer without being at eof {Result, Handle1} = write_buffer(Handle), put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State} + {Result, State1}; + ErrorAndState -> ErrorAndState end. truncate(Ref, State) -> - case get({Ref, fhc_handle}) of - undefined -> {{error, not_open}, State}; - Handle = #handle { is_write = true } -> + case get_or_reopen(Ref, State) of + {{ok, #handle { is_write = false }}, State1} -> + {{error, not_open_for_writing}, State1}; + {{ok, Handle}, State1} -> {Result, Handle1} = case write_buffer(Handle) of {ok, @@ -230,11 +243,21 @@ truncate(Ref, State) -> {Error, Handle2} -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now () }), - {Result, State}; - _Handle -> {{error, not_open_for_writing}, State} + {Result, State1}; + ErrorAndState -> ErrorAndState + end. + +get_or_reopen(Ref, State) -> + case get({Ref, fhc_handle}) of + undefined -> {{error, not_open}, State}; + #handle { hdl = closed, mode = Mode, global_key = GRef, + options = Options } -> + #file { path = Path } = get({GRef, fhc_file}), + open1(Path, Mode, Options, Ref, GRef, State); + Handle -> {{ok, Handle}, State} end. -open1(Path, Mode, Options, GRef, State) -> +open1(Path, Mode, Options, Ref, GRef, State) -> case file:open(Path, Mode) of {ok, Hdl} -> WriteBufferSize = @@ -243,15 +266,15 @@ open1(Path, Mode, Options, GRef, State) -> infinity -> infinity; N when is_integer(N) -> N end, - Ref = make_ref(), - put({Ref, fhc_handle}, + Handle = #handle { hdl = Hdl, offset = 0, trusted_offset = 0, - write_buffer_size = 0, + write_buffer_size = 0, options = Options, write_buffer_size_limit = WriteBufferSize, write_buffer = [], at_eof = false, mode = Mode, is_write = is_writer(Mode), is_read = is_reader(Mode), - global_key = GRef, last_used_at = now() }), - {{ok, Ref}, State}; + global_key = GRef, last_used_at = now() }, + put({Ref, fhc_handle}, Handle), + {{ok, Handle}, State}; {error, Reason} -> {{error, Reason}, State} end. |
