diff options
| -rw-r--r-- | Makefile | 4 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 6 | ||||
| -rw-r--r-- | scripts/rabbitmq-server.bat | 13 | ||||
| -rw-r--r-- | scripts/rabbitmq-service.bat | 13 | ||||
| -rw-r--r-- | src/file_handle_cache.erl | 186 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_heartbeat.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 7 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 7 |
10 files changed, 126 insertions, 144 deletions
@@ -5,7 +5,6 @@ RABBITMQ_NODENAME ?= rabbit RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia RABBITMQ_LOG_BASE ?= $(TMPDIR) -RABBITMQ_PLUGINS_EXPAND_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-plugins-scratch DEPS_FILE=deps.mk SOURCE_DIR=src @@ -147,8 +146,7 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ RABBITMQ_NODE_IP_ADDRESS="$(RABBITMQ_NODE_IP_ADDRESS)" \ RABBITMQ_NODE_PORT="$(RABBITMQ_NODE_PORT)" \ RABBITMQ_LOG_BASE="$(RABBITMQ_LOG_BASE)" \ - RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" \ - RABBITMQ_PLUGINS_EXPAND_DIR="$(RABBITMQ_PLUGINS_EXPAND_DIR)" + RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index d52dc774b6..9310752f6f 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -39,7 +39,6 @@ CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia -PLUGINS_EXPAND_DIR=/var/lib/rabbitmq/plugins-scratch SERVER_START_ARGS= . `dirname $0`/rabbitmq-env @@ -70,7 +69,6 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${PLUGINS_EXPAND_DIR}" ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} @@ -91,14 +89,14 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ -pa "$RABBITMQ_EBIN_ROOT" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit plugins_expand_dir "\"${RABBITMQ_MNESIA_DIR}/plugins-scratch\"" \ -rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \ -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" then - RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" + RABBITMQ_BOOT_FILE="${RABBITMQ_MNESIA_DIR}/plugins-scratch/rabbit" RABBITMQ_EBIN_PATH="" else exit 1 diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b1a91f47cd..5bcbc6babd 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -110,24 +110,21 @@ if "!RABBITMQ_MNESIA_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 95e5eebf86..4b3961d43b 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -180,24 +180,21 @@ if errorlevel 1 ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 08e71f55ba..6c6ed1729c 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -116,13 +116,13 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and release_on_death. obtain/0 -%% blocks until a file descriptor is available. release_on_death/1 -%% takes a pid and monitors the pid, reducing the count by 1 when the -%% pid dies. Thus the assumption is that obtain/0 is called first, and -%% when that returns, release_on_death/1 is called with the pid who -%% "owns" the file descriptor. This is, for example, used to track the -%% use of file descriptors through network sockets. +%% The server also supports obtain and transfer. obtain/0 blocks until +%% a file descriptor is available. transfer/1 is transfers ownership +%% of a file descriptor between processes. It is non-blocking. +%% +%% The callers of register_callback/3, obtain/0, and the argument of +%% transfer/1 are monitored, reducing the count of handles in use +%% appropriately when the processes terminate. -behaviour(gen_server). @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/1]). +-export([obtain/0, transfer/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -223,7 +223,8 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(obtain/1 :: (pid()) -> 'ok'). +-spec(obtain/0 :: () -> 'ok'). +-spec(transfer/1 :: (pid()) -> 'ok'). -endif. @@ -445,8 +446,11 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain(Pid) -> - gen_server:call(?SERVER, {obtain, self(), Pid}, infinity). +obtain() -> + gen_server:call(?SERVER, {obtain, self()}, infinity). + +transfer(Pid) -> + gen_server:cast(?SERVER, {transfer, self(), Pid}). %%---------------------------------------------------------------------------- %% Internal functions @@ -743,56 +747,50 @@ init([]) -> blocked = sets:new(), timer_ref = undefined }}. -handle_call({obtain, FromPid, ForPid}, From, - State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - blocked = Blocked }) - when Limit =/= infinity andalso Count >= Limit -> - MRef = erlang:monitor(process, FromPid), - Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], - {noreply, ensure_mref(ForPid, - State #fhc_state { - blocked = sets:add_element(FromPid, Blocked), - obtain_pending = Pending1 })}; -handle_call({obtain, FromPid, ForPid}, From, - State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - blocked = Blocked }) -> - MRef = erlang:monitor(process, FromPid), - case maybe_reduce(ensure_mref(ForPid, State #fhc_state { - obtain_count = Count + 1 })) of - {true, State1} -> - Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], - {noreply, State1 #fhc_state { - blocked = sets:add_element(FromPid, Blocked), - obtain_count = Count, - obtain_pending = Pending1 }}; - {false, State1} -> - {noreply, - run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)} - end; - handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, blocked = Blocked }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - case maybe_reduce( - ensure_mref(Pid, State #fhc_state { open_count = Count + 1, - elders = Elders1 })) of + Item = {open, Pid, From}, + case maybe_reduce(ensure_mref(Pid, State #fhc_state { + open_count = Count + 1, + elders = Elders1 })) of {true, State1} -> State2 = State1 #fhc_state { open_count = Count }, case CanClose of true -> {reply, close, State2}; - false -> {noreply, - State2 #fhc_state { - blocked = sets:add_element(Pid, Blocked), - open_pending = [{open, Pid, From} | Pending] }} + false -> {noreply, State2 #fhc_state { + open_pending = [Item | Pending], + blocked = sets:add_element(Pid, Blocked) }} end; {false, State1} -> - {noreply, run_pending_item({open, Pid, From}, State1)} + {noreply, run_pending_item(Item, State1)} + end; + +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + blocked = Blocked }) + when Limit =/= infinity andalso Count >= Limit -> + Item = {obtain, Pid, From}, + {noreply, ensure_mref(Pid, State #fhc_state { + obtain_pending = [Item | Pending], + blocked = sets:add_element(Pid, Blocked) })}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + blocked = Blocked }) -> + Item = {obtain, Pid, From}, + case maybe_reduce(ensure_mref(Pid, State #fhc_state { + obtain_count = Count + 1 })) of + {true, State1} -> + {noreply, State1 #fhc_state { + obtain_count = Count, + obtain_pending = [Item | Pending], + blocked = sets:add_element(Pid, Blocked) }}; + {false, State1} -> + {noreply, run_pending_item(Item, State1)} end. handle_cast({register_callback, Pid, MFA}, @@ -808,59 +806,51 @@ handle_cast({update, Pid, EldestUnusedSince}, State = %% storm of messages {noreply, State #fhc_state { elders = Elders1 }}; -handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, counts = Counts, - open_count = Count }) -> +handle_cast({close, Pid, EldestUnusedSince}, + State = #fhc_state { open_count = Count, + counts = Counts, + elders = Elders }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {Obtained, Opened} = dict:fetch(Pid, Counts), - {noreply, - process_pending(State #fhc_state { - open_count = Count - 1, - counts = dict:store(Pid, {Obtained, Opened - 1}, Counts), - elders = Elders1 })}; + Counts1 = update_counts(open, Pid, -1, Counts), + {noreply, process_pending(State #fhc_state { open_count = Count - 1, + counts = Counts1, + elders = Elders1 })}; + +handle_cast({transfer, FromPid, ToPid}, State) -> + State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State), + Counts1 = update_counts(obtain, FromPid, -1, Counts), + Counts2 = update_counts(obtain, ToPid, +1, Counts1), + {noreply, process_pending(State1 #fhc_state { counts = Counts2 })}; handle_cast(check_counts, State) -> {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), {noreply, State1}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = - #fhc_state { obtain_count = ObtainCount, - obtain_pending = ObtainPending, - open_count = OpenCount, + #fhc_state { open_count = OpenCount, open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, callbacks = Callbacks, counts = Counts, elders = Elders, blocked = Blocked }) -> - ObtainPending1 = - lists:filter( - fun ({obtain, FromPid, FromMRef, From, ForPid}) -> - case Pid =:= ForPid of - true -> gen_server:reply(From, ok), - true = erlang:demonitor(FromMRef, [flush]), - false; - false -> Pid =/= FromPid - end - end, ObtainPending), - OpenPending1 = lists:filter(fun ({open, Pid1, _From}) -> - Pid =/= Pid1 - end, OpenPending), - {Obtained, Opened} = case dict:find(Pid, Counts) of - {ok, Val} -> Val; - error -> {0, 0} - end, + FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, + OpenPending1 = lists:filter(FilterFun, OpenPending), + ObtainPending1 = lists:filter(FilterFun, ObtainPending), + {Opened, Obtained} = dict:fetch(Pid, Counts), {noreply, process_pending( State #fhc_state { - elders = dict:erase(Pid, Elders), - counts = dict:erase(Pid, Counts), - callbacks = dict:erase(Pid, Callbacks), - obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, open_count = OpenCount - Opened, open_pending = OpenPending1, + obtain_count = ObtainCount - Obtained, + obtain_pending = ObtainPending1, + callbacks = dict:erase(Pid, Callbacks), + counts = dict:erase(Pid, Counts), + elders = dict:erase(Pid, Elders), blocked = sets:del_element(Pid, Blocked) })}. terminate(_Reason, State) -> @@ -907,24 +897,21 @@ process_pending(Pending, Quota, State) -> SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - {PendingNew, SatisfiableLen, - lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}. + State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), + {PendingNew, SatisfiableLen, State1}. -run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts, +run_pending_item({Kind, Pid, From}, State = #fhc_state { counts = Counts, blocked = Blocked }) -> gen_server:reply(From, ok), - {Obtained, Opened} = dict:fetch(Pid, Counts), - State #fhc_state { - counts = dict:store(Pid, {Obtained, Opened + 1}, Counts), - blocked = sets:del_element(Pid, Blocked) }; -run_pending_item({obtain, FromPid, FromMRef, From, ForPid}, - State = #fhc_state { counts = Counts, blocked = Blocked }) -> - gen_server:reply(From, ok), - true = erlang:demonitor(FromMRef, [flush]), - {Obtained, Opened} = dict:fetch(ForPid, Counts), - State #fhc_state { - counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts), - blocked = sets:del_element(FromPid, Blocked) }. + State #fhc_state { counts = update_counts(Kind, Pid, +1, Counts), + blocked = sets:del_element(Pid, Blocked) }. + +update_counts(open, Pid, Delta, Counts) -> + dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, + Counts); +update_counts(obtain, Pid, Delta, Counts) -> + dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, + Counts). maybe_reduce(State = #fhc_state { limit = Limit, open_count = OpenCount, @@ -1010,5 +997,6 @@ ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of {ok, _} -> State; error -> _MRef = erlang:monitor(process, Pid), - State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) } + State #fhc_state { + counts = dict:store(Pid, {0, 0}, Counts) } end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2453280e34..0cdb4fff08 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -197,7 +197,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. + [Q || Q <- Qs, + gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d52660c5ac..2cab7136a6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -603,6 +603,7 @@ handle_call({init, Recover}, From, declare(Recover, From, State); _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined} = State, + gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( @@ -610,7 +611,7 @@ handle_call({init, Recover}, From, end, BQS = BQ:init(QName, IsDurable, Recover), %% Rely on terminate to delete the queue. - {stop, normal, not_found, State#q{backing_queue_state = BQS}} + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 7c10d48a9f..ab50c28ca5 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -55,23 +55,19 @@ start_heartbeat(Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Sender = - spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, - send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end}, Parent) end), + Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}, Parent), %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Receiver = - spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000, - recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end}, Parent) end), + Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, + fun () -> + Parent ! timeout, + stop + end}, Parent), {Sender, Receiver}. pause_monitor(none) -> @@ -89,7 +85,9 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- heartbeater(Params, Parent) -> - heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). + spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent), + {0, 0}) + end). heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, MonitorRef, {StatVal, SameCount}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d5ade90f6f..a133bf450d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -761,9 +761,10 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - State1 = State#v1{connection_state = running, - connection = NewConnection}, + State1 = internal_conserve_memory( + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State#v1{connection_state = running, + connection = NewConnection}), rabbit_event:notify( connection_created, [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 11ce6fc532..c9809ace61 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,7 +55,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> - ok = file_handle_cache:obtain(self()), + ok = file_handle_cache:obtain(), accept(State); handle_cast(_Msg, State) -> @@ -84,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:obtain(apply(M, F, A ++ [Sock])) + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain() catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -93,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% accept more accept(State); + handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we %% know this will fail. {stop, normal, State}; + handle_info(_Info, State) -> {noreply, State}. |
