diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-07 15:33:07 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-07 15:33:07 +0100 |
| commit | ed5ec5b25777a51e5baf1c501ed998d44031b1b1 (patch) | |
| tree | e7df8527c0d2196f9011abea213a4752f650f6c4 | |
| parent | 9fca2ac37c4f85bdfd91ef6a2df34ed6924f7e76 (diff) | |
| download | rabbitmq-server-git-ed5ec5b25777a51e5baf1c501ed998d44031b1b1.tar.gz | |
Fix a leak in the fhc - all clients of the fhc now have a monitor created for them to enable us to tidy up after the process dies. The distinction between these monitors and the ones created in release_on_death is that the release_on_death ones are not stored in the client_mrefs dict, thus if a monitor DOWN message appears which we can't find in that dict, it is assumed it is a release_on_death monitor
| -rw-r--r-- | src/file_handle_cache.erl | 48 |
1 files changed, 34 insertions, 14 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c43695fbd8..59bb01bf13 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -170,7 +170,8 @@ limit, count, obtains, - callbacks + callbacks, + client_mrefs }). %%---------------------------------------------------------------------------- @@ -688,7 +689,8 @@ init([]) -> end, error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [], callbacks = dict:new() }}. + obtains = [], callbacks = dict:new(), + client_mrefs = dict:new() }}. handle_call(obtain, From, State = #fhc_state { count = Count }) -> State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = @@ -701,21 +703,23 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) -> handle_call({register_callback, Pid, MFA}, _From, State = #fhc_state { callbacks = Callbacks }) -> - {reply, ok, - State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) }}. + {reply, ok, ensure_mref( + Pid, State #fhc_state { + callbacks = dict:store(Pid, MFA, Callbacks) })}. handle_cast({open, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, count = Count }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - {noreply, maybe_reduce(State #fhc_state { elders = Elders1, - count = Count + 1 })}; + {noreply, maybe_reduce( + ensure_mref(Pid, State #fhc_state { elders = Elders1, + count = Count + 1 }))}; handle_cast({update, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, State #fhc_state { elders = Elders1 }}; + {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, count = Count }) -> @@ -723,8 +727,9 @@ handle_cast({close, Pid, EldestUnusedSince}, State = undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_obtains(State #fhc_state { elders = Elders1, - count = Count - 1 })}; + {noreply, process_obtains( + ensure_mref(Pid, State #fhc_state { elders = Elders1, + count = Count - 1 }))}; handle_cast(check_counts, State) -> {noreply, maybe_reduce(State)}; @@ -733,11 +738,18 @@ handle_cast({release_on_death, Pid}, State) -> _MRef = erlang:monitor(process, Pid), {noreply, State}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #fhc_state { count = Count, callbacks = Callbacks }) -> - {noreply, process_obtains( - State #fhc_state { count = Count - 1, - callbacks = dict:erase(Pid, Callbacks) })}. +handle_info({'DOWN', MRef, process, Pid, _Reason}, + State = #fhc_state { count = Count, callbacks = Callbacks, + client_mrefs = ClientMRefs, + elders = Elders }) -> + State1 = case dict:find(Pid, ClientMRefs) of + {ok, MRef} -> State #fhc_state { + elders = dict:erase(Pid, Elders), + client_mrefs = dict:erase(Pid, ClientMRefs), + callbacks = dict:erase(Pid, Callbacks) }; + _ -> State #fhc_state { count = Count - 1 } + end, + {noreply, process_obtains(State1)}. terminate(_Reason, State) -> State. @@ -826,3 +838,11 @@ ulimit() -> _ -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. + +ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> + State #fhc_state { client_mrefs = ensure_mref(Pid, ClientMRefs) }; +ensure_mref(Pid, ClientMRefs) -> + case dict:find(Pid, ClientMRefs) of + {ok, _MRef} -> ClientMRefs; + error -> dict:store(Pid, erlang:monitor(process, Pid), ClientMRefs) + end. |
