summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-30 13:12:41 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-30 13:12:41 +0100
commit36bd21122a387c0be477b1a8cabe52dfd15883e2 (patch)
tree3e0100753a034aa87f25d05d1c6f344391c01c46 /src
parenteab7e12334a3839e362c3ab762a5fd545f7f739c (diff)
downloadrabbitmq-server-git-36bd21122a387c0be477b1a8cabe52dfd15883e2.tar.gz
Change register_callback to a cast, don't send messages in absence of a callback, and combine two dicts
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl98
1 files changed, 52 insertions, 46 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 2d326583ed..92b408c524 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -84,18 +84,18 @@
%% When the limit is reached, the server calculates the average age of
%% the last reported least recently used file handle of all the
%% clients. It then tells all the clients to close any handles not
-%% used for longer than this average. The client should receive this
-%% message and pass it into set_maximum_since_use/1. However, it is
-%% highly possible this age will be greater than the ages of all the
-%% handles the client knows of because the client has used its file
-%% handles in the mean time. Thus at this point the client reports to
-%% the server the current timestamp at which its least recently used
-%% file handle was last used. The server will check two seconds later
-%% that either it is back under the limit, in which case all is well
-%% again, or if not, it will calculate a new average age. Its data
-%% will be much more recent now, and so it is very likely that when
-%% this is communicated to the clients, the clients will close file
-%% handles.
+%% used for longer than this average, by invoking the callback the
+%% client registered. The client should receive this message and pass
+%% it into set_maximum_since_use/1. However, it is highly possible
+%% this age will be greater than the ages of all the handles the
+%% client knows of because the client has used its file handles in the
+%% mean time. Thus at this point the client reports to the server the
+%% current timestamp at which its least recently used file handle was
+%% last used. The server will check two seconds later that either it
+%% is back under the limit, in which case all is well again, or if
+%% not, it will calculate a new average age. Its data will be much
+%% more recent now, and so it is very likely that when this is
+%% communicated to the clients, the clients will close file handles.
%%
%% The advantage of this scheme is that there is only communication
%% from the client to the server on open, close, and when in the
@@ -170,8 +170,7 @@
limit,
count,
obtains,
- callbacks,
- client_mrefs
+ callbacks_mrefs
}).
%%----------------------------------------------------------------------------
@@ -221,7 +220,7 @@ start_link() ->
register_callback(M, F, A)
when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
- gen_server:call(?SERVER, {register_callback, self(), {M, F, A}}, infinity).
+ gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}).
open(Path, Mode, Options) ->
Path1 = filename:absname(Path),
@@ -699,8 +698,7 @@ init([]) ->
end,
error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
{ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [], callbacks = dict:new(),
- client_mrefs = dict:new() }}.
+ obtains = [], callbacks_mrefs = dict:new() }}.
handle_call(obtain, From, State = #fhc_state { count = Count }) ->
State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
@@ -709,13 +707,16 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) ->
true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
count = Count1 - 1 }};
false -> {reply, ok, State1}
- end;
+ end.
-handle_call({register_callback, Pid, MFA}, _From,
- State = #fhc_state { callbacks = Callbacks }) ->
- {reply, ok, ensure_mref(
- Pid, State #fhc_state {
- callbacks = dict:store(Pid, MFA, Callbacks) })}.
+handle_cast({register_callback, Pid, MFA}, State) ->
+ State1 = #fhc_state { callbacks_mrefs = CallsMRefs } =
+ ensure_mref(Pid, State),
+ {noreply,
+ State1 #fhc_state { callbacks_mrefs =
+ dict:update(
+ Pid, fun ({undefined, MRef}) -> {MFA, MRef} end,
+ CallsMRefs) }};
handle_cast({open, Pid, EldestUnusedSince}, State =
#fhc_state { elders = Elders, count = Count }) ->
@@ -749,17 +750,17 @@ handle_cast({release_on_death, Pid}, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, Pid, _Reason},
- State = #fhc_state { count = Count, callbacks = Callbacks,
- client_mrefs = ClientMRefs,
+ State = #fhc_state { count = Count, callbacks_mrefs = CallsMRefs,
elders = Elders }) ->
- State1 = case dict:find(Pid, ClientMRefs) of
- {ok, MRef} -> State #fhc_state {
- elders = dict:erase(Pid, Elders),
- client_mrefs = dict:erase(Pid, ClientMRefs),
- callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { count = Count - 1 }
- end,
- {noreply, process_obtains(State1)}.
+ {noreply, process_obtains(
+ case dict:find(Pid, CallsMRefs) of
+ {ok, {_Callback, MRef}} ->
+ State #fhc_state {
+ elders = dict:erase(Pid, Elders),
+ callbacks_mrefs = dict:erase(Pid, CallsMRefs) };
+ _ ->
+ State #fhc_state { count = Count - 1 }
+ end)}.
terminate(_Reason, State) ->
State.
@@ -785,8 +786,9 @@ process_obtains(State = #fhc_state { limit = Limit, count = Count,
[gen_server:reply(From, ok) || From <- ObtainableRev],
State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
-maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
- elders = Elders, callbacks = Callbacks })
+maybe_reduce(State = #fhc_state {
+ limit = Limit, count = Count, elders = Elders,
+ callbacks_mrefs = CallsMRefs })
when Limit /= infinity andalso Count >= Limit ->
Now = now(),
{Pids, Sum, ClientCount} =
@@ -801,11 +803,10 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
_ -> AverageAge = Sum / ClientCount,
lists:foreach(
fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error ->
- Pid ! {?MODULE, maximum_eldest_since_use,
- AverageAge};
- {ok, {M, F, A}} ->
+ case dict:fetch(Pid, CallsMRefs) of
+ {undefined, _MRef} ->
+ ok;
+ {{M, F, A}, _MRef} ->
apply(M, F, A ++ [AverageAge])
end
end, Pids)
@@ -849,10 +850,15 @@ ulimit() ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
- case dict:find(Pid, ClientMRefs) of
- {ok, _MRef} -> State;
- error -> MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
+ensure_mref(Pid, State = #fhc_state { callbacks_mrefs = CallsMRefs }) ->
+ case dict:find(Pid, CallsMRefs) of
+ {ok, {_Callback, MRef}} when MRef =/= undefined ->
+ State;
+ _ ->
+ MRef = erlang:monitor(process, Pid),
+ State #fhc_state {
+ callbacks_mrefs = dict:update(
+ Pid, fun ({Callback, undefined}) ->
+ {Callback, MRef}
+ end, {undefined, MRef}, CallsMRefs) }
end.