diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-30 13:12:41 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-30 13:12:41 +0100 |
| commit | 36bd21122a387c0be477b1a8cabe52dfd15883e2 (patch) | |
| tree | 3e0100753a034aa87f25d05d1c6f344391c01c46 /src | |
| parent | eab7e12334a3839e362c3ab762a5fd545f7f739c (diff) | |
| download | rabbitmq-server-git-36bd21122a387c0be477b1a8cabe52dfd15883e2.tar.gz | |
Change register_callback to a cast, don't send messages in absence of a callback, and combine two dicts
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 98 |
1 files changed, 52 insertions, 46 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 2d326583ed..92b408c524 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -84,18 +84,18 @@ %% When the limit is reached, the server calculates the average age of %% the last reported least recently used file handle of all the %% clients. It then tells all the clients to close any handles not -%% used for longer than this average. The client should receive this -%% message and pass it into set_maximum_since_use/1. However, it is -%% highly possible this age will be greater than the ages of all the -%% handles the client knows of because the client has used its file -%% handles in the mean time. Thus at this point the client reports to -%% the server the current timestamp at which its least recently used -%% file handle was last used. The server will check two seconds later -%% that either it is back under the limit, in which case all is well -%% again, or if not, it will calculate a new average age. Its data -%% will be much more recent now, and so it is very likely that when -%% this is communicated to the clients, the clients will close file -%% handles. +%% used for longer than this average, by invoking the callback the +%% client registered. The client should receive this message and pass +%% it into set_maximum_since_use/1. However, it is highly possible +%% this age will be greater than the ages of all the handles the +%% client knows of because the client has used its file handles in the +%% mean time. Thus at this point the client reports to the server the +%% current timestamp at which its least recently used file handle was +%% last used. The server will check two seconds later that either it +%% is back under the limit, in which case all is well again, or if +%% not, it will calculate a new average age. Its data will be much +%% more recent now, and so it is very likely that when this is +%% communicated to the clients, the clients will close file handles. %% %% The advantage of this scheme is that there is only communication %% from the client to the server on open, close, and when in the @@ -170,8 +170,7 @@ limit, count, obtains, - callbacks, - client_mrefs + callbacks_mrefs }). %%---------------------------------------------------------------------------- @@ -221,7 +220,7 @@ start_link() -> register_callback(M, F, A) when is_atom(M) andalso is_atom(F) andalso is_list(A) -> - gen_server:call(?SERVER, {register_callback, self(), {M, F, A}}, infinity). + gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}). open(Path, Mode, Options) -> Path1 = filename:absname(Path), @@ -699,8 +698,7 @@ init([]) -> end, error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [], callbacks = dict:new(), - client_mrefs = dict:new() }}. + obtains = [], callbacks_mrefs = dict:new() }}. handle_call(obtain, From, State = #fhc_state { count = Count }) -> State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = @@ -709,13 +707,16 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) -> true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], count = Count1 - 1 }}; false -> {reply, ok, State1} - end; + end. -handle_call({register_callback, Pid, MFA}, _From, - State = #fhc_state { callbacks = Callbacks }) -> - {reply, ok, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}. +handle_cast({register_callback, Pid, MFA}, State) -> + State1 = #fhc_state { callbacks_mrefs = CallsMRefs } = + ensure_mref(Pid, State), + {noreply, + State1 #fhc_state { callbacks_mrefs = + dict:update( + Pid, fun ({undefined, MRef}) -> {MFA, MRef} end, + CallsMRefs) }}; handle_cast({open, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, count = Count }) -> @@ -749,17 +750,17 @@ handle_cast({release_on_death, Pid}, State) -> {noreply, State}. handle_info({'DOWN', MRef, process, Pid, _Reason}, - State = #fhc_state { count = Count, callbacks = Callbacks, - client_mrefs = ClientMRefs, + State = #fhc_state { count = Count, callbacks_mrefs = CallsMRefs, elders = Elders }) -> - State1 = case dict:find(Pid, ClientMRefs) of - {ok, MRef} -> State #fhc_state { - elders = dict:erase(Pid, Elders), - client_mrefs = dict:erase(Pid, ClientMRefs), - callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { count = Count - 1 } - end, - {noreply, process_obtains(State1)}. + {noreply, process_obtains( + case dict:find(Pid, CallsMRefs) of + {ok, {_Callback, MRef}} -> + State #fhc_state { + elders = dict:erase(Pid, Elders), + callbacks_mrefs = dict:erase(Pid, CallsMRefs) }; + _ -> + State #fhc_state { count = Count - 1 } + end)}. terminate(_Reason, State) -> State. @@ -785,8 +786,9 @@ process_obtains(State = #fhc_state { limit = Limit, count = Count, [gen_server:reply(From, ok) || From <- ObtainableRev], State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. -maybe_reduce(State = #fhc_state { limit = Limit, count = Count, - elders = Elders, callbacks = Callbacks }) +maybe_reduce(State = #fhc_state { + limit = Limit, count = Count, elders = Elders, + callbacks_mrefs = CallsMRefs }) when Limit /= infinity andalso Count >= Limit -> Now = now(), {Pids, Sum, ClientCount} = @@ -801,11 +803,10 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, _ -> AverageAge = Sum / ClientCount, lists:foreach( fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> - Pid ! {?MODULE, maximum_eldest_since_use, - AverageAge}; - {ok, {M, F, A}} -> + case dict:fetch(Pid, CallsMRefs) of + {undefined, _MRef} -> + ok; + {{M, F, A}, _MRef} -> apply(M, F, A ++ [AverageAge]) end end, Pids) @@ -849,10 +850,15 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. -ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> - case dict:find(Pid, ClientMRefs) of - {ok, _MRef} -> State; - error -> MRef = erlang:monitor(process, Pid), - State #fhc_state { - client_mrefs = dict:store(Pid, MRef, ClientMRefs) } +ensure_mref(Pid, State = #fhc_state { callbacks_mrefs = CallsMRefs }) -> + case dict:find(Pid, CallsMRefs) of + {ok, {_Callback, MRef}} when MRef =/= undefined -> + State; + _ -> + MRef = erlang:monitor(process, Pid), + State #fhc_state { + callbacks_mrefs = dict:update( + Pid, fun ({Callback, undefined}) -> + {Callback, MRef} + end, {undefined, MRef}, CallsMRefs) } end. |
