summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-06 13:18:14 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-06 13:18:14 +0100
commit01c7de5a9adbab716437c0890eeb40f8a9bb2c16 (patch)
treec00ccce34973605027d0b71621e4a7dadd1ec440 /src
parent2fbcc4c836a824a5a56fbc7f1ddb943ab49b05bb (diff)
downloadrabbitmq-server-git-01c7de5a9adbab716437c0890eeb40f8a9bb2c16.tar.gz
Well fixed a few bugs in the fhc2, but it's fundamentally flawed by the fact that raw files can only be manipulated by the process that opens it. Thus the whole design is wrong because the server process can't be responsible for holding on to released but unclosed fhs.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_file_handle_cache2.erl92
1 files changed, 67 insertions, 25 deletions
diff --git a/src/rabbit_file_handle_cache2.erl b/src/rabbit_file_handle_cache2.erl
index c15a3087a6..9f459ebfcc 100644
--- a/src/rabbit_file_handle_cache2.erl
+++ b/src/rabbit_file_handle_cache2.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/0, new_client/1, get_file_handle/3, release_file_handle/2,
- close_file_handle/3, with_file_handle_at/5]).
+ close_file_handle/3, close_all_file_handles/1, with_file_handle_at/5]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -75,7 +75,7 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [?MAX_FILE_HANDLES], []).
new_client(Callback = {_M, _F, _A}) ->
- ok = gen_server2:call(?SERVER, new_client, infinity),
+ gen_server2:cast(?SERVER, {new_client, self()}),
#client_state { callback = Callback,
handles = dict:new() }.
@@ -104,9 +104,17 @@ close_file_handle(Path, Mode, CState = #client_state { handles = Handles }) ->
error -> %% oh well, it must have already gone
CState;
{value, _} ->
- gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode1}})
+ gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode1}}),
+ CState #client_state { handles = dict:erase({Path, Mode}, Handles) }
end.
+close_all_file_handles(CState = #client_state { handles = Handles }) ->
+ lists:foreach(
+ fun({Path, Mode}) ->
+ gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode}})
+ end, dict:fetch_keys(Handles)),
+ CState #client_state { handles = dict:new() }.
+
with_file_handle_at(Path, Mode, Offset, Fun, CState =
#client_state { handles = Handles }) ->
case obtain_file_handle(Path, Mode, CState) of
@@ -140,7 +148,10 @@ obtain_file_handle(Path, Mode, #client_state { handles = Handles,
case dict:find(Mode1, Handles) of
error ->
case gen_server2:call(?SERVER,
- {get_handle, Path, Mode1, Callback}) of
+ {get_handle, Path, Mode1, Callback, self()},
+ infinity) of
+ {open_fun, Fun} -> {Hdl, Offset} = Fun(),
+ {Mode1, Hdl, Offset};
{Hdl, Offset} -> {Mode1, Hdl, Offset};
exiting -> not_available
end;
@@ -160,16 +171,14 @@ init([MaxFileHandles]) ->
handles = Handles,
ages = Ages,
max_handles = MaxFileHandles },
+ hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call(new_client, From, State) ->
- _MRef = erlang:monitor(process, From),
- {reply, ok, State, hibernate};
-handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From,
+handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}, Pid}, From,
State = #server_state { handles = Handles,
ages = Ages,
request_queue = Reqs }) ->
- Key = {From, Path, Mode},
+ Key = {Pid, Path, Mode},
State1 =
case ets:lookup(Handles, Key) of
[Obj = #hdl { handle = Hdl, offset = Offset,
@@ -186,11 +195,15 @@ handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From,
[] ->
process_request_queue(
State #server_state { request_queue =
- queue:in({Key, Callback}, Reqs) })
+ queue:in({Key, From, Callback}, Reqs) })
end,
{noreply, State1, hibernate}.
-handle_cast({release_handle, Key = {_From, _Path, _Mode}, Offset},
+handle_cast({store_handle, Obj = #hdl {}},
+ State = #server_state { handles = Handles }) ->
+ ets:insert_new(Handles, Obj),
+ {noreply, State, hibernate};
+handle_cast({release_handle, Key = {_Pid, _Path, _Mode}, Offset},
State = #server_state { handles = Handles,
ages = Ages }) ->
[Obj = #hdl { timer_ref = TRef, released_at = ReleasedAtOld }] =
@@ -209,19 +222,26 @@ handle_cast({release_handle, Key = {_From, _Path, _Mode}, Offset},
end,
State1 = process_request_queue(State),
{noreply, State1, hibernate};
-handle_cast({close_handle, Key = {_From, _Path, _Mode}},
+handle_cast({close_handle, Key = {_Pid, _Path, _Mode}},
State = #server_state { handles = Handles,
ages = Ages }) ->
[Obj] = ets:lookup(Handles, Key),
ok = close_handle(Obj, Handles, Ages),
State1 = process_request_queue(State),
- {noreply, State1, hibernate}.
+ {noreply, State1, hibernate};
+handle_cast({new_client, Pid}, State) ->
+ _MRef = erlang:monitor(process, Pid),
+ {noreply, State, hibernate}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #server_state { handles = Handles, ages = Ages,
request_queue = Reqs }) ->
- Reqs1 = queue:filter(fun ({{From, _Path, _Mode}, _Callback}) ->
- From /= Pid
+ Reqs1 = queue:filter(fun ({{OPid, _Path, _Mode}, From, _Callback}) ->
+ if OPid == Pid ->
+ gen_server2:reply(From, exiting),
+ false;
+ true -> true
+ end
end, Reqs),
lists:foreach(fun (Obj) ->
ok = close_handle(Obj, Handles, Ages)
@@ -233,7 +253,7 @@ terminate(_Reason, State = #server_state { ages = Ages,
request_queue = Reqs }) ->
Size = ets:info(Ages, size),
Size = free_upto(Size, State),
- lists:foreach(fun ({{From, _Path, _Mode}, _Callback}) ->
+ lists:foreach(fun ({{_Pid, _Path, _Mode}, From, _Callback}) ->
gen_server2:reply(From, exiting)
end, queue:to_list(Reqs)),
State #server_state { request_queue = queue:new() }.
@@ -281,15 +301,37 @@ open_requested(N, State = #server_state { handles = Handles,
request_queue = Reqs }) ->
case queue:out(Reqs) of
{empty, _Reqs} -> State;
- {{value, {Key = {From, Path, Mode}, Callback}}, Reqs1} ->
- {ok, Hdl} = file:open(Path, Mode),
- gen_server2:reply(From, {Hdl, 0}),
- {ok, TRef} = start_timer(Callback, Key),
- true = ets:insert_new(Handles, #hdl { key = Key,
- handle = Hdl,
- offset = unknown,
- timer_ref = TRef,
- released_at = not_released }),
+ {{value, {Key = {_Pid, Path, Mode}, From, Callback}}, Reqs1} ->
+ Msg =
+ case lists:member(raw, Mode) of
+ true ->
+ Fun =
+ fun() ->
+ {ok, Hdl} = file:open(Path, Mode),
+ {ok, TRef} = start_timer(Callback, Key),
+ gen_server2:cast(
+ ?SERVER, {store_handle,
+ #hdl {key = Key,
+ handle = Hdl,
+ offset = unknown,
+ timer_ref = TRef,
+ released_at = not_released
+ }}),
+ {Hdl, 0}
+ end,
+ {open_fun, Fun};
+ false ->
+ {ok, Hdl} = file:open(Path, Mode),
+ {ok, TRef} = start_timer(Callback, Key),
+ true = ets:insert_new(Handles, #hdl { key = Key,
+ handle = Hdl,
+ offset = unknown,
+ timer_ref = TRef,
+ released_at =
+ not_released }),
+ {Hdl, 0}
+ end,
+ gen_server2:reply(From, Msg),
open_requested(N - 1, State #server_state { request_queue = Reqs1 })
end.