summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-17 17:24:27 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-17 17:24:27 +0100
commit305f866c4a70bb3ecab4be8c69847874667a5dec (patch)
tree3f708414362a5e93ed6ea54d05a5da375e3daed5 /src
parent3fe7c78a7ac920b5febb9e6073713cc11c581618 (diff)
downloadrabbitmq-server-git-305f866c4a70bb3ecab4be8c69847874667a5dec.tar.gz
Correct monitoring and actions upon DOWN messages. Note this is especially subtle for obtains, which effectively implicitly allocates temporarily to the blocked caller (FromPid) whilst monitoring it, and then transfers this to the ForPid when possible. Note the ForPid can die before the obtains is processed, which which point the FromPid must be replied to immediately.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl170
1 files changed, 105 insertions, 65 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index fe4bdc0392..0ee3a7096e 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -183,7 +183,7 @@
obtain_count,
obtain_pending,
callbacks,
- client_mrefs,
+ counts,
timer_ref
}).
@@ -445,7 +445,7 @@ set_maximum_since_use(MaximumAge) ->
end.
obtain(Pid) ->
- gen_server:call(?SERVER, {obtain, Pid}, infinity).
+ gen_server:call(?SERVER, {obtain, self(), Pid}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -738,29 +738,36 @@ init([]) ->
obtain_count = 0,
obtain_pending = [],
callbacks = dict:new(),
- client_mrefs = dict:new(),
+ counts = dict:new(),
timer_ref = undefined }}.
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders })
+handle_call({obtain, FromPid, ForPid}, From,
+ State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
when Limit =/= infinity andalso Count >= Limit ->
- {noreply,
- State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders }) ->
- case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ MRef = erlang:monitor(process, FromPid),
+ Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
+ {noreply, ensure_mref(ForPid, State #fhc_state {
+ obtain_pending = Pending1,
+ elders = dict:erase(FromPid, Elders) })};
+handle_call({obtain, FromPid, ForPid}, From,
+ State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ MRef = erlang:monitor(process, FromPid),
+ case maybe_reduce(ensure_mref(ForPid, State #fhc_state {
+ obtain_count = Count + 1 })) of
{true, State1} ->
+ Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
{noreply, State1 #fhc_state {
obtain_count = Count,
- obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
+ obtain_pending = Pending1,
+ elders = dict:erase(FromPid, Elders) }};
{false, State1} ->
- _MRef = erlang:monitor(process, Pid),
- {reply, ok, State1}
+ {noreply,
+ run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)}
end;
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
@@ -775,12 +782,13 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State2 = State1 #fhc_state { open_count = Count },
case CanClose of
true -> {reply, close, State2};
- false -> {noreply, State2 #fhc_state {
- open_pending = [{open, From} | Pending],
- elders = dict:erase(Pid, Elders1) }}
+ false -> {noreply,
+ State2 #fhc_state {
+ open_pending = [{open, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders1) }}
end;
{false, State1} ->
- {reply, ok, State1}
+ {noreply, run_pending_item({open, Pid, From}, State1)}
end.
handle_cast({register_callback, Pid, MFA},
@@ -794,33 +802,59 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
+ {noreply, State #fhc_state { elders = Elders1 }};
handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, open_count = Count }) ->
+ #fhc_state { elders = Elders, counts = Counts,
+ open_count = Count }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_pending(
- ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
- elders = Elders1 }))};
+ {Obtained, Opened} = dict:fetch(Pid, Counts),
+ {noreply,
+ process_pending(State #fhc_state {
+ open_count = Count - 1,
+ counts = dict:store(Pid, {Obtained, Opened - 1}, Counts),
+ elders = Elders1 })};
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
{noreply, State1}.
-handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { obtain_count = Count, callbacks = Callbacks,
- client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_pending(
- case dict:find(Pid, ClientMRefs) of
- {ok, MRef} -> State #fhc_state {
- elders = dict:erase(Pid, Elders),
- client_mrefs = dict:erase(Pid, ClientMRefs),
- callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { obtain_count = Count - 1 }
- end)}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
+ #fhc_state { obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ callbacks = Callbacks,
+ counts = Counts,
+ elders = Elders }) ->
+ ObtainPending1 =
+ lists:filter(
+ fun ({obtain, FromPid, FromMRef, From, ForPid}) ->
+ case Pid =:= ForPid of
+ true -> gen_server:reply(From, ok),
+ true = erlang:demonitor(FromMRef, [flush]),
+ false;
+ false -> Pid =/= FromPid
+ end
+ end, ObtainPending),
+ OpenPending1 = lists:filter(fun ({open, Pid1, _From}) ->
+ Pid =/= Pid1
+ end, OpenPending),
+ {Obtained, Opened} = case dict:find(Pid, Counts) of
+ {ok, Val} -> Val;
+ error -> {0, 0}
+ end,
+ {noreply, process_pending(State #fhc_state {
+ elders = dict:erase(Pid, Elders),
+ counts = dict:erase(Pid, Counts),
+ callbacks = dict:erase(Pid, Callbacks),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1 })}.
terminate(_Reason, State) ->
State.
@@ -841,10 +875,10 @@ process_open(State = #fhc_state { limit = Limit,
open_pending = Pending,
open_count = OpenCount,
obtain_count = ObtainCount }) ->
- {Pending1, Inc} =
- process_pending(Pending, Limit - (ObtainCount + OpenCount)),
- State #fhc_state { open_pending = Pending1,
- open_count = OpenCount + Inc }.
+ {Pending1, Inc, State1} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ State1 #fhc_state { open_pending = Pending1,
+ open_count = OpenCount + Inc }.
process_obtain(State = #fhc_state { limit = Limit,
obtain_pending = Pending,
@@ -853,27 +887,34 @@ process_obtain(State = #fhc_state { limit = Limit,
open_count = OpenCount }) ->
Quota = lists:min([ObtainLimit - ObtainCount,
Limit - (ObtainCount + OpenCount)]),
- {Pending1, Inc} = process_pending(Pending, Quota),
- State #fhc_state { obtain_pending = Pending1,
- obtain_count = ObtainCount + Inc }.
-
-process_pending([], _Quota) ->
- {[], 0};
-process_pending(Pending, Quota) when Quota =< 0 ->
- {Pending, 0};
-process_pending(Pending, Quota) ->
+ {Pending1, Inc, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1,
+ obtain_count = ObtainCount + Inc }.
+
+process_pending([], _Quota, State) ->
+ {[], 0, State};
+process_pending(Pending, Quota, State) when Quota =< 0 ->
+ {Pending, 0, State};
+process_pending(Pending, Quota, State) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- [run_pending_item(Item) || Item <- SatisfiableRev],
- {PendingNew, SatisfiableLen}.
-
-run_pending_item({open, From}) ->
- gen_server:reply(From, ok);
-run_pending_item({obtain, Pid, From}) ->
- _MRef = erlang:monitor(process, Pid),
- gen_server:reply(From, ok).
+ {PendingNew, SatisfiableLen,
+ lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}.
+
+run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) ->
+ gen_server:reply(From, ok),
+ {Obtained, Opened} = dict:fetch(Pid, Counts),
+ State #fhc_state {
+ counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) };
+run_pending_item({obtain, _FromPid, FromMRef, From, ForPid},
+ State = #fhc_state { counts = Counts }) ->
+ gen_server:reply(From, ok),
+ true = erlang:demonitor(FromMRef, [flush]),
+ {Obtained, Opened} = dict:fetch(ForPid, Counts),
+ State #fhc_state {
+ counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }.
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
@@ -948,10 +989,9 @@ ulimit() ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
- case dict:find(Pid, ClientMRefs) of
- {ok, _MRef} -> State;
- error -> MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
+ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
+ case dict:find(Pid, Counts) of
+ {ok, _} -> State;
+ error -> _MRef = erlang:monitor(process, Pid),
+ State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) }
end.