diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-06 13:18:14 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-06 13:18:14 +0100 |
| commit | 01c7de5a9adbab716437c0890eeb40f8a9bb2c16 (patch) | |
| tree | c00ccce34973605027d0b71621e4a7dadd1ec440 /src | |
| parent | 2fbcc4c836a824a5a56fbc7f1ddb943ab49b05bb (diff) | |
| download | rabbitmq-server-git-01c7de5a9adbab716437c0890eeb40f8a9bb2c16.tar.gz | |
Well fixed a few bugs in the fhc2, but it's fundamentally flawed by the fact that raw files can only be manipulated by the process that opens it. Thus the whole design is wrong because the server process can't be responsible for holding on to released but unclosed fhs.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_file_handle_cache2.erl | 92 |
1 files changed, 67 insertions, 25 deletions
diff --git a/src/rabbit_file_handle_cache2.erl b/src/rabbit_file_handle_cache2.erl index c15a3087a6..9f459ebfcc 100644 --- a/src/rabbit_file_handle_cache2.erl +++ b/src/rabbit_file_handle_cache2.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/0, new_client/1, get_file_handle/3, release_file_handle/2, - close_file_handle/3, with_file_handle_at/5]). + close_file_handle/3, close_all_file_handles/1, with_file_handle_at/5]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -75,7 +75,7 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [?MAX_FILE_HANDLES], []). new_client(Callback = {_M, _F, _A}) -> - ok = gen_server2:call(?SERVER, new_client, infinity), + gen_server2:cast(?SERVER, {new_client, self()}), #client_state { callback = Callback, handles = dict:new() }. @@ -104,9 +104,17 @@ close_file_handle(Path, Mode, CState = #client_state { handles = Handles }) -> error -> %% oh well, it must have already gone CState; {value, _} -> - gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode1}}) + gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode1}}), + CState #client_state { handles = dict:erase({Path, Mode}, Handles) } end. +close_all_file_handles(CState = #client_state { handles = Handles }) -> + lists:foreach( + fun({Path, Mode}) -> + gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode}}) + end, dict:fetch_keys(Handles)), + CState #client_state { handles = dict:new() }. + with_file_handle_at(Path, Mode, Offset, Fun, CState = #client_state { handles = Handles }) -> case obtain_file_handle(Path, Mode, CState) of @@ -140,7 +148,10 @@ obtain_file_handle(Path, Mode, #client_state { handles = Handles, case dict:find(Mode1, Handles) of error -> case gen_server2:call(?SERVER, - {get_handle, Path, Mode1, Callback}) of + {get_handle, Path, Mode1, Callback, self()}, + infinity) of + {open_fun, Fun} -> {Hdl, Offset} = Fun(), + {Mode1, Hdl, Offset}; {Hdl, Offset} -> {Mode1, Hdl, Offset}; exiting -> not_available end; @@ -160,16 +171,14 @@ init([MaxFileHandles]) -> handles = Handles, ages = Ages, max_handles = MaxFileHandles }, + hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call(new_client, From, State) -> - _MRef = erlang:monitor(process, From), - {reply, ok, State, hibernate}; -handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From, +handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}, Pid}, From, State = #server_state { handles = Handles, ages = Ages, request_queue = Reqs }) -> - Key = {From, Path, Mode}, + Key = {Pid, Path, Mode}, State1 = case ets:lookup(Handles, Key) of [Obj = #hdl { handle = Hdl, offset = Offset, @@ -186,11 +195,15 @@ handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From, [] -> process_request_queue( State #server_state { request_queue = - queue:in({Key, Callback}, Reqs) }) + queue:in({Key, From, Callback}, Reqs) }) end, {noreply, State1, hibernate}. -handle_cast({release_handle, Key = {_From, _Path, _Mode}, Offset}, +handle_cast({store_handle, Obj = #hdl {}}, + State = #server_state { handles = Handles }) -> + ets:insert_new(Handles, Obj), + {noreply, State, hibernate}; +handle_cast({release_handle, Key = {_Pid, _Path, _Mode}, Offset}, State = #server_state { handles = Handles, ages = Ages }) -> [Obj = #hdl { timer_ref = TRef, released_at = ReleasedAtOld }] = @@ -209,19 +222,26 @@ handle_cast({release_handle, Key = {_From, _Path, _Mode}, Offset}, end, State1 = process_request_queue(State), {noreply, State1, hibernate}; -handle_cast({close_handle, Key = {_From, _Path, _Mode}}, +handle_cast({close_handle, Key = {_Pid, _Path, _Mode}}, State = #server_state { handles = Handles, ages = Ages }) -> [Obj] = ets:lookup(Handles, Key), ok = close_handle(Obj, Handles, Ages), State1 = process_request_queue(State), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; +handle_cast({new_client, Pid}, State) -> + _MRef = erlang:monitor(process, Pid), + {noreply, State, hibernate}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #server_state { handles = Handles, ages = Ages, request_queue = Reqs }) -> - Reqs1 = queue:filter(fun ({{From, _Path, _Mode}, _Callback}) -> - From /= Pid + Reqs1 = queue:filter(fun ({{OPid, _Path, _Mode}, From, _Callback}) -> + if OPid == Pid -> + gen_server2:reply(From, exiting), + false; + true -> true + end end, Reqs), lists:foreach(fun (Obj) -> ok = close_handle(Obj, Handles, Ages) @@ -233,7 +253,7 @@ terminate(_Reason, State = #server_state { ages = Ages, request_queue = Reqs }) -> Size = ets:info(Ages, size), Size = free_upto(Size, State), - lists:foreach(fun ({{From, _Path, _Mode}, _Callback}) -> + lists:foreach(fun ({{_Pid, _Path, _Mode}, From, _Callback}) -> gen_server2:reply(From, exiting) end, queue:to_list(Reqs)), State #server_state { request_queue = queue:new() }. @@ -281,15 +301,37 @@ open_requested(N, State = #server_state { handles = Handles, request_queue = Reqs }) -> case queue:out(Reqs) of {empty, _Reqs} -> State; - {{value, {Key = {From, Path, Mode}, Callback}}, Reqs1} -> - {ok, Hdl} = file:open(Path, Mode), - gen_server2:reply(From, {Hdl, 0}), - {ok, TRef} = start_timer(Callback, Key), - true = ets:insert_new(Handles, #hdl { key = Key, - handle = Hdl, - offset = unknown, - timer_ref = TRef, - released_at = not_released }), + {{value, {Key = {_Pid, Path, Mode}, From, Callback}}, Reqs1} -> + Msg = + case lists:member(raw, Mode) of + true -> + Fun = + fun() -> + {ok, Hdl} = file:open(Path, Mode), + {ok, TRef} = start_timer(Callback, Key), + gen_server2:cast( + ?SERVER, {store_handle, + #hdl {key = Key, + handle = Hdl, + offset = unknown, + timer_ref = TRef, + released_at = not_released + }}), + {Hdl, 0} + end, + {open_fun, Fun}; + false -> + {ok, Hdl} = file:open(Path, Mode), + {ok, TRef} = start_timer(Callback, Key), + true = ets:insert_new(Handles, #hdl { key = Key, + handle = Hdl, + offset = unknown, + timer_ref = TRef, + released_at = + not_released }), + {Hdl, 0} + end, + gen_server2:reply(From, Msg), open_requested(N - 1, State #server_state { request_queue = Reqs1 }) end. |
