summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-06 13:18:14 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-06 13:18:14 +0100
commit01c7de5a9adbab716437c0890eeb40f8a9bb2c16 (patch)
treec00ccce34973605027d0b71621e4a7dadd1ec440
parent2fbcc4c836a824a5a56fbc7f1ddb943ab49b05bb (diff)
downloadrabbitmq-server-git-01c7de5a9adbab716437c0890eeb40f8a9bb2c16.tar.gz
Well fixed a few bugs in the fhc2, but it's fundamentally flawed by the fact that raw files can only be manipulated by the process that opens it. Thus the whole design is wrong because the server process can't be responsible for holding on to released but unclosed fhs.
-rw-r--r--src/rabbit_file_handle_cache2.erl92
1 files changed, 67 insertions, 25 deletions
diff --git a/src/rabbit_file_handle_cache2.erl b/src/rabbit_file_handle_cache2.erl
index c15a3087a6..9f459ebfcc 100644
--- a/src/rabbit_file_handle_cache2.erl
+++ b/src/rabbit_file_handle_cache2.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/0, new_client/1, get_file_handle/3, release_file_handle/2,
- close_file_handle/3, with_file_handle_at/5]).
+ close_file_handle/3, close_all_file_handles/1, with_file_handle_at/5]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -75,7 +75,7 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [?MAX_FILE_HANDLES], []).
new_client(Callback = {_M, _F, _A}) ->
- ok = gen_server2:call(?SERVER, new_client, infinity),
+ gen_server2:cast(?SERVER, {new_client, self()}),
#client_state { callback = Callback,
handles = dict:new() }.
@@ -104,9 +104,17 @@ close_file_handle(Path, Mode, CState = #client_state { handles = Handles }) ->
error -> %% oh well, it must have already gone
CState;
{value, _} ->
- gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode1}})
+ gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode1}}),
+ CState #client_state { handles = dict:erase({Path, Mode}, Handles) }
end.
+close_all_file_handles(CState = #client_state { handles = Handles }) ->
+ lists:foreach(
+ fun({Path, Mode}) ->
+ gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode}})
+ end, dict:fetch_keys(Handles)),
+ CState #client_state { handles = dict:new() }.
+
with_file_handle_at(Path, Mode, Offset, Fun, CState =
#client_state { handles = Handles }) ->
case obtain_file_handle(Path, Mode, CState) of
@@ -140,7 +148,10 @@ obtain_file_handle(Path, Mode, #client_state { handles = Handles,
case dict:find(Mode1, Handles) of
error ->
case gen_server2:call(?SERVER,
- {get_handle, Path, Mode1, Callback}) of
+ {get_handle, Path, Mode1, Callback, self()},
+ infinity) of
+ {open_fun, Fun} -> {Hdl, Offset} = Fun(),
+ {Mode1, Hdl, Offset};
{Hdl, Offset} -> {Mode1, Hdl, Offset};
exiting -> not_available
end;
@@ -160,16 +171,14 @@ init([MaxFileHandles]) ->
handles = Handles,
ages = Ages,
max_handles = MaxFileHandles },
+ hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call(new_client, From, State) ->
- _MRef = erlang:monitor(process, From),
- {reply, ok, State, hibernate};
-handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From,
+handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}, Pid}, From,
State = #server_state { handles = Handles,
ages = Ages,
request_queue = Reqs }) ->
- Key = {From, Path, Mode},
+ Key = {Pid, Path, Mode},
State1 =
case ets:lookup(Handles, Key) of
[Obj = #hdl { handle = Hdl, offset = Offset,
@@ -186,11 +195,15 @@ handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From,
[] ->
process_request_queue(
State #server_state { request_queue =
- queue:in({Key, Callback}, Reqs) })
+ queue:in({Key, From, Callback}, Reqs) })
end,
{noreply, State1, hibernate}.
-handle_cast({release_handle, Key = {_From, _Path, _Mode}, Offset},
+handle_cast({store_handle, Obj = #hdl {}},
+ State = #server_state { handles = Handles }) ->
+ ets:insert_new(Handles, Obj),
+ {noreply, State, hibernate};
+handle_cast({release_handle, Key = {_Pid, _Path, _Mode}, Offset},
State = #server_state { handles = Handles,
ages = Ages }) ->
[Obj = #hdl { timer_ref = TRef, released_at = ReleasedAtOld }] =
@@ -209,19 +222,26 @@ handle_cast({release_handle, Key = {_From, _Path, _Mode}, Offset},
end,
State1 = process_request_queue(State),
{noreply, State1, hibernate};
-handle_cast({close_handle, Key = {_From, _Path, _Mode}},
+handle_cast({close_handle, Key = {_Pid, _Path, _Mode}},
State = #server_state { handles = Handles,
ages = Ages }) ->
[Obj] = ets:lookup(Handles, Key),
ok = close_handle(Obj, Handles, Ages),
State1 = process_request_queue(State),
- {noreply, State1, hibernate}.
+ {noreply, State1, hibernate};
+handle_cast({new_client, Pid}, State) ->
+ _MRef = erlang:monitor(process, Pid),
+ {noreply, State, hibernate}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #server_state { handles = Handles, ages = Ages,
request_queue = Reqs }) ->
- Reqs1 = queue:filter(fun ({{From, _Path, _Mode}, _Callback}) ->
- From /= Pid
+ Reqs1 = queue:filter(fun ({{OPid, _Path, _Mode}, From, _Callback}) ->
+ if OPid == Pid ->
+ gen_server2:reply(From, exiting),
+ false;
+ true -> true
+ end
end, Reqs),
lists:foreach(fun (Obj) ->
ok = close_handle(Obj, Handles, Ages)
@@ -233,7 +253,7 @@ terminate(_Reason, State = #server_state { ages = Ages,
request_queue = Reqs }) ->
Size = ets:info(Ages, size),
Size = free_upto(Size, State),
- lists:foreach(fun ({{From, _Path, _Mode}, _Callback}) ->
+ lists:foreach(fun ({{_Pid, _Path, _Mode}, From, _Callback}) ->
gen_server2:reply(From, exiting)
end, queue:to_list(Reqs)),
State #server_state { request_queue = queue:new() }.
@@ -281,15 +301,37 @@ open_requested(N, State = #server_state { handles = Handles,
request_queue = Reqs }) ->
case queue:out(Reqs) of
{empty, _Reqs} -> State;
- {{value, {Key = {From, Path, Mode}, Callback}}, Reqs1} ->
- {ok, Hdl} = file:open(Path, Mode),
- gen_server2:reply(From, {Hdl, 0}),
- {ok, TRef} = start_timer(Callback, Key),
- true = ets:insert_new(Handles, #hdl { key = Key,
- handle = Hdl,
- offset = unknown,
- timer_ref = TRef,
- released_at = not_released }),
+ {{value, {Key = {_Pid, Path, Mode}, From, Callback}}, Reqs1} ->
+ Msg =
+ case lists:member(raw, Mode) of
+ true ->
+ Fun =
+ fun() ->
+ {ok, Hdl} = file:open(Path, Mode),
+ {ok, TRef} = start_timer(Callback, Key),
+ gen_server2:cast(
+ ?SERVER, {store_handle,
+ #hdl {key = Key,
+ handle = Hdl,
+ offset = unknown,
+ timer_ref = TRef,
+ released_at = not_released
+ }}),
+ {Hdl, 0}
+ end,
+ {open_fun, Fun};
+ false ->
+ {ok, Hdl} = file:open(Path, Mode),
+ {ok, TRef} = start_timer(Callback, Key),
+ true = ets:insert_new(Handles, #hdl { key = Key,
+ handle = Hdl,
+ offset = unknown,
+ timer_ref = TRef,
+ released_at =
+ not_released }),
+ {Hdl, 0}
+ end,
+ gen_server2:reply(From, Msg),
open_requested(N - 1, State #server_state { request_queue = Reqs1 })
end.