summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-01-21 11:41:09 +0000
committerMatthias Radestock <matthias@lshift.net>2010-01-21 11:41:09 +0000
commite1ca83601402e403106c0db14b20b98b16bccff7 (patch)
tree43b0c932e5476c26d662d3745b627790a864271e /src
parent873e3dfccca03eefcc781c388c2cc8ee6f951947 (diff)
parent7f992290a62cb70c8d7b1e5b3503303f4f059b7c (diff)
downloadrabbitmq-server-git-e1ca83601402e403106c0db14b20b98b16bccff7.tar.gz
merge bug22218 into bug21673
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl68
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/tcp_acceptor.erl26
3 files changed, 64 insertions, 34 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index aa01a508ec..45c0eff3db 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -115,6 +115,14 @@
%% fully reopened again as soon as needed, thus users of this library
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
+%%
+%% The server also supports obtain and release_on_death. obtain/0
+%% blocks until a file descriptor is available. release_on_death/1
+%% takes a pid and monitors the pid, reducing the count by 1 when the
+%% pid dies. Thus the assumption is that obtain/0 is called first, and
+%% when that returns, release_on_death/1 is called with the pid who
+%% "owns" the file descriptor. This is, for example, used to track the
+%% use of file descriptors through network sockets.
-behaviour(gen_server).
@@ -125,7 +133,7 @@
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([decrement/0, increment/0]).
+-export([release_on_death/1, obtain/0]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 50).
@@ -160,7 +168,8 @@
-record(fhc_state,
{ elders,
limit,
- count
+ count,
+ obtains
}).
%%----------------------------------------------------------------------------
@@ -432,11 +441,11 @@ set_maximum_since_use(MaximumAge) ->
false -> ok
end.
-decrement() ->
- gen_server:cast(?SERVER, decrement).
+release_on_death(Pid) when is_pid(Pid) ->
+ gen_server:cast(?SERVER, {release_on_death, Pid}).
-increment() ->
- gen_server:cast(?SERVER, increment).
+obtain() ->
+ gen_server:call(?SERVER, obtain, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -663,10 +672,17 @@ init([]) ->
ulimit()
end,
error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
- {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0}}.
-
-handle_call(_Msg, _From, State) ->
- {reply, message_not_understood, State}.
+ {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
+ obtains = [] }}.
+
+handle_call(obtain, From, State = #fhc_state { count = Count }) ->
+ State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
+ maybe_reduce(State #fhc_state { count = Count + 1 }),
+ case Limit /= infinity andalso Count1 >= Limit of
+ true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
+ count = Count1 - 1 }};
+ false -> {reply, ok, State1}
+ end.
handle_cast({open, Pid, EldestUnusedSince}, State =
#fhc_state { elders = Elders, count = Count }) ->
@@ -687,20 +703,20 @@ handle_cast({close, Pid, EldestUnusedSince}, State =
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, State #fhc_state { elders = Elders1, count = Count - 1 }};
-
-handle_cast(increment, State = #fhc_state { count = Count }) ->
- {noreply, maybe_reduce(State #fhc_state { count = Count + 1 })};
-
-handle_cast(decrement, State = #fhc_state { count = Count }) ->
- {noreply, State #fhc_state { count = Count - 1 }};
+ {noreply, process_obtains(State #fhc_state { elders = Elders1,
+ count = Count - 1 })};
handle_cast(check_counts, State) ->
- {noreply, maybe_reduce(State)}.
+ {noreply, maybe_reduce(State)};
-handle_info(_Msg, State) ->
+handle_cast({release_on_death, Pid}, State) ->
+ _MRef = erlang:monitor(process, Pid),
{noreply, State}.
+handle_info({'DOWN', _MRef, process, _Pid, _Reason},
+ State = #fhc_state { count = Count }) ->
+ {noreply, process_obtains(State #fhc_state { count = Count - 1 })}.
+
terminate(_Reason, State) ->
State.
@@ -711,6 +727,20 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
+process_obtains(State = #fhc_state { obtains = [] }) ->
+ State;
+process_obtains(State = #fhc_state { limit = Limit, count = Count })
+ when Limit /= infinity andalso Count >= Limit ->
+ State;
+process_obtains(State = #fhc_state { limit = Limit, count = Count,
+ obtains = Obtains }) ->
+ ObtainsLen = length(Obtains),
+ ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
+ Take = ObtainsLen - ObtainableLen,
+ {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
+ [gen_server:reply(From, ok) || From <- ObtainableRev],
+ State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
+
maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
elders = Elders })
when Limit /= infinity andalso Count >= Limit ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 49e66e328e..adfd412f99 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -213,8 +213,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- try
- file_handle_cache:increment(),
+ try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
@@ -235,7 +234,6 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
end)("exception on TCP connection ~p from ~s:~p~n~p~n",
[self(), PeerAddressS, PeerPort, Ex])
after
- file_handle_cache:decrement(),
rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
%% We don't close the socket explicitly. The reader is the
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index bc7425613f..f38f8191b0 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -48,22 +48,20 @@ start_link(Callback, LSock) ->
%%--------------------------------------------------------------------
init({Callback, LSock}) ->
- case prim_inet:async_accept(LSock, -1) of
- {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
- Error -> {stop, {cannot_accept, Error}}
- end.
+ gen_server:cast(self(), accept),
+ {ok, #state{callback=Callback, sock=LSock, ref=undefined}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast(accept, State) ->
+ accept(State).
handle_info({inet_async, LSock, Ref, {ok, Sock}},
State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
%% patch up the socket so it looks like one we got from
- %% gen_tcp:accept/1
+ %% gen_tcp:accept/1
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
@@ -75,7 +73,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
[inet_parse:ntoa(Address), Port,
inet_parse:ntoa(PeerAddress), PeerPort]),
%% handle
- apply(M, F, A ++ [Sock])
+ file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -83,10 +81,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
end,
%% accept more
- case prim_inet:async_accept(LSock, -1) of
- {ok, NRef} -> {noreply, State#state{ref=NRef}};
- Error -> {stop, {cannot_accept, Error}, none}
- end;
+ accept(State);
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
@@ -104,3 +99,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
+
+accept(State = #state{sock=LSock}) ->
+ ok = file_handle_cache:obtain(),
+ case prim_inet:async_accept(LSock, -1) of
+ {ok, Ref} -> {noreply, State#state{ref=Ref}};
+ Error -> {stop, {cannot_accept, Error}, State}
+ end.