summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile4
-rwxr-xr-xscripts/rabbitmq-server6
-rw-r--r--scripts/rabbitmq-server.bat13
-rw-r--r--scripts/rabbitmq-service.bat13
-rw-r--r--src/file_handle_cache.erl186
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_heartbeat.erl28
-rw-r--r--src/rabbit_reader.erl7
-rw-r--r--src/tcp_acceptor.erl7
10 files changed, 126 insertions, 144 deletions
diff --git a/Makefile b/Makefile
index e060c804b4..b6f4b7e5a9 100644
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,6 @@ RABBITMQ_NODENAME ?= rabbit
RABBITMQ_SERVER_START_ARGS ?=
RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
RABBITMQ_LOG_BASE ?= $(TMPDIR)
-RABBITMQ_PLUGINS_EXPAND_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-plugins-scratch
DEPS_FILE=deps.mk
SOURCE_DIR=src
@@ -147,8 +146,7 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\
RABBITMQ_NODE_IP_ADDRESS="$(RABBITMQ_NODE_IP_ADDRESS)" \
RABBITMQ_NODE_PORT="$(RABBITMQ_NODE_PORT)" \
RABBITMQ_LOG_BASE="$(RABBITMQ_LOG_BASE)" \
- RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" \
- RABBITMQ_PLUGINS_EXPAND_DIR="$(RABBITMQ_PLUGINS_EXPAND_DIR)"
+ RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)"
run: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index d52dc774b6..9310752f6f 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -39,7 +39,6 @@ CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
-PLUGINS_EXPAND_DIR=/var/lib/rabbitmq/plugins-scratch
SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
@@ -70,7 +69,6 @@ fi
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
-[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${PLUGINS_EXPAND_DIR}"
## Log rotation
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
@@ -91,14 +89,14 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
if erl \
-pa "$RABBITMQ_EBIN_ROOT" \
-rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
- -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
+ -rabbit plugins_expand_dir "\"${RABBITMQ_MNESIA_DIR}/plugins-scratch\"" \
-rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \
-noinput \
-hidden \
-s rabbit_plugin_activator \
-extra "$@"
then
- RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
+ RABBITMQ_BOOT_FILE="${RABBITMQ_MNESIA_DIR}/plugins-scratch/rabbit"
RABBITMQ_EBIN_PATH=""
else
exit 1
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index b1a91f47cd..5bcbc6babd 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -110,24 +110,21 @@ if "!RABBITMQ_MNESIA_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 95e5eebf86..4b3961d43b 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -180,24 +180,21 @@ if errorlevel 1 (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 08e71f55ba..6c6ed1729c 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -116,13 +116,13 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and release_on_death. obtain/0
-%% blocks until a file descriptor is available. release_on_death/1
-%% takes a pid and monitors the pid, reducing the count by 1 when the
-%% pid dies. Thus the assumption is that obtain/0 is called first, and
-%% when that returns, release_on_death/1 is called with the pid who
-%% "owns" the file descriptor. This is, for example, used to track the
-%% use of file descriptors through network sockets.
+%% The server also supports obtain and transfer. obtain/0 blocks until
+%% a file descriptor is available. transfer/1 transfers ownership
+%% of a file descriptor between processes. It is non-blocking.
+%%
+%% The callers of register_callback/3, obtain/0, and the argument of
+%% transfer/1 are monitored, reducing the count of handles in use
+%% appropriately when the processes terminate.
-behaviour(gen_server).
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/1]).
+-export([obtain/0, transfer/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -223,7 +223,8 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(obtain/1 :: (pid()) -> 'ok').
+-spec(obtain/0 :: () -> 'ok').
+-spec(transfer/1 :: (pid()) -> 'ok').
-endif.
@@ -445,8 +446,11 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain(Pid) ->
- gen_server:call(?SERVER, {obtain, self(), Pid}, infinity).
+obtain() ->
+ gen_server:call(?SERVER, {obtain, self()}, infinity).
+
+transfer(Pid) ->
+ gen_server:cast(?SERVER, {transfer, self(), Pid}).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -743,56 +747,50 @@ init([]) ->
blocked = sets:new(),
timer_ref = undefined }}.
-handle_call({obtain, FromPid, ForPid}, From,
- State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- blocked = Blocked })
- when Limit =/= infinity andalso Count >= Limit ->
- MRef = erlang:monitor(process, FromPid),
- Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
- {noreply, ensure_mref(ForPid,
- State #fhc_state {
- blocked = sets:add_element(FromPid, Blocked),
- obtain_pending = Pending1 })};
-handle_call({obtain, FromPid, ForPid}, From,
- State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- blocked = Blocked }) ->
- MRef = erlang:monitor(process, FromPid),
- case maybe_reduce(ensure_mref(ForPid, State #fhc_state {
- obtain_count = Count + 1 })) of
- {true, State1} ->
- Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
- {noreply, State1 #fhc_state {
- blocked = sets:add_element(FromPid, Blocked),
- obtain_count = Count,
- obtain_pending = Pending1 }};
- {false, State1} ->
- {noreply,
- run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)}
- end;
-
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders,
blocked = Blocked }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- case maybe_reduce(
- ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
- elders = Elders1 })) of
+ Item = {open, Pid, From},
+ case maybe_reduce(ensure_mref(Pid, State #fhc_state {
+ open_count = Count + 1,
+ elders = Elders1 })) of
{true, State1} ->
State2 = State1 #fhc_state { open_count = Count },
case CanClose of
true -> {reply, close, State2};
- false -> {noreply,
- State2 #fhc_state {
- blocked = sets:add_element(Pid, Blocked),
- open_pending = [{open, Pid, From} | Pending] }}
+ false -> {noreply, State2 #fhc_state {
+ open_pending = [Item | Pending],
+ blocked = sets:add_element(Pid, Blocked) }}
end;
{false, State1} ->
- {noreply, run_pending_item({open, Pid, From}, State1)}
+ {noreply, run_pending_item(Item, State1)}
+ end;
+
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ blocked = Blocked })
+ when Limit =/= infinity andalso Count >= Limit ->
+ Item = {obtain, Pid, From},
+ {noreply, ensure_mref(Pid, State #fhc_state {
+ obtain_pending = [Item | Pending],
+ blocked = sets:add_element(Pid, Blocked) })};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ blocked = Blocked }) ->
+ Item = {obtain, Pid, From},
+ case maybe_reduce(ensure_mref(Pid, State #fhc_state {
+ obtain_count = Count + 1 })) of
+ {true, State1} ->
+ {noreply, State1 #fhc_state {
+ obtain_count = Count,
+ obtain_pending = [Item | Pending],
+ blocked = sets:add_element(Pid, Blocked) }};
+ {false, State1} ->
+ {noreply, run_pending_item(Item, State1)}
end.
handle_cast({register_callback, Pid, MFA},
@@ -808,59 +806,51 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
%% storm of messages
{noreply, State #fhc_state { elders = Elders1 }};
-handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, counts = Counts,
- open_count = Count }) ->
+handle_cast({close, Pid, EldestUnusedSince},
+ State = #fhc_state { open_count = Count,
+ counts = Counts,
+ elders = Elders }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {Obtained, Opened} = dict:fetch(Pid, Counts),
- {noreply,
- process_pending(State #fhc_state {
- open_count = Count - 1,
- counts = dict:store(Pid, {Obtained, Opened - 1}, Counts),
- elders = Elders1 })};
+ Counts1 = update_counts(open, Pid, -1, Counts),
+ {noreply, process_pending(State #fhc_state { open_count = Count - 1,
+ counts = Counts1,
+ elders = Elders1 })};
+
+handle_cast({transfer, FromPid, ToPid}, State) ->
+ State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State),
+ Counts1 = update_counts(obtain, FromPid, -1, Counts),
+ Counts2 = update_counts(obtain, ToPid, +1, Counts1),
+ {noreply, process_pending(State1 #fhc_state { counts = Counts2 })};
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
{noreply, State1}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
- #fhc_state { obtain_count = ObtainCount,
- obtain_pending = ObtainPending,
- open_count = OpenCount,
+ #fhc_state { open_count = OpenCount,
open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
callbacks = Callbacks,
counts = Counts,
elders = Elders,
blocked = Blocked }) ->
- ObtainPending1 =
- lists:filter(
- fun ({obtain, FromPid, FromMRef, From, ForPid}) ->
- case Pid =:= ForPid of
- true -> gen_server:reply(From, ok),
- true = erlang:demonitor(FromMRef, [flush]),
- false;
- false -> Pid =/= FromPid
- end
- end, ObtainPending),
- OpenPending1 = lists:filter(fun ({open, Pid1, _From}) ->
- Pid =/= Pid1
- end, OpenPending),
- {Obtained, Opened} = case dict:find(Pid, Counts) of
- {ok, Val} -> Val;
- error -> {0, 0}
- end,
+ FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
+ OpenPending1 = lists:filter(FilterFun, OpenPending),
+ ObtainPending1 = lists:filter(FilterFun, ObtainPending),
+ {Opened, Obtained} = dict:fetch(Pid, Counts),
{noreply, process_pending(
State #fhc_state {
- elders = dict:erase(Pid, Elders),
- counts = dict:erase(Pid, Counts),
- callbacks = dict:erase(Pid, Callbacks),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = ObtainPending1,
open_count = OpenCount - Opened,
open_pending = OpenPending1,
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ callbacks = dict:erase(Pid, Callbacks),
+ counts = dict:erase(Pid, Counts),
+ elders = dict:erase(Pid, Elders),
blocked = sets:del_element(Pid, Blocked) })}.
terminate(_Reason, State) ->
@@ -907,24 +897,21 @@ process_pending(Pending, Quota, State) ->
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- {PendingNew, SatisfiableLen,
- lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}.
+ State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
+ {PendingNew, SatisfiableLen, State1}.
-run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts,
+run_pending_item({Kind, Pid, From}, State = #fhc_state { counts = Counts,
blocked = Blocked }) ->
gen_server:reply(From, ok),
- {Obtained, Opened} = dict:fetch(Pid, Counts),
- State #fhc_state {
- counts = dict:store(Pid, {Obtained, Opened + 1}, Counts),
- blocked = sets:del_element(Pid, Blocked) };
-run_pending_item({obtain, FromPid, FromMRef, From, ForPid},
- State = #fhc_state { counts = Counts, blocked = Blocked }) ->
- gen_server:reply(From, ok),
- true = erlang:demonitor(FromMRef, [flush]),
- {Obtained, Opened} = dict:fetch(ForPid, Counts),
- State #fhc_state {
- counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts),
- blocked = sets:del_element(FromPid, Blocked) }.
+ State #fhc_state { counts = update_counts(Kind, Pid, +1, Counts),
+ blocked = sets:del_element(Pid, Blocked) }.
+
+update_counts(open, Pid, Delta, Counts) ->
+ dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
+ Counts);
+update_counts(obtain, Pid, Delta, Counts) ->
+ dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end,
+ Counts).
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
@@ -1010,5 +997,6 @@ ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
case dict:find(Pid, Counts) of
{ok, _} -> State;
error -> _MRef = erlang:monitor(process, Pid),
- State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) }
+ State #fhc_state {
+ counts = dict:store(Pid, {0, 0}, Counts) }
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2453280e34..0cdb4fff08 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -197,7 +197,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
+ [Q || Q <- Qs,
+ gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d52660c5ac..2cab7136a6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -603,6 +603,7 @@ handle_call({init, Recover}, From,
declare(Recover, From, State);
_ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined} = State,
+ gen_server2:reply(From, not_found),
case Recover of
true -> ok;
_ -> rabbit_log:warning(
@@ -610,7 +611,7 @@ handle_call({init, Recover}, From,
end,
BQS = BQ:init(QName, IsDurable, Recover),
%% Rely on terminate to delete the queue.
- {stop, normal, not_found, State#q{backing_queue_state = BQS}}
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 7c10d48a9f..ab50c28ca5 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -55,23 +55,19 @@ start_heartbeat(Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Sender =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
- send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, Parent) end),
+ Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}, Parent),
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Receiver =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000,
- recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, Parent) end),
+ Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
+ fun () ->
+ Parent ! timeout,
+ stop
+ end}, Parent),
{Sender, Receiver}.
pause_monitor(none) ->
@@ -89,7 +85,9 @@ resume_monitor({_Sender, Receiver}) ->
%%----------------------------------------------------------------------------
heartbeater(Params, Parent) ->
- heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
+ spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent),
+ {0, 0})
+ end).
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
MonitorRef, {StatVal, SameCount}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d5ade90f6f..a133bf450d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -761,9 +761,10 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- State1 = State#v1{connection_state = running,
- connection = NewConnection},
+ State1 = internal_conserve_memory(
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State#v1{connection_state = running,
+ connection = NewConnection}),
rabbit_event:notify(
connection_created,
[{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 11ce6fc532..c9809ace61 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,7 +55,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
- ok = file_handle_cache:obtain(self()),
+ ok = file_handle_cache:obtain(),
accept(State);
handle_cast(_Msg, State) ->
@@ -84,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:obtain(apply(M, F, A ++ [Sock]))
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain()
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -93,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% accept more
accept(State);
+
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
+
handle_info(_Info, State) ->
{noreply, State}.