summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-17 15:01:41 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-17 15:01:41 +0100
commit3fe7c78a7ac920b5febb9e6073713cc11c581618 (patch)
treecde9a94cd0c5e35883094d334878db211eecd5de
parent0c5cbb3d9a0b723b34503812b267e9f95a7a83bd (diff)
parent57b55e89da224bca59a09e8547f8117bf7f78cc4 (diff)
downloadrabbitmq-server-git-3fe7c78a7ac920b5febb9e6073713cc11c581618.tar.gz
merge bug23135 into default
-rw-r--r--src/file_handle_cache.erl41
-rw-r--r--src/tcp_acceptor.erl4
2 files changed, 23 insertions, 22 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 19b2654f59..fe4bdc0392 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([release_on_death/1, obtain/0]).
+-export([obtain/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -222,8 +222,7 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(release_on_death/1 :: (pid()) -> 'ok').
--spec(obtain/0 :: () -> 'ok').
+-spec(obtain/1 :: (pid()) -> 'ok').
-endif.
@@ -445,11 +444,8 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-release_on_death(Pid) when is_pid(Pid) ->
- gen_server:cast(?SERVER, {release_on_death, Pid}).
-
-obtain() ->
- gen_server:call(?SERVER, {obtain, self()}, infinity).
+obtain(Pid) ->
+ gen_server:call(?SERVER, {obtain, Pid}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -750,17 +746,20 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
obtain_pending = Pending,
elders = Elders })
when Limit =/= infinity andalso Count >= Limit ->
- {noreply, State #fhc_state { obtain_pending = [From | Pending],
- elders = dict:erase(Pid, Elders) }};
+ {noreply,
+ State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
elders = Elders }) ->
case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
{true, State1} ->
- {noreply, State1 #fhc_state { obtain_count = Count,
- obtain_pending = [From | Pending],
- elders = dict:erase(Pid, Elders) }};
+ {noreply, State1 #fhc_state {
+ obtain_count = Count,
+ obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
{false, State1} ->
+ _MRef = erlang:monitor(process, Pid),
{reply, ok, State1}
end;
@@ -777,7 +776,7 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
case CanClose of
true -> {reply, close, State2};
false -> {noreply, State2 #fhc_state {
- open_pending = [From | Pending],
+ open_pending = [{open, From} | Pending],
elders = dict:erase(Pid, Elders1) }}
end;
{false, State1} ->
@@ -809,11 +808,7 @@ handle_cast({close, Pid, EldestUnusedSince}, State =
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
- {noreply, State1};
-
-handle_cast({release_on_death, Pid}, State) ->
- _MRef = erlang:monitor(process, Pid),
- {noreply, State}.
+ {noreply, State1}.
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
#fhc_state { obtain_count = Count, callbacks = Callbacks,
@@ -871,9 +866,15 @@ process_pending(Pending, Quota) ->
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- [gen_server:reply(From, ok) || From <- SatisfiableRev],
+ [run_pending_item(Item) || Item <- SatisfiableRev],
{PendingNew, SatisfiableLen}.
+run_pending_item({open, From}) ->
+ gen_server:reply(From, ok);
+run_pending_item({obtain, Pid, From}) ->
+ _MRef = erlang:monitor(process, Pid),
+ gen_server:reply(From, ok).
+
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
open_pending = OpenPending,
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index cc4982c9cb..11ce6fc532 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,6 +55,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
+ ok = file_handle_cache:obtain(self()),
accept(State);
handle_cast(_Msg, State) ->
@@ -83,7 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
+ file_handle_cache:obtain(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -111,7 +112,6 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) ->
- ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}