summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_file_handle_cache2.erl138
1 files changed, 116 insertions, 22 deletions
diff --git a/src/rabbit_file_handle_cache2.erl b/src/rabbit_file_handle_cache2.erl
index 6fe7e47632..6594e62f45 100644
--- a/src/rabbit_file_handle_cache2.erl
+++ b/src/rabbit_file_handle_cache2.erl
@@ -33,7 +33,8 @@
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/0, new_client/1, get_file_handle/3, release_file_handle/2,
+ close_file_handle/3, with_file_handle_at/5]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -53,6 +54,11 @@
max_handles
}).
+-record(client_state,
+ { callback,
+ handles
+ }).
+
-record(hdl,
{ key,
handle,
@@ -68,12 +74,87 @@
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [?MAX_FILE_HANDLES], []).
+new_client(Callback = {_M, _F, _A}) ->
+ ok = gen_server2:call(?SERVER, new_client, infinity),
+ #client_state { callback = Callback,
+ handles = dict:new() }.
+
+get_file_handle(Path, Mode, CState = #client_state { handles = Handles }) ->
+ case obtain_file_handle(Path, Mode, CState) of
+ not_available -> {not_available, CState};
+ {Mode1, Hdl, _Offset} ->
+ Handles1 = dict:store({Path, Mode1}, {Hdl, unknown}, Handles),
+ {Hdl, CState #client_state { handles = Handles1 }}
+ end.
+
+release_file_handle({release_handle, Key = {_From, Path, Mode}},
+ CState = #client_state { handles = Handles }) ->
+ Mode1 = lists:usort(Mode),
+ case dict:find({Path, Mode1}, Handles) of
+ error -> %% oh well, it must have already gone
+ CState;
+ {value, {_Hdl, Offset}} ->
+ Handles1 = dict:erase({Path, Mode1}, Handles),
+ gen_server2:cast(?SERVER, {release_handle, Key, Offset}),
+ CState #client_state { handles = Handles1 }
+ end.
+
+close_file_handle(Path, Mode, CState = #client_state { handles = Handles }) ->
+ Mode1 = lists:usort(Mode),
+ case dict:find({Path, Mode1}, Handles) of
+ error -> %% oh well, it must have already gone
+ CState;
+ {value, _} ->
+ gen_server2:cast(?SERVER, {close_handle, {self(), Path, Mode1}})
+ end.
+
+with_file_handle_at(Path, Mode, Offset, Fun, CState =
+ #client_state { handles = Handles }) ->
+ case obtain_file_handle(Path, Mode, CState) of
+ not_available -> {not_available, CState};
+ {Mode1, Hdl, OldOffset} ->
+ SeekRes = case Offset == OldOffset of
+ true -> ok;
+ false -> case file:position(Hdl, Offset) of
+ {ok, _} -> ok;
+ KO -> KO
+ end
+ end,
+ case SeekRes of
+ ok -> {NewOffset, Result} = Fun(Hdl),
+ {Result, CState #client_state {
+ handles = dict:store({Path, Mode1},
+ {Hdl, NewOffset},
+ Handles) }};
+ KO1 -> {KO1, CState}
+ end
+ end.
+
+%%----------------------------------------------------------------------------
+%% Client-side helpers
+%%----------------------------------------------------------------------------
+
+obtain_file_handle(Path, Mode, #client_state { handles = Handles,
+ callback = Callback }) ->
+ Mode1 = lists:usort(Mode),
+ case dict:find(Mode1, Handles) of
+ error ->
+ case gen_server2:call(?SERVER,
+ {get_handle, Path, Mode1, Callback}) of
+ {Hdl, Offset} -> {Mode1, Hdl, Offset};
+ exiting -> not_available
+ end;
+ {value, {Hdl, Offset}} ->
+ {Mode1, Hdl, Offset}
+ end.
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
init([MaxFileHandles]) ->
- Handles = ets:new(?ETS_HANDLES_NAME, [ordered_set, private, {keypos, #hdl.key}]),
+ Handles = ets:new(?ETS_HANDLES_NAME,
+ [ordered_set, private, {keypos, #hdl.key}]),
Ages = ets:new(?ETS_AGE_NAME, [ordered_set, private]),
{ok, #server_state { request_queue = queue:new(),
handles = Handles,
@@ -81,6 +162,9 @@ init([MaxFileHandles]) ->
max_handles = MaxFileHandles },
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+handle_call(new_client, From, State) ->
+ _MRef = erlang:monitor(process, From),
+ {reply, ok, State};
handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From,
State = #server_state { handles = Handles,
ages = Ages,
@@ -93,30 +177,35 @@ handle_call({get_handle, Path, Mode, Callback = {_M, _F, _A}}, From,
gen_server2:reply(From, {Hdl, Offset}),
ok = stop_timer(TRef),
{ok, TRef1} = start_timer(Callback, Key),
- true = ets:insert(Handles, Obj #hdl { offset = unknown,
- timer_ref = TRef1,
- released_at = not_released }),
+ true = ets:insert(Handles,
+ Obj #hdl { offset = unknown,
+ timer_ref = TRef1,
+ released_at = not_released }),
true = ets:delete(Ages, ReleasedAt),
State;
[] ->
process_request_queue(
- State #server_state { request_queue = queue:in({Key, Callback}, Reqs) })
+ State #server_state { request_queue =
+ queue:in({Key, Callback}, Reqs) })
end,
{noreply, State1, hibernate}.
handle_cast({release_handle, Key = {_From, _Path, _Mode}, Offset},
State = #server_state { handles = Handles,
ages = Ages }) ->
- [Obj = #hdl { timer_ref = TRef, released_at = ReleasedAtOld }] = ets:lookup(Handles, Key),
+ [Obj = #hdl { timer_ref = TRef, released_at = ReleasedAtOld }] =
+ ets:lookup(Handles, Key),
ok = stop_timer(TRef),
ok = case ReleasedAtOld of
- not_released -> ReleasedAt = now(),
- true = ets:insert_new(Ages, {ReleasedAt, Key}),
- true = ets:insert(Handles, Obj #hdl { released_at = ReleasedAt,
- offset = Offset,
- timer_ref = no_timer }),
- ok;
- _ -> ok
+ not_released ->
+ ReleasedAt = now(),
+ true = ets:insert_new(Ages, {ReleasedAt, Key}),
+ true = ets:insert(Handles, Obj #hdl { released_at = ReleasedAt,
+ offset = Offset,
+ timer_ref = no_timer }),
+ ok;
+ _ ->
+ ok
end,
State1 = process_request_queue(State),
{noreply, State1, hibernate};
@@ -129,16 +218,19 @@ handle_cast({close_handle, Key = {_From, _Path, _Mode}},
{noreply, State1, hibernate}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #server_state { handles = Handles, ages = Ages, request_queue = Reqs }) ->
+ State = #server_state { handles = Handles, ages = Ages,
+ request_queue = Reqs }) ->
Reqs1 = queue:filter(fun ({{From, _Path, _Mode}, _Callback}) ->
From /= Pid
end, Reqs),
lists:foreach(fun (Obj) ->
ok = close_handle(Obj, Handles, Ages)
- end, ets:match_object(Handles, #hdl { key = {Pid, '_', '_'}, _ = '_' })),
+ end, ets:match_object(Handles, #hdl { key = {Pid, '_', '_'},
+ _ = '_' })),
{noreply, State #server_state { request_queue = Reqs1 }}.
-terminate(_Reason, State = #server_state { ages = Ages, request_queue = Reqs }) ->
+terminate(_Reason, State = #server_state { ages = Ages,
+ request_queue = Reqs }) ->
Size = ets:info(Ages, size),
Size = free_upto(Size, State),
lists:foreach(fun ({{From, _Path, _Mode}, _Callback}) ->
@@ -150,7 +242,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-%% Helpers
+%% Server-side Helpers
%%----------------------------------------------------------------------------
start_timer({M,F,A}, Key) ->
@@ -162,7 +254,8 @@ stop_timer(TRef) ->
timer:cancel(TRef),
ok.
-close_handle(#hdl { key = Key, timer_ref = TRef, released_at = ReleasedAt, handle = Hdl },
+close_handle(#hdl { key = Key, timer_ref = TRef, released_at = ReleasedAt,
+ handle = Hdl },
Handles, Ages) ->
ok = timer:stop(TRef),
ok = file:sync(Hdl),
@@ -184,7 +277,8 @@ process_request_queue(State = #server_state { max_handles = MaxHandles,
open_requested(0, State) ->
State;
-open_requested(N, State = #server_state { handles = Handles, request_queue = Reqs }) ->
+open_requested(N, State = #server_state { handles = Handles,
+ request_queue = Reqs }) ->
case queue:out(Reqs) of
{empty, _Reqs} -> State;
{{value, {Key = {From, Path, Mode}, Callback}}, Reqs1} ->
@@ -210,8 +304,8 @@ free_upto(N, Count, State = #server_state { handles = Handles,
'$end_of_table' ->
Count;
{ReleasedAt, Key} ->
- [#hdl { handle = Hdl, timer_ref = no_timer, released_at = ReleasedAt }]
- = ets:lookup(Handles, Key),
+ [#hdl { handle = Hdl, timer_ref = no_timer,
+ released_at = ReleasedAt }] = ets:lookup(Handles, Key),
ok = file:sync(Hdl),
ok = file:close(Hdl),
true = ets:delete(Ages, ReleasedAt),