diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-01-21 11:41:09 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-01-21 11:41:09 +0000 |
| commit | e1ca83601402e403106c0db14b20b98b16bccff7 (patch) | |
| tree | 43b0c932e5476c26d662d3745b627790a864271e /src | |
| parent | 873e3dfccca03eefcc781c388c2cc8ee6f951947 (diff) | |
| parent | 7f992290a62cb70c8d7b1e5b3503303f4f059b7c (diff) | |
| download | rabbitmq-server-git-e1ca83601402e403106c0db14b20b98b16bccff7.tar.gz | |
merge bug22218 into bug21673
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 4 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 26 |
3 files changed, 64 insertions, 34 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index aa01a508ec..45c0eff3db 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -115,6 +115,14 @@ %% fully reopened again as soon as needed, thus users of this library %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. +%% +%% The server also supports obtain and release_on_death. obtain/0 +%% blocks until a file descriptor is available. release_on_death/1 +%% takes a pid and monitors the pid, reducing the count by 1 when the +%% pid dies. Thus the assumption is that obtain/0 is called first, and +%% when that returns, release_on_death/1 is called with the pid who +%% "owns" the file descriptor. This is, for example, used to track the +%% use of file descriptors through network sockets. -behaviour(gen_server). @@ -125,7 +133,7 @@ -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([decrement/0, increment/0]). +-export([release_on_death/1, obtain/0]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 50). @@ -160,7 +168,8 @@ -record(fhc_state, { elders, limit, - count + count, + obtains }). %%---------------------------------------------------------------------------- @@ -432,11 +441,11 @@ set_maximum_since_use(MaximumAge) -> false -> ok end. -decrement() -> - gen_server:cast(?SERVER, decrement). +release_on_death(Pid) when is_pid(Pid) -> + gen_server:cast(?SERVER, {release_on_death, Pid}). -increment() -> - gen_server:cast(?SERVER, increment). +obtain() -> + gen_server:call(?SERVER, obtain, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -663,10 +672,17 @@ init([]) -> ulimit() end, error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), - {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0}}. - -handle_call(_Msg, _From, State) -> - {reply, message_not_understood, State}. + {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, + obtains = [] }}. + +handle_call(obtain, From, State = #fhc_state { count = Count }) -> + State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = + maybe_reduce(State #fhc_state { count = Count + 1 }), + case Limit /= infinity andalso Count1 >= Limit of + true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], + count = Count1 - 1 }}; + false -> {reply, ok, State1} + end. handle_cast({open, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, count = Count }) -> @@ -687,20 +703,20 @@ handle_cast({close, Pid, EldestUnusedSince}, State = undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, State #fhc_state { elders = Elders1, count = Count - 1 }}; - -handle_cast(increment, State = #fhc_state { count = Count }) -> - {noreply, maybe_reduce(State #fhc_state { count = Count + 1 })}; - -handle_cast(decrement, State = #fhc_state { count = Count }) -> - {noreply, State #fhc_state { count = Count - 1 }}; + {noreply, process_obtains(State #fhc_state { elders = Elders1, + count = Count - 1 })}; handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State)}. + {noreply, maybe_reduce(State)}; -handle_info(_Msg, State) -> +handle_cast({release_on_death, Pid}, State) -> + _MRef = erlang:monitor(process, Pid), {noreply, State}. +handle_info({'DOWN', _MRef, process, _Pid, _Reason}, + State = #fhc_state { count = Count }) -> + {noreply, process_obtains(State #fhc_state { count = Count - 1 })}. + terminate(_Reason, State) -> State. @@ -711,6 +727,20 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- +process_obtains(State = #fhc_state { obtains = [] }) -> + State; +process_obtains(State = #fhc_state { limit = Limit, count = Count }) + when Limit /= infinity andalso Count >= Limit -> + State; +process_obtains(State = #fhc_state { limit = Limit, count = Count, + obtains = Obtains }) -> + ObtainsLen = length(Obtains), + ObtainableLen = lists:min([ObtainsLen, Limit - Count]), + Take = ObtainsLen - ObtainableLen, + {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), + [gen_server:reply(From, ok) || From <- ObtainableRev], + State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. + maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders }) when Limit /= infinity andalso Count >= Limit -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 49e66e328e..adfd412f99 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -213,8 +213,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - try - file_handle_cache:increment(), + try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, connection = #connection{ @@ -235,7 +234,6 @@ start_connection(Parent, Deb, Sock, SockTransform) -> end)("exception on TCP connection ~p from ~s:~p~n~p~n", [self(), PeerAddressS, PeerPort, Ex]) after - file_handle_cache:decrement(), rabbit_log:info("closing TCP connection ~p from ~s:~p~n", [self(), PeerAddressS, PeerPort]), %% We don't close the socket explicitly. The reader is the diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index bc7425613f..f38f8191b0 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -48,22 +48,20 @@ start_link(Callback, LSock) -> %%-------------------------------------------------------------------- init({Callback, LSock}) -> - case prim_inet:async_accept(LSock, -1) of - {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}}; - Error -> {stop, {cannot_accept, Error}} - end. + gen_server:cast(self(), accept), + {ok, #state{callback=Callback, sock=LSock, ref=undefined}}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(accept, State) -> + accept(State). handle_info({inet_async, LSock, Ref, {ok, Sock}}, State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) -> %% patch up the socket so it looks like one we got from - %% gen_tcp:accept/1 + %% gen_tcp:accept/1 {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), @@ -75,7 +73,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), %% handle - apply(M, F, A ++ [Sock]) + file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -83,10 +81,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, end, %% accept more - case prim_inet:async_accept(LSock, -1) of - {ok, NRef} -> {noreply, State#state{ref=NRef}}; - Error -> {stop, {cannot_accept, Error}, none} - end; + accept(State); handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we @@ -104,3 +99,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +accept(State = #state{sock=LSock}) -> + ok = file_handle_cache:obtain(), + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> {noreply, State#state{ref=Ref}}; + Error -> {stop, {cannot_accept, Error}, State} + end. |
