summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-07 15:33:07 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-07 15:33:07 +0100
commited5ec5b25777a51e5baf1c501ed998d44031b1b1 (patch)
treee7df8527c0d2196f9011abea213a4752f650f6c4
parent9fca2ac37c4f85bdfd91ef6a2df34ed6924f7e76 (diff)
downloadrabbitmq-server-git-ed5ec5b25777a51e5baf1c501ed998d44031b1b1.tar.gz
Fix a leak in the fhc - all clients of the fhc now have a monitor created for them to enable us to tidy up after the process dies. The distinction between these monitors and the ones created in release_on_death is that the release_on_death ones are not stored in the client_mrefs dict, thus if a monitor DOWN message appears which we can't find in that dict, it is assumed it is a release_on_death monitor
-rw-r--r--src/file_handle_cache.erl48
1 files changed, 34 insertions, 14 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index c43695fbd8..59bb01bf13 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -170,7 +170,8 @@
limit,
count,
obtains,
- callbacks
+ callbacks,
+ client_mrefs
}).
%%----------------------------------------------------------------------------
@@ -688,7 +689,8 @@ init([]) ->
end,
error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
{ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [], callbacks = dict:new() }}.
+ obtains = [], callbacks = dict:new(),
+ client_mrefs = dict:new() }}.
handle_call(obtain, From, State = #fhc_state { count = Count }) ->
State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
@@ -701,21 +703,23 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) ->
handle_call({register_callback, Pid, MFA}, _From,
State = #fhc_state { callbacks = Callbacks }) ->
- {reply, ok,
- State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) }}.
+ {reply, ok, ensure_mref(
+ Pid, State #fhc_state {
+ callbacks = dict:store(Pid, MFA, Callbacks) })}.
handle_cast({open, Pid, EldestUnusedSince}, State =
#fhc_state { elders = Elders, count = Count }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- {noreply, maybe_reduce(State #fhc_state { elders = Elders1,
- count = Count + 1 })};
+ {noreply, maybe_reduce(
+ ensure_mref(Pid, State #fhc_state { elders = Elders1,
+ count = Count + 1 }))};
handle_cast({update, Pid, EldestUnusedSince}, State =
#fhc_state { elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, State #fhc_state { elders = Elders1 }};
+ {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
handle_cast({close, Pid, EldestUnusedSince}, State =
#fhc_state { elders = Elders, count = Count }) ->
@@ -723,8 +727,9 @@ handle_cast({close, Pid, EldestUnusedSince}, State =
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_obtains(State #fhc_state { elders = Elders1,
- count = Count - 1 })};
+ {noreply, process_obtains(
+ ensure_mref(Pid, State #fhc_state { elders = Elders1,
+ count = Count - 1 }))};
handle_cast(check_counts, State) ->
{noreply, maybe_reduce(State)};
@@ -733,11 +738,18 @@ handle_cast({release_on_death, Pid}, State) ->
_MRef = erlang:monitor(process, Pid),
{noreply, State}.
-handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #fhc_state { count = Count, callbacks = Callbacks }) ->
- {noreply, process_obtains(
- State #fhc_state { count = Count - 1,
- callbacks = dict:erase(Pid, Callbacks) })}.
+handle_info({'DOWN', MRef, process, Pid, _Reason},
+ State = #fhc_state { count = Count, callbacks = Callbacks,
+ client_mrefs = ClientMRefs,
+ elders = Elders }) ->
+ State1 = case dict:find(Pid, ClientMRefs) of
+ {ok, MRef} -> State #fhc_state {
+ elders = dict:erase(Pid, Elders),
+ client_mrefs = dict:erase(Pid, ClientMRefs),
+ callbacks = dict:erase(Pid, Callbacks) };
+ _ -> State #fhc_state { count = Count - 1 }
+ end,
+ {noreply, process_obtains(State1)}.
terminate(_Reason, State) ->
State.
@@ -826,3 +838,11 @@ ulimit() ->
_ ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
+
+ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
+ State #fhc_state { client_mrefs = ensure_mref(Pid, ClientMRefs) };
+ensure_mref(Pid, ClientMRefs) ->
+ case dict:find(Pid, ClientMRefs) of
+ {ok, _MRef} -> ClientMRefs;
+ error -> dict:store(Pid, erlang:monitor(process, Pid), ClientMRefs)
+ end.