summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-27 18:19:56 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-27 18:19:56 +0100
commit14c76021b6b58dbf973cb229cfa00e7ba23130d9 (patch)
tree6cc9baee2d801dc6a40ccc7b4d67d3ce4dc835bc
parent2a1598e9adcce9aef5631cd772f16ae56d30cc1d (diff)
parent27ce44e7d420dba66832183a314ee24d6d5e7c3c (diff)
downloadrabbitmq-server-git-14c76021b6b58dbf973cb229cfa00e7ba23130d9.tar.gz
merge default into bug20284
The tests fail right now because queue_index sends out more messages than expected.
-rw-r--r--Makefile6
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xscripts/rabbitmq-server6
-rw-r--r--scripts/rabbitmq-server.bat13
-rw-r--r--scripts/rabbitmq-service.bat13
-rw-r--r--src/delegate.erl2
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl726
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl54
-rw-r--r--src/pg_local.erl4
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_channel.erl69
-rw-r--r--src/rabbit_channel_sup.erl96
-rw-r--r--src/rabbit_channel_sup_sup.erl (renamed from src/rabbit_hooks.erl)52
-rw-r--r--src/rabbit_connection_sup.erl99
-rw-r--r--src/rabbit_event.erl2
-rw-r--r--src/rabbit_exchange_type_registry.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl2
-rw-r--r--src/rabbit_framing_channel.erl19
-rw-r--r--src/rabbit_guid.erl2
-rw-r--r--src/rabbit_heartbeat.erl66
-rw-r--r--src/rabbit_limiter.erl27
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_memory_monitor.erl2
-rw-r--r--src/rabbit_misc.erl20
-rw-r--r--src/rabbit_mnesia.erl128
-rw-r--r--src/rabbit_msg_store.erl260
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_multi.erl9
-rw-r--r--src/rabbit_networking.erl18
-rw-r--r--src/rabbit_persister.erl5
-rw-r--r--src/rabbit_plugin_activator.erl9
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_queue_index.erl6
-rw-r--r--src/rabbit_reader.erl227
-rw-r--r--src/rabbit_tests.erl103
-rw-r--r--src/rabbit_types.erl5
-rw-r--r--src/rabbit_writer.erl86
-rw-r--r--src/supervisor2.erl43
-rw-r--r--src/tcp_acceptor.erl7
-rw-r--r--src/tcp_client_sup.erl10
-rw-r--r--src/vm_memory_monitor.erl5
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl5
-rw-r--r--src/worker_pool_worker.erl3
49 files changed, 1342 insertions, 913 deletions
diff --git a/Makefile b/Makefile
index b39c09f63c..ee3f198b65 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,6 @@ RABBITMQ_NODENAME ?= rabbit
RABBITMQ_SERVER_START_ARGS ?=
RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
RABBITMQ_LOG_BASE ?= $(TMPDIR)
-RABBITMQ_PLUGINS_EXPAND_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-plugins-scratch
DEPS_FILE=deps.mk
SOURCE_DIR=src
@@ -147,8 +146,7 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\
RABBITMQ_NODE_IP_ADDRESS="$(RABBITMQ_NODE_IP_ADDRESS)" \
RABBITMQ_NODE_PORT="$(RABBITMQ_NODE_PORT)" \
RABBITMQ_LOG_BASE="$(RABBITMQ_LOG_BASE)" \
- RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" \
- RABBITMQ_PLUGINS_EXPAND_DIR="$(RABBITMQ_PLUGINS_EXPAND_DIR)"
+ RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)"
run: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
@@ -239,7 +237,7 @@ distclean: clean
%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \
xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
- xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \
+ xmlto -o $(DOCS_DIR) $$opt man $<.tmp && \
gzip -f $(DOCS_DIR)/`basename $< .xml`
rm -f $<.tmp
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index c0d9aedae1..17518ddf68 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -127,6 +127,9 @@ done
rm -rf %{buildroot}
%changelog
+* Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1
+- New Upstream Release
+
* Wed Jul 14 2010 Emile Joubert <emile@rabbitmq.com> 1.8.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 0dccf93879..7ee6001630 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.0.0-1) karmic; urgency=low
+
+ * New Upstream Release
+
+ -- Michael Bridgen <mikeb@rabbitmq.com> Mon, 23 Aug 2010 14:55:39 +0100
+
rabbitmq-server (1.8.1-1) lucid; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index d52dc774b6..9310752f6f 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -39,7 +39,6 @@ CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
-PLUGINS_EXPAND_DIR=/var/lib/rabbitmq/plugins-scratch
SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
@@ -70,7 +69,6 @@ fi
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
-[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${PLUGINS_EXPAND_DIR}"
## Log rotation
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
@@ -91,14 +89,14 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
if erl \
-pa "$RABBITMQ_EBIN_ROOT" \
-rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
- -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
+ -rabbit plugins_expand_dir "\"${RABBITMQ_MNESIA_DIR}/plugins-scratch\"" \
-rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \
-noinput \
-hidden \
-s rabbit_plugin_activator \
-extra "$@"
then
- RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
+ RABBITMQ_BOOT_FILE="${RABBITMQ_MNESIA_DIR}/plugins-scratch/rabbit"
RABBITMQ_EBIN_PATH=""
else
exit 1
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index b1a91f47cd..5bcbc6babd 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -110,24 +110,21 @@ if "!RABBITMQ_MNESIA_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 95e5eebf86..4b3961d43b 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -180,24 +180,21 @@ if errorlevel 1 (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/delegate.erl b/src/delegate.erl
index 3f57953bf7..c8aa3092c7 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -44,7 +44,7 @@
-ifdef(use_specs).
--spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok(pid())).
+-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 39ef3f85b8..ff303ee28c 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -43,7 +43,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any()) | 'ignore').
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-endif.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index e209ee6be4..f83fa0bc11 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -116,13 +116,13 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and release_on_death. obtain/0
-%% blocks until a file descriptor is available. release_on_death/1
-%% takes a pid and monitors the pid, reducing the count by 1 when the
-%% pid dies. Thus the assumption is that obtain/0 is called first, and
-%% when that returns, release_on_death/1 is called with the pid who
-%% "owns" the file descriptor. This is, for example, used to track the
-%% use of file descriptors through network sockets.
+%% The server also supports obtain and transfer. obtain/0 blocks until
+%% a file descriptor is available. transfer/1 is transfers ownership
+%% of a file descriptor between processes. It is non-blocking.
+%%
+%% The callers of register_callback/3, obtain/0, and the argument of
+%% transfer/1 are monitored, reducing the count of handles in use
+%% appropriately when the processes terminate.
-behaviour(gen_server).
@@ -130,17 +130,27 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([release_on_death/1, obtain/0]).
+-export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
--define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
+
+%% Googling around suggests that Windows has a limit somewhere around
+%% 16M, eg
+%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
+%% however, it turns out that's only available through the win32
+%% API. Via the C Runtime, we have just 512:
+%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx
+-define(FILE_HANDLES_LIMIT_WINDOWS, 512).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
+-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
+-define(CLIENT_ETS_TABLE, ?MODULE).
+
%%----------------------------------------------------------------------------
-record(file,
@@ -168,13 +178,31 @@
-record(fhc_state,
{ elders,
limit,
- count,
- obtains,
- callbacks,
- client_mrefs,
+ open_count,
+ open_pending,
+ obtain_limit,
+ obtain_count,
+ obtain_pending,
+ clients,
timer_ref
}).
+-record(cstate,
+ { pid,
+ callback,
+ opened,
+ obtained,
+ blocked,
+ pending_closes
+ }).
+
+-record(pending,
+ { kind,
+ pid,
+ requested,
+ from
+ }).
+
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------
@@ -182,8 +210,8 @@
-ifdef(use_specs).
-type(ref() :: any()).
--type(ok_or_error() :: rabbit_types:ok_or_error(any())).
--type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())).
+-type(ok_or_error() :: 'ok' | {'error', any()}).
+-type(val_or_error(T) :: {'ok', T} | {'error', any()}).
-type(position() :: ('bof' | 'eof' | non_neg_integer() |
{('bof' |'eof'), non_neg_integer()} |
{'cur', integer()})).
@@ -210,8 +238,10 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(release_on_death/1 :: (pid()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(transfer/1 :: (pid()) -> 'ok').
+-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
+-spec(get_limit/0 :: () -> non_neg_integer()).
-endif.
@@ -238,9 +268,9 @@ open(Path, Mode, Options) ->
IsWriter = is_writer(Mode1),
case IsWriter andalso HasWriter of
true -> {error, writer_exists};
- false -> Ref = make_ref(),
- case open1(Path1, Mode1, Options, Ref, bof, new) of
- {ok, _Handle} ->
+ false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options),
+ case get_or_reopen([{Ref, new}]) of
+ {ok, [_Handle1]} ->
RCount1 = case is_reader(Mode1) of
true -> RCount + 1;
false -> RCount
@@ -251,6 +281,7 @@ open(Path, Mode, Options) ->
has_writer = HasWriter1 }),
{ok, Ref};
Error ->
+ erase({Ref, fhc_handle}),
Error
end
end.
@@ -301,7 +332,7 @@ append(Ref, Data) ->
Size1 = Size + iolist_size(Data),
Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
write_buffer_size = Size1 },
- case Limit /= infinity andalso Size1 > Limit of
+ case Limit =/= infinity andalso Size1 > Limit of
true -> {Result, Handle3} = write_buffer(Handle2),
{Result, [Handle3]};
false -> {ok, [Handle2]}
@@ -375,7 +406,8 @@ copy(Src, Dest, Count) ->
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
- DHandle #handle { offset = DOffset + Count1 }]};
+ DHandle #handle { offset = DOffset + Count1,
+ is_dirty = true }]};
Error ->
{Error, [SHandle, DHandle]}
end;
@@ -420,29 +452,29 @@ set_maximum_since_use(MaximumAge) ->
case lists:foldl(
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
- Age = timer:now_diff(Now, Then),
- case Hdl /= closed andalso Age >= MaximumAge of
- true -> {Res, Handle1} = soft_close(Handle),
- case Res of
- ok -> put({Ref, fhc_handle}, Handle1),
- false;
- _ -> put_handle(Ref, Handle1),
- Rep
- end;
+ case Hdl =/= closed andalso
+ timer:now_diff(Now, Then) >= MaximumAge of
+ true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
(_KeyValuePair, Rep) ->
Rep
- end, true, get()) of
- true -> age_tree_change(), ok;
- false -> ok
+ end, false, get()) of
+ false -> age_tree_change(), ok;
+ true -> ok
end.
-release_on_death(Pid) when is_pid(Pid) ->
- gen_server:cast(?SERVER, {release_on_death, Pid}).
-
obtain() ->
- gen_server:call(?SERVER, obtain, infinity).
+ gen_server:call(?SERVER, {obtain, self()}, infinity).
+
+transfer(Pid) ->
+ gen_server:cast(?SERVER, {transfer, self(), Pid}).
+
+set_limit(Limit) ->
+ gen_server:call(?SERVER, {set_limit, Limit}, infinity).
+
+get_limit() ->
+ gen_server:call(?SERVER, get_limit, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -459,18 +491,9 @@ append_to_write(Mode) ->
end.
with_handles(Refs, Fun) ->
- ResHandles = lists:foldl(
- fun (Ref, {ok, HandlesAcc}) ->
- case get_or_reopen(Ref) of
- {ok, Handle} -> {ok, [Handle | HandlesAcc]};
- Error -> Error
- end;
- (_Ref, Error) ->
- Error
- end, {ok, []}, Refs),
- case ResHandles of
+ case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
{ok, Handles} ->
- case Fun(lists:reverse(Handles)) of
+ case Fun(Handles) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
Result;
@@ -499,36 +522,94 @@ with_flushed_handles(Refs, Fun) ->
end
end).
-get_or_reopen(Ref) ->
- case get({Ref, fhc_handle}) of
- undefined ->
- {error, not_open, Ref};
- #handle { hdl = closed, offset = Offset,
- path = Path, mode = Mode, options = Options } ->
- open1(Path, Mode, Options, Ref, Offset, reopen);
- Handle ->
- {ok, Handle}
+get_or_reopen(RefNewOrReopens) ->
+ case partition_handles(RefNewOrReopens) of
+ {OpenHdls, []} ->
+ {ok, [Handle || {_Ref, Handle} <- OpenHdls]};
+ {OpenHdls, ClosedHdls} ->
+ Oldest = oldest(get_age_tree(), fun () -> now() end),
+ case gen_server:call(?SERVER, {open, self(), length(ClosedHdls),
+ Oldest}, infinity) of
+ ok ->
+ case reopen(ClosedHdls) of
+ {ok, RefHdls} -> sort_handles(RefNewOrReopens,
+ OpenHdls, RefHdls, []);
+ Error -> Error
+ end;
+ close ->
+ [soft_close(Ref, Handle) ||
+ {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <-
+ get(),
+ Hdl =/= closed],
+ get_or_reopen(RefNewOrReopens)
+ end
+ end.
+
+reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []).
+
+reopen([], Tree, RefHdls) ->
+ put_age_tree(Tree),
+ {ok, lists:reverse(RefHdls)};
+reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
+ path = Path,
+ mode = Mode,
+ offset = Offset,
+ last_used_at = undefined }} |
+ RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
+ case file:open(Path, case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end) of
+ {ok, Hdl} ->
+ Now = now(),
+ {{ok, Offset1}, Handle1} =
+ maybe_seek(Offset, Handle #handle { hdl = Hdl,
+ offset = 0,
+ last_used_at = Now }),
+ Handle2 = Handle1 #handle { trusted_offset = Offset1 },
+ put({Ref, fhc_handle}, Handle2),
+ reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
+ [{Ref, Handle2} | RefHdls]);
+ Error ->
+ %% NB: none of the handles in ToOpen are in the age tree
+ Oldest = oldest(Tree, fun () -> undefined end),
+ [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
+ put_age_tree(Tree),
+ Error
end.
+partition_handles(RefNewOrReopens) ->
+ lists:foldr(
+ fun ({Ref, NewOrReopen}, {Open, Closed}) ->
+ case get({Ref, fhc_handle}) of
+ #handle { hdl = closed } = Handle ->
+ {Open, [{Ref, NewOrReopen, Handle} | Closed]};
+ #handle {} = Handle ->
+ {[{Ref, Handle} | Open], Closed}
+ end
+ end, {[], []}, RefNewOrReopens).
+
+sort_handles([], [], [], Acc) ->
+ {ok, lists:reverse(Acc)};
+sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]);
+sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
+
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
Now = now(),
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-with_age_tree(Fun) ->
- put(fhc_age_tree, Fun(case get(fhc_age_tree) of
- undefined -> gb_trees:empty();
- AgeTree -> AgeTree
- end)).
+with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())).
-age_tree_insert(Now, Ref) ->
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- gen_server:cast(?SERVER, {open, self(), Oldest}),
- Tree1
- end).
+get_age_tree() ->
+ case get(fhc_age_tree) of
+ undefined -> gb_trees:empty();
+ AgeTree -> AgeTree
+ end.
+
+put_age_tree(Tree) -> put(fhc_age_tree, Tree).
age_tree_update(Then, Now, Ref) ->
with_age_tree(
@@ -540,13 +621,7 @@ age_tree_delete(Then) ->
with_age_tree(
fun (Tree) ->
Tree1 = gb_trees:delete_any(Then, Tree),
- Oldest = case gb_trees:is_empty(Tree1) of
- true ->
- undefined;
- false ->
- {Oldest1, _Ref} = gb_trees:smallest(Tree1),
- Oldest1
- end,
+ Oldest = oldest(Tree1, fun () -> undefined end),
gen_server:cast(?SERVER, {close, self(), Oldest}),
Tree1
end).
@@ -562,48 +637,53 @@ age_tree_change() ->
Tree
end).
-open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
- Mode1 = case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end,
- case file:open(Path, Mode1) of
- {ok, Hdl} ->
- WriteBufferSize =
- case proplists:get_value(write_buffer, Options, unbuffered) of
- unbuffered -> 0;
- infinity -> infinity;
- N when is_integer(N) -> N
- end,
- Now = now(),
- Handle = #handle { hdl = Hdl,
- offset = 0,
- trusted_offset = 0,
- is_dirty = false,
- write_buffer_size = 0,
- write_buffer_size_limit = WriteBufferSize,
- write_buffer = [],
- at_eof = false,
- path = Path,
- mode = Mode,
- options = Options,
- is_write = is_writer(Mode),
- is_read = is_reader(Mode),
- last_used_at = Now },
- {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
- Handle2 = Handle1 #handle { trusted_offset = Offset1 },
- put({Ref, fhc_handle}, Handle2),
- age_tree_insert(Now, Ref),
- {ok, Handle2};
- {error, Reason} ->
- {error, Reason}
+oldest(Tree, DefaultFun) ->
+ case gb_trees:is_empty(Tree) of
+ true -> DefaultFun();
+ false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ Oldest
+ end.
+
+new_closed_handle(Path, Mode, Options) ->
+ WriteBufferSize =
+ case proplists:get_value(write_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ infinity -> infinity;
+ N when is_integer(N) -> N
+ end,
+ Ref = make_ref(),
+ put({Ref, fhc_handle}, #handle { hdl = closed,
+ offset = 0,
+ trusted_offset = 0,
+ is_dirty = false,
+ write_buffer_size = 0,
+ write_buffer_size_limit = WriteBufferSize,
+ write_buffer = [],
+ at_eof = false,
+ path = Path,
+ mode = Mode,
+ options = Options,
+ is_write = is_writer(Mode),
+ is_read = is_reader(Mode),
+ last_used_at = undefined }),
+ {ok, Ref}.
+
+soft_close(Ref, Handle) ->
+ {Res, Handle1} = soft_close(Handle),
+ case Res of
+ ok -> put({Ref, fhc_handle}, Handle1),
+ true;
+ _ -> put_handle(Ref, Handle1),
+ false
end.
soft_close(Handle = #handle { hdl = closed }) ->
{ok, Handle};
soft_close(Handle) ->
case write_buffer(Handle) of
- {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
+ {ok, #handle { hdl = Hdl,
+ offset = Offset,
+ is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
true -> file:sync(Hdl);
@@ -611,8 +691,10 @@ soft_close(Handle) ->
end,
ok = file:close(Hdl),
age_tree_delete(Then),
- {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset,
- is_dirty = false }};
+ {ok, Handle1 #handle { hdl = closed,
+ trusted_offset = Offset,
+ is_dirty = false,
+ last_used_at = undefined }};
{_Error, _Handle} = Result ->
Result
end.
@@ -701,114 +783,303 @@ init([]) ->
_ ->
ulimit()
end,
- error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
- {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [], callbacks = dict:new(),
- client_mrefs = dict:new(), timer_ref = undefined }}.
-
-handle_call(obtain, From, State = #fhc_state { count = Count }) ->
- State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
- maybe_reduce(State #fhc_state { count = Count + 1 }),
- case Limit /= infinity andalso Count1 >= Limit of
- true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
- count = Count1 - 1 }};
- false -> {reply, ok, State1}
- end.
-
-handle_cast({register_callback, Pid, MFA},
- State = #fhc_state { callbacks = Callbacks }) ->
- {noreply, ensure_mref(
- Pid, State #fhc_state {
- callbacks = dict:store(Pid, MFA, Callbacks) })};
-
-handle_cast({open, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
+ ObtainLimit = obtain_limit(Limit),
+ error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
+ [Limit, ObtainLimit]),
+ Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
+ {ok, #fhc_state { elders = dict:new(),
+ limit = Limit,
+ open_count = 0,
+ open_pending = pending_new(),
+ obtain_limit = ObtainLimit,
+ obtain_count = 0,
+ obtain_pending = pending_new(),
+ clients = Clients,
+ timer_ref = undefined }}.
+
+handle_call({open, Pid, Requested, EldestUnusedSince}, From,
+ State = #fhc_state { open_count = Count,
+ open_pending = Pending,
+ elders = Elders,
+ clients = Clients })
+ when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- {noreply, maybe_reduce(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count + 1 }))};
+ Item = #pending { kind = open,
+ pid = Pid,
+ requested = Requested,
+ from = From },
+ ok = track_client(Pid, Clients),
+ State1 = State #fhc_state { elders = Elders1 },
+ case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
+ true -> case ets:lookup(Clients, Pid) of
+ [#cstate { opened = 0 }] ->
+ true = ets:update_element(
+ Clients, Pid, {#cstate.blocked, true}),
+ {noreply,
+ reduce(State1 #fhc_state {
+ open_pending = pending_in(Item, Pending) })};
+ [#cstate { opened = Opened }] ->
+ true = ets:update_element(
+ Clients, Pid,
+ {#cstate.pending_closes, Opened}),
+ {reply, close, State1}
+ end;
+ false -> {noreply, run_pending_item(Item, State1)}
+ end;
-handle_cast({update, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders }) ->
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ clients = Clients })
+ when Limit =/= infinity andalso Count >= Limit ->
+ ok = track_client(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ clients = Clients }) ->
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ ok = track_client(Pid, Clients),
+ case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ true ->
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
+ {noreply, reduce(State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) })};
+ false ->
+ {noreply, run_pending_item(Item, State)}
+ end;
+handle_call({set_limit, Limit}, _From, State) ->
+ {reply, ok, maybe_reduce(
+ process_pending(State #fhc_state {
+ limit = Limit,
+ obtain_limit = obtain_limit(Limit) }))};
+handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
+ {reply, Limit, State}.
+
+handle_cast({register_callback, Pid, MFA},
+ State = #fhc_state { clients = Clients }) ->
+ ok = track_client(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
+ {noreply, State};
+
+handle_cast({update, Pid, EldestUnusedSince},
+ State = #fhc_state { elders = Elders })
+ when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
+ {noreply, State #fhc_state { elders = Elders1 }};
-handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
+handle_cast({close, Pid, EldestUnusedSince},
+ State = #fhc_state { elders = Elders, clients = Clients }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_obtains(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count - 1 }))};
+ ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
+ {noreply, process_pending(
+ update_counts(open, Pid, -1,
+ State #fhc_state { elders = Elders1 }))};
+
+handle_cast({transfer, FromPid, ToPid}, State) ->
+ ok = track_client(ToPid, State#fhc_state.clients),
+ {noreply, process_pending(
+ update_counts(obtain, ToPid, +1,
+ update_counts(obtain, FromPid, -1, State)))};
handle_cast(check_counts, State) ->
- {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
-
-handle_cast({release_on_death, Pid}, State) ->
- _MRef = erlang:monitor(process, Pid),
- {noreply, State}.
-
-handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { count = Count, callbacks = Callbacks,
- client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_obtains(
- case dict:find(Pid, ClientMRefs) of
- {ok, MRef} -> State #fhc_state {
- elders = dict:erase(Pid, Elders),
- client_mrefs = dict:erase(Pid, ClientMRefs),
- callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { count = Count - 1 }
- end)}.
-
-terminate(_Reason, State) ->
+ {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #fhc_state { elders = Elders,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
+ clients = Clients }) ->
+ [#cstate { opened = Opened, obtained = Obtained }] =
+ ets:lookup(Clients, Pid),
+ true = ets:delete(Clients, Pid),
+ FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
+ {noreply, process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(FilterFun, OpenPending),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
+ elders = dict:erase(Pid, Elders) })}.
+
+terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+ ets:delete(Clients),
State.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
+%% pending queue abstraction helpers
+%%----------------------------------------------------------------------------
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
+
+filter_pending(Fun, {Count, Queue}) ->
+ {Delta, Queue1} =
+ queue_fold(fun (Item, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - requested(Item), QueueN}
+ end
+ end, {0, queue:new()}, Queue),
+ {Count + Delta, Queue1}.
+
+pending_new() ->
+ {0, queue:new()}.
+
+pending_in(Item = #pending { requested = Requested }, {Count, Queue}) ->
+ {Count + Requested, queue:in(Item, Queue)}.
+
+pending_out({0, _Queue} = Pending) ->
+ {empty, Pending};
+pending_out({N, Queue}) ->
+ {{value, #pending { requested = Requested }} = Result, Queue1} =
+ queue:out(Queue),
+ {Result, {N - Requested, Queue1}}.
+
+pending_count({Count, _Queue}) ->
+ Count.
+
+pending_is_empty({0, _Queue}) ->
+ true;
+pending_is_empty({_N, _Queue}) ->
+ false.
+
+%%----------------------------------------------------------------------------
%% server helpers
%%----------------------------------------------------------------------------
-process_obtains(State = #fhc_state { obtains = [] }) ->
- State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count })
- when Limit /= infinity andalso Count >= Limit ->
+obtain_limit(infinity) -> infinity;
+obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
+ OLimit when OLimit < 0 -> 0;
+ OLimit -> OLimit
+ end.
+
+requested({_Kind, _Pid, Requested, _From}) ->
+ Requested.
+
+process_pending(State = #fhc_state { limit = infinity }) ->
State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count,
- obtains = Obtains }) ->
- ObtainsLen = length(Obtains),
- ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
- Take = ObtainsLen - ObtainableLen,
- {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
- [gen_server:reply(From, ok) || From <- ObtainableRev],
- State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
-
-maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
- callbacks = Callbacks, timer_ref = TRef })
- when Limit /= infinity andalso Count >= Limit ->
+process_pending(State) ->
+ process_open(process_obtain(State)).
+
+process_open(State = #fhc_state { limit = Limit,
+ open_pending = Pending,
+ open_count = OpenCount,
+ obtain_count = ObtainCount }) ->
+ {Pending1, State1} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ State1 #fhc_state { open_pending = Pending1 }.
+
+process_obtain(State = #fhc_state { limit = Limit,
+ obtain_pending = Pending,
+ obtain_limit = ObtainLimit,
+ obtain_count = ObtainCount,
+ open_count = OpenCount }) ->
+ Quota = lists:min([ObtainLimit - ObtainCount,
+ Limit - (ObtainCount + OpenCount)]),
+ {Pending1, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1 }.
+
+process_pending(Pending, Quota, State) when Quota =< 0 ->
+ {Pending, State};
+process_pending(Pending, Quota, State) ->
+ case pending_out(Pending) of
+ {empty, _Pending} ->
+ {Pending, State};
+ {{value, #pending { requested = Requested }}, _Pending1}
+ when Requested > Quota ->
+ {Pending, State};
+ {{value, #pending { requested = Requested } = Item}, Pending1} ->
+ process_pending(Pending1, Quota - Requested,
+ run_pending_item(Item, State))
+ end.
+
+run_pending_item(#pending { kind = Kind,
+ pid = Pid,
+ requested = Requested,
+ from = From },
+ State = #fhc_state { clients = Clients }) ->
+ gen_server:reply(From, ok),
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
+ update_counts(Kind, Pid, Requested, State).
+
+update_counts(Kind, Pid, Delta,
+ State = #fhc_state { open_count = OpenCount,
+ obtain_count = ObtainCount,
+ clients = Clients }) ->
+ {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
+ State #fhc_state { open_count = OpenCount + OpenDelta,
+ obtain_count = ObtainCount + ObtainDelta }.
+
+update_counts1(open, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
+ {Delta, 0};
+update_counts1(obtain, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
+ {0, Delta}.
+
+maybe_reduce(State) ->
+ case needs_reduce(State) of
+ true -> reduce(State);
+ false -> State
+ end.
+
+needs_reduce(#fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_limit = ObtainLimit,
+ obtain_pending = ObtainPending }) ->
+ Limit =/= infinity
+ andalso ((OpenCount + ObtainCount > Limit)
+ orelse (not pending_is_empty(OpenPending))
+ orelse (ObtainCount < ObtainLimit
+ andalso not pending_is_empty(ObtainPending))).
+
+reduce(State = #fhc_state { open_pending = OpenPending,
+ obtain_pending = ObtainPending,
+ elders = Elders,
+ clients = Clients,
+ timer_ref = TRef }) ->
Now = now(),
- {Pids, Sum, ClientCount} =
- dict:fold(fun (_Pid, undefined, Accs) ->
- Accs;
- (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
- {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
- CountAcc + 1}
+ {CStates, Sum, ClientCount} =
+ dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ [#cstate { pending_closes = PendingCloses,
+ opened = Opened,
+ blocked = Blocked } = CState] =
+ ets:lookup(Clients, Pid),
+ case Blocked orelse PendingCloses =:= Opened of
+ true -> Accs;
+ false -> {[CState | CStatesAcc],
+ SumAcc + timer:now_diff(Now, Eldest),
+ CountAcc + 1}
+ end
end, {[], 0, 0}, Elders),
- case Pids of
+ case CStates of
[] -> ok;
- _ -> AverageAge = Sum / ClientCount,
- lists:foreach(
- fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error -> ok;
- {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
- end
- end, Pids)
+ _ -> case (Sum / ClientCount) -
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ AverageAge when AverageAge > 0 ->
+ notify_age(CStates, AverageAge);
+ _ ->
+ notify_age0(Clients, CStates,
+ pending_count(OpenPending) +
+ pending_count(ObtainPending))
+ end
end,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
@@ -816,20 +1087,51 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
gen_server, cast, [?SERVER, check_counts]),
State #fhc_state { timer_ref = TRef1 };
_ -> State
- end;
-maybe_reduce(State) ->
- State.
+ end.
-%% Googling around suggests that Windows has a limit somewhere around
-%% 16M, eg
-%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
-%% For everything else, assume ulimit exists. Further googling
-%% suggests that BSDs (incl OS X), solaris and linux all agree that
-%% ulimit -n is file handles
+notify_age(CStates, AverageAge) ->
+ lists:foreach(
+ fun (#cstate { callback = undefined }) -> ok;
+ (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge])
+ end, CStates).
+
+notify_age0(Clients, CStates, Required) ->
+ Notifications =
+ [CState || CState <- CStates, CState#cstate.callback =/= undefined],
+ {L1, L2} = lists:split(random:uniform(length(Notifications)),
+ Notifications),
+ notify(Clients, Required, L2 ++ L1).
+
+notify(_Clients, _Required, []) ->
+ ok;
+notify(_Clients, Required, _Notifications) when Required =< 0 ->
+ ok;
+notify(Clients, Required, [#cstate{ pid = Pid,
+ callback = {M, F, A},
+ opened = Opened } | Notifications]) ->
+ apply(M, F, A ++ [0]),
+ ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
+ notify(Clients, Required - Opened, Notifications).
+
+track_client(Pid, Clients) ->
+ case ets:insert_new(Clients, #cstate { pid = Pid,
+ callback = undefined,
+ opened = 0,
+ obtained = 0,
+ blocked = false,
+ pending_closes = 0 }) of
+ true -> _MRef = erlang:monitor(process, Pid),
+ ok;
+ false -> ok
+ end.
+
+%% For all unices, assume ulimit exists. Further googling suggests
+%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
+%% is file handles
ulimit() ->
case os:type() of
{win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS;
+ ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
%% builtin, not a command. In OS X, it's a command.
@@ -852,11 +1154,3 @@ ulimit() ->
_ ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-
-ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
- case dict:find(Pid, ClientMRefs) of
- {ok, _MRef} -> State;
- error -> MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
- end.
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 31dda16e83..1e03d6c41c 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -42,7 +42,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(stop/1 :: (pid()) -> 'ok').
-spec(fork/1 :: (pid()) -> 'ok').
-spec(finish/1 :: (pid()) -> 'ok').
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index f1c8eb4d9d..9fb9e2fea7 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -976,7 +976,7 @@ print_event(Dev, Event, Name) ->
terminate(Reason, Name, Msg, Mod, State, Debug) ->
case catch Mod:terminate(Reason, State) of
{'EXIT', R} ->
- error_info(R, Name, Msg, State, Debug),
+ error_info(R, Reason, Name, Msg, State, Debug),
exit(R);
_ ->
case Reason of
@@ -987,42 +987,44 @@ terminate(Reason, Name, Msg, Mod, State, Debug) ->
{shutdown,_}=Shutdown ->
exit(Shutdown);
_ ->
- error_info(Reason, Name, Msg, State, Debug),
+ error_info(Reason, undefined, Name, Msg, State, Debug),
exit(Reason)
end
end.
-error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
%% OTP-5811 Don't send an error report if it's the system process
%% application_controller which is terminating - let init take care
%% of it instead
ok;
-error_info(Reason, Name, Msg, State, Debug) ->
- Reason1 =
- case Reason of
- {undef,[{M,F,A}|MFAs]} ->
- case code:is_loaded(M) of
- false ->
- {'module could not be loaded',[{M,F,A}|MFAs]};
- _ ->
- case erlang:function_exported(M, F, length(A)) of
- true ->
- Reason;
- false ->
- {'function not exported',[{M,F,A}|MFAs]}
- end
- end;
- _ ->
- Reason
- end,
- format("** Generic server ~p terminating \n"
- "** Last message in was ~p~n"
- "** When Server state == ~p~n"
- "** Reason for termination == ~n** ~p~n",
- [Name, Msg, State, Reason1]),
+error_info(Reason, RootCause, Name, Msg, State, Debug) ->
+ Reason1 = error_reason(Reason),
+ Fmt =
+ "** Generic server ~p terminating~n"
+ "** Last message in was ~p~n"
+ "** When Server state == ~p~n"
+ "** Reason for termination == ~n** ~p~n",
+ case RootCause of
+ undefined -> format(Fmt, [Name, Msg, State, Reason1]);
+ _ -> format(Fmt ++ "** In 'terminate' callback "
+ "with reason ==~n** ~p~n",
+ [Name, Msg, State, Reason1,
+ error_reason(RootCause)])
+ end,
sys:print_log(Debug),
ok.
+error_reason({undef,[{M,F,A}|MFAs]} = Reason) ->
+ case code:is_loaded(M) of
+ false -> {'module could not be loaded',[{M,F,A}|MFAs]};
+ _ -> case erlang:function_exported(M, F, length(A)) of
+ true -> Reason;
+ false -> {'function not exported',[{M,F,A}|MFAs]}
+ end
+ end;
+error_reason(Reason) ->
+ Reason.
+
%%% ---------------------------------------------------
%%% Misc. functions.
%%% ---------------------------------------------------
diff --git a/src/pg_local.erl b/src/pg_local.erl
index f5ded123d7..49fa873ae4 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -45,8 +45,8 @@
-type(name() :: term()).
--spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), term())).
--spec(start/0 :: () -> rabbit_types:ok_or_error2(pid(), term())).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(join/2 :: (name(), pid()) -> 'ok').
-spec(leave/2 :: (name(), pid()) -> 'ok').
-spec(get_members/1 :: (name()) -> [pid()]).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 41c628a071..303d1f3ac0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -83,12 +83,6 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
--rabbit_boot_step({rabbit_hooks,
- [{description, "internal event notification system"},
- {mfa, {rabbit_hooks, start, []}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c25fd9b4cb..99faa511c3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -197,7 +197,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
+ [Q || Q <- Qs,
+ gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 15939802ae..ce5c8162a1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -631,6 +631,7 @@ handle_call({init, Recover}, From,
declare(Recover, From, State);
_ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined} = State,
+ gen_server2:reply(From, not_found),
case Recover of
true -> ok;
_ -> rabbit_log:warning(
@@ -638,7 +639,7 @@ handle_call({init, Recover}, From,
end,
BQS = BQ:init(QName, IsDurable, Recover),
%% Rely on terminate to delete the queue.
- {stop, normal, not_found, State#q{backing_queue_state = BQS}}
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ae49b30127..0964700c29 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/6, do/2, do/3, shutdown/1]).
+-export([start_link/7, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
@@ -44,7 +44,7 @@
handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
- transaction_id, tx_participants, next_tag,
+ start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
@@ -80,9 +80,11 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
+ rabbit_types:vhost(), pid(),
+ fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+ rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -106,9 +108,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost, CollectorPid], []).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun) ->
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username,
+ VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -163,15 +166,16 @@ confirm(Pid, MsgSeqNo) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun]) ->
process_flag(trap_exit, true),
- link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{ state = starting,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
limiter_pid = undefined,
+ start_limiter_fun = StartLimiterFun,
transaction_id = none,
tx_participants = sets:new(),
next_tag = 1,
@@ -281,24 +285,19 @@ handle_cast({msg_sent_to_queues, MsgSeqNo, QPids}, State) ->
end, State, QPids)}.
-handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
- State = #ch{writer_pid = WriterPid}) ->
- State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
- {stop, normal, State};
-handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{qpid_to_msgs = QTM}) ->
- case dict:find(QPid, QTM) of
- {ok, Msgs} ->
- S = gb_sets:fold(fun (MsgSeqNo, State0) ->
- send_or_enqueue_ack(MsgSeqNo, State0)
- end, State, Msgs),
- {noreply, S #ch {qpid_to_msgs = dict:erase(QPid, QTM)}};
- error ->
- erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State)}
- end.
+ State1 = case dict:find(QPid, QTM) of
+ {ok, Msgs} ->
+ S = gb_sets:fold(fun (MsgSeqNo, State0) ->
+ send_or_enqueue_ack(MsgSeqNo, State0)
+ end, State, Msgs),
+ S #ch {qpid_to_msgs = dict:erase(QPid, QTM)};
+ error ->
+ State
+ end,
+ erase_queue_stats(QPid),
+ {noreply, queue_blocked(QPid, State1)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -310,8 +309,10 @@ terminate(_Reason, State = #ch{state = terminating}) ->
terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
- normal -> ok = Res;
- _ -> ok
+ normal -> ok = Res;
+ shutdown -> ok = Res;
+ {shutdown, _Term} -> ok = Res;
+ _ -> ok
end,
terminate(State).
@@ -506,7 +507,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
stop;
handle_method(#'access.request'{},_, State) ->
@@ -1185,8 +1186,8 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
+ {ok, LPid} = SLF(queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid.
@@ -1258,12 +1259,10 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
-terminate(State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+terminate(State) ->
stop_ack_timer(State),
pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]),
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid).
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1292,11 +1291,9 @@ maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
end.
incr_stats({QPid, _} = QX, Inc, Measure) ->
- io:format("incr_stats for ~p~n", [QPid]),
maybe_monitor(QPid),
update_measures(queue_exchange_stats, QX, Inc, Measure);
incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- io:format("incr_stats for ~p~n", [QPid]),
maybe_monitor(QPid),
update_measures(queue_stats, QPid, Inc, Measure);
incr_stats(X, Inc, Measure) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
new file mode 100644
index 0000000000..02199a6516
--- /dev/null
+++ b/src/rabbit_channel_sup.erl
@@ -0,0 +1,96 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([start_link_args/0]).
+
+-type(start_link_args() ::
+ {rabbit_types:protocol(), rabbit_net:socket(),
+ rabbit_channel:channel_number(), non_neg_integer(), pid(),
+ rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
+
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+ Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, WriterPid} =
+ supervisor2:start_child(
+ SupPid,
+ {writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol, ReaderPid]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, WriterPid, Username, VHost,
+ Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, FramingChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {framing_channel, {rabbit_framing_channel, start_link,
+ [ReaderPid, ChannelPid, Protocol]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, SupPid, FramingChannelPid}.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
+start_limiter_fun(SupPid) ->
+ fun (UnackedCount) ->
+ Me = self(),
+ {ok, _Pid} =
+ supervisor2:start_child(
+ SupPid,
+ {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]})
+ end.
diff --git a/src/rabbit_hooks.erl b/src/rabbit_channel_sup_sup.erl
index 3fc84c1e09..d193880555 100644
--- a/src/rabbit_hooks.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -29,45 +29,37 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_hooks).
+-module(rabbit_channel_sup_sup).
--export([start/0]).
--export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]).
+-behaviour(supervisor2).
--define(TableName, rabbit_hooks).
+-export([start_link/0, start_channel/2]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start/0 :: () -> 'ok').
--spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok').
--spec(unsubscribe/2 :: (atom(), atom()) -> 'ok').
--spec(trigger/2 :: (atom(), list()) -> 'ok').
--spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok').
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
+ {'ok', pid(), pid()}).
-endif.
-start() ->
- ets:new(?TableName, [bag, public, named_table]),
- ok.
+%%----------------------------------------------------------------------------
-subscribe(Hook, HandlerName, Handler) ->
- ets:insert(?TableName, {Hook, HandlerName, Handler}),
- ok.
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
-unsubscribe(Hook, HandlerName) ->
- ets:match_delete(?TableName, {Hook, HandlerName, '_'}),
- ok.
+start_channel(Pid, Args) ->
+ {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]),
+ link(ChSupPid),
+ Result.
-trigger(Hook, Args) ->
- Hooks = ets:lookup(?TableName, Hook),
- [case catch apply(M, F, [Hook, Name, Args | A]) of
- {'EXIT', Reason} ->
- rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p",
- [Name, Hook, Reason]);
- _ -> ok
- end || {_, Name, {M, F, A}} <- Hooks],
- ok.
+%%----------------------------------------------------------------------------
-notify_remote(Hook, HandlerName, Args, Pid, PidArgs) ->
- Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]},
- ok.
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{channel_sup, {rabbit_channel_sup, start_link, []},
+ temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
new file mode 100644
index 0000000000..69e21d73cc
--- /dev/null
+++ b/src/rabbit_connection_sup.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_connection_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, reader/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid(), pid()}).
+-spec(reader/1 :: (pid()) -> pid()).
+
+-endif.
+
+%%--------------------------------------------------------------------------
+
+start_link() ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ {ok, Collector} =
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, ReaderPid} =
+ supervisor2:start_child(
+ SupPid,
+ {reader, {rabbit_reader, start_link,
+ [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
+ {ok, SupPid, ReaderPid}.
+
+reader(Pid) ->
+ hd(supervisor2:find_child(Pid, reader)).
+
+%%--------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
+start_heartbeat_fun(SupPid) ->
+ fun (_Sock, 0) ->
+ none;
+ (Sock, TimeoutSec) ->
+ Parent = self(),
+ {ok, Sender} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_sender,
+ {rabbit_heartbeat, start_heartbeat_sender,
+ [Parent, Sock, TimeoutSec]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_receiver,
+ {rabbit_heartbeat, start_heartbeat_receiver,
+ [Parent, Sock, TimeoutSec]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}
+ end.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 113ffcb4bc..0f00537a6a 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -69,7 +69,7 @@
-type(timer_fun() :: fun (() -> 'ok')).
--spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
index 7906fbee72..f15275b538 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_exchange_type_registry.erl
@@ -45,8 +45,7 @@
-ifdef(use_specs).
--spec(start_link/0 ::
- () -> 'ignore' | rabbit_types:ok_or_error2(pid(), term())).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (binary(), atom()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 89b2441e38..e796acf327 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery =
Delivery).
split_topic_key(Key) ->
- re:split(Key, "\\.", [{return, list}]).
+ string:tokens(binary_to_list(Key), ".").
topic_matches(PatternKey, RoutingKey) ->
P = split_topic_key(PatternKey),
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 553faaa814..cb53185f6b 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -39,16 +39,9 @@
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs, Protocol) ->
- Parent = self(),
- {ok, spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of
- %% the channel or reader process terminates us too.
- process_flag(trap_exit, true),
- {ok, ChannelPid} = apply(StartFun, StartArgs),
- mainloop(Parent, ChannelPid, Protocol)
- end)}.
+start_link(Parent, ChannelPid, Protocol) ->
+ {ok, proc_lib:spawn_link(
+ fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -62,12 +55,6 @@ shutdown(Pid) ->
read_frame(ChannelPid) ->
receive
- %% converting the exit signal into one of our own ensures that
- %% the reader sees the right pid (i.e. ours) when a channel
- %% exits. Similarly in the other direction, though it is not
- %% really relevant there since the channel is not specifically
- %% watching out for reader exit signals.
- {'EXIT', _Pid, Reason} -> exit(Reason);
{frame, Frame} -> Frame;
terminate -> rabbit_channel:shutdown(ChannelPid),
read_frame(ChannelPid);
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index af1c629f41..e7d0c10177 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -52,7 +52,7 @@
-type(guid() :: binary()).
--spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(guid/0 :: () -> guid()).
-spec(string_guid/1 :: (any()) -> string()).
-spec(binstring_guid/1 :: (any()) -> binary()).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index faddffc1fc..a9945af1d4 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,47 +31,53 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
+-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+ pause_monitor/1, resume_monitor/1]).
+
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--type(pids() :: rabbit_types:maybe({pid(), pid()})).
+-export_type([heartbeaters/0]).
+
+-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
+
+-spec(start_heartbeat_sender/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()).
--spec(pause_monitor/1 :: (pids()) -> 'ok').
--spec(resume_monitor/1 :: (pids()) -> 'ok').
+-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
+-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_heartbeat(_Sock, 0) ->
- none;
-start_heartbeat(Sock, TimeoutSec) ->
- Parent = self(),
+start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Sender =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
- send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, Parent) end),
+ heartbeater(
+ {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(
+ Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}).
+
+start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Receiver =
- spawn_link(fun () -> heartbeater({Sock, TimeoutSec * 1000,
- recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, Parent) end),
- {Sender, Receiver}.
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
+ Parent ! timeout,
+ stop
+ end}).
pause_monitor(none) ->
ok;
@@ -87,19 +93,15 @@ resume_monitor({_Sender, Receiver}) ->
%%----------------------------------------------------------------------------
-heartbeater(Params, Parent) ->
- heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
+heartbeater(Params) ->
+ {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
- MonitorRef, {StatVal, SameCount}) ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
+ {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
pause ->
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
resume ->
Recurse({0, 0});
Other ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 813ccc7538..da7078f1ba 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -35,7 +35,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--export([start_link/2, shutdown/1]).
+-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1]).
@@ -45,8 +45,8 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())).
--spec(shutdown/1 :: (maybe_pid()) -> 'ok').
+-spec(start_link/2 :: (pid(), non_neg_integer()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
@@ -76,17 +76,10 @@
start_link(ChPid, UnackedMsgCount) ->
gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
-shutdown(undefined) ->
- ok;
-shutdown(LimiterPid) ->
- true = unlink(LimiterPid),
- gen_server2:cast(LimiterPid, shutdown).
-
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- unlink_on_stopped(LimiterPid,
- gen_server2:call(LimiterPid, {limit, PrefetchCount})).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -124,8 +117,7 @@ block(LimiterPid) ->
unblock(undefined) ->
ok;
unblock(LimiterPid) ->
- unlink_on_stopped(LimiterPid,
- gen_server2:call(LimiterPid, unblock, infinity)).
+ gen_server2:call(LimiterPid, unblock, infinity).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -164,9 +156,6 @@ handle_call(unblock, _From, State) ->
{stop, State1} -> {stop, normal, stopped, State1}
end.
-handle_cast(shutdown, State) ->
- {stop, normal, State};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
@@ -246,9 +235,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
-
-unlink_on_stopped(LimiterPid, stopped) ->
- ok = rabbit_misc:unlink_and_capture_exit(LimiterPid),
- stopped;
-unlink_on_stopped(_LimiterPid, Result) ->
- Result.
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 85bcbca04a..863f77e7eb 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -50,7 +50,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(debug/1 :: (string()) -> 'ok').
-spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index bdf3807531..f87b62713a 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -86,7 +86,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(deregister/1 :: (pid()) -> 'ok').
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5fa3f8edb4..086d260e24 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -39,7 +39,6 @@
-export([die/1, frame_error/2, amqp_error/4,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
--export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
-export([table_lookup/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -108,10 +107,6 @@
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
'ok' | rabbit_types:connection_exit()).
--spec(get_config/1 ::
- (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
--spec(get_config/2 :: (atom(), A) -> A).
--spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
@@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
[Key, rabbit_misc:rs(Name), New1, Orig1])
end.
-get_config(Key) ->
- case dirty_read({rabbit_config, Key}) of
- {ok, {rabbit_config, Key, V}} -> {ok, V};
- Other -> Other
- end.
-
-get_config(Key, DefaultValue) ->
- case get_config(Key) of
- {ok, V} -> V;
- {error, not_found} -> DefaultValue
- end.
-
-set_config(Key, Value) ->
- ok = mnesia:dirty_write({rabbit_config, Key, Value}).
-
dirty_read(ReadSpec) ->
case mnesia:dirty_read(ReadSpec) of
[Result] -> {ok, Result};
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 505dc28fe8..a321488897 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -91,7 +91,6 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
- ok = wait_for_tables(),
ok.
is_db_empty() ->
@@ -114,7 +113,6 @@ cluster(ClusterNodes, Force) ->
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
ok = init_db(ClusterNodes, Force),
- ok = wait_for_tables(),
ok = create_cluster_nodes_config(ClusterNodes)
after
mnesia:stop()
@@ -157,57 +155,83 @@ table_definitions() ->
[{rabbit_user,
[{record_name, user},
{attributes, record_info(fields, user)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #user_permission{user_vhost = #user_vhost{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
{rabbit_vhost,
[{record_name, vhost},
{attributes, record_info(fields, vhost)},
- {disc_copies, [node()]}]},
- {rabbit_config,
- [{attributes, [key, val]}, % same mnesia's default
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #vhost{_='_'}}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
- {type, bag}]},
+ {type, bag},
+ {match, #listener{_='_'}}]},
{rabbit_durable_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_reverse_route,
[{record_name, reverse_route},
{attributes, record_info(fields, reverse_route)},
- {type, ordered_set}]},
+ {type, ordered_set},
+ {match, #reverse_route{reverse_binding = reverse_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_exchange,
[{record_name, exchange},
- {attributes, record_info(fields, exchange)}]},
+ {attributes, record_info(fields, exchange)},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {disc_copies, [node()]}]},
+ {disc_copies, [node()]},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
{rabbit_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]}].
+ {attributes, record_info(fields, amqqueue)},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+
+binding_match() ->
+ #binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+reverse_binding_match() ->
+ #reverse_binding{queue_name = queue_name_match(),
+ exchange_name = exchange_name_match(),
+ _='_'}.
+exchange_name_match() ->
+ resource_match(exchange).
+queue_name_match() ->
+ resource_match(queue).
+resource_match(Kind) ->
+ #resource{kind = Kind, _='_'}.
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
replicated_table_names() ->
- [Tab || {Tab, Attrs} <- table_definitions(),
- not lists:member({local_content, true}, Attrs)
+ [Tab || {Tab, TabDef} <- table_definitions(),
+ not lists:member({local_content, true}, TabDef)
].
dir() -> mnesia:system_info(directory).
@@ -232,26 +256,55 @@ ensure_mnesia_not_running() ->
yes -> throw({error, mnesia_unexpectedly_running})
end.
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
check_schema_integrity() ->
- TabDefs = table_definitions(),
Tables = mnesia:system_info(tables),
- case [Error || Tab <- table_names(),
+ case [Error || {Tab, TabDef} <- table_definitions(),
case lists:member(Tab, Tables) of
false ->
Error = {table_missing, Tab},
true;
true ->
- {_, TabDef} = proplists:lookup(Tab, TabDefs),
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
Attrs = mnesia:table_info(Tab, attributes),
Error = {table_attributes_mismatch, Tab,
ExpAttrs, Attrs},
Attrs /= ExpAttrs
end] of
- [] -> ok;
+ [] -> check_table_integrity();
Errors -> {error, Errors}
end.
+check_table_integrity() ->
+ ok = wait_for_tables(),
+ case lists:all(fun ({Tab, TabDef}) ->
+ {_, Match} = proplists:lookup(match, TabDef),
+ read_test_table(Tab, Match)
+ end, table_definitions()) of
+ true -> ok;
+ false -> {error, invalid_table_content}
+ end.
+
+read_test_table(Tab, Match) ->
+ case mnesia:dirty_first(Tab) of
+ '$end_of_table' ->
+ true;
+ Key ->
+ ObjList = mnesia:dirty_read(Tab, Key),
+ MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
+ case ets:match_spec_run(ObjList, MatchComp) of
+ ObjList -> true;
+ _ -> false
+ end
+ end.
+
%% The cluster node config file contains some or all of the disk nodes
%% that are members of the cluster this node is / should be a part of.
%%
@@ -347,8 +400,9 @@ init_db(ClusterNodes, Force) ->
ok = create_local_table_copies(case IsDiskNode of
true -> disc;
false -> ram
- end)
- end;
+ end),
+ ok = ensure_schema_integrity()
+ end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
@@ -363,7 +417,9 @@ create_schema() ->
cannot_create_schema),
rabbit_misc:ensure_ok(mnesia:start(),
cannot_start_mnesia),
- create_tables().
+ ok = create_tables(),
+ ok = ensure_schema_integrity(),
+ ok = wait_for_tables().
move_db() ->
mnesia:stop(),
@@ -388,12 +444,13 @@ move_db() ->
ok.
create_tables() ->
- lists:foreach(fun ({Tab, TabArgs}) ->
- case mnesia:create_table(Tab, TabArgs) of
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
{atomic, ok} -> ok;
{aborted, Reason} ->
throw({error, {table_creation_failed,
- Tab, TabArgs, Reason}})
+ Tab, TabDef1, Reason}})
end
end,
table_definitions()),
@@ -448,17 +505,12 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
wait_for_tables() -> wait_for_tables(table_names()).
wait_for_tables(TableNames) ->
- case check_schema_integrity() of
- ok ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok -> ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
- end;
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok -> ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
{error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
+ throw({error, {failed_waiting_for_tables, Reason}})
end.
reset(Force) ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 445aff487d..9be895ed35 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -126,8 +126,7 @@
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
- startup_fun_state()) ->
- 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+ startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
rabbit_types:ok(client_msstate())).
-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
@@ -321,7 +320,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
write(Server, Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, self(), Guid, Msg}), CState}.
+ {gen_server2:cast(Server, {write, self(), Guid}), CState}.
read(Server, Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -366,7 +365,7 @@ set_maximum_since_use(Server, Age) ->
client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -383,10 +382,10 @@ client_terminate(CState) ->
client_delete_and_terminate(CState, Server, Ref) ->
ok = client_terminate(CState),
- ok = gen_server2:call(Server, {delete_client, Ref}, infinity).
+ ok = gen_server2:cast(Server, {delete_client, Ref}).
successfully_recovered_state(Server) ->
- gen_server2:call(Server, successfully_recovered_state, infinity).
+ gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
register_sync_callback(Server, Fun) ->
gen_server2:call(Server, {register_sync_callback, Fun}, infinity).
@@ -502,7 +501,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
+init([Server, BaseDir, ClientRefs, StartupFunState]) ->
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
@@ -513,11 +512,32 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
{ok, IndexModule} = application:get_env(msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
- {AllCleanShutdown, IndexState, ClientRefs1} =
- recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server),
+ AttemptFileSummaryRecovery =
+ case ClientRefs of
+ undefined -> ok = rabbit_misc:recursive_delete([Dir]),
+ ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ false;
+ _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ recover_crashed_compactions(Dir)
+ end,
+ %% if we found crashed compactions we trust neither the
+ %% file_summary nor the location index. Note the file_summary is
+ %% left empty here if it can't be recovered.
{FileSummaryRecovered, FileSummaryEts} =
- recover_file_summary(AllCleanShutdown, Dir, Server),
+ recover_file_summary(AttemptFileSummaryRecovery, Dir),
+
+ {CleanShutdown, IndexState, ClientRefs1} =
+ recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
+ ClientRefs, Dir, Server),
+ %% CleanShutdown => msg location index and file_summary both
+ %% recovered correctly.
+ true = case {FileSummaryRecovered, CleanShutdown} of
+ {true, false} -> ets:delete_all_objects(FileSummaryEts);
+ _ -> true
+ end,
+ %% CleanShutdown <=> msg location index and file_summary both
+ %% recovered correctly.
DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
@@ -544,28 +564,16 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
- successfully_recovered = AllCleanShutdown,
+ successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
pid_to_fun = dict:new(),
pid_to_guids = dict:new()
},
- ok = case AllCleanShutdown of
- true -> ok;
- false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State)
- end,
-
- FileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
- TmpFileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames),
-
- %% There should be no more tmp files now, so go ahead and load the
- %% whole lot
- Files = [filename_to_num(FileName) || FileName <- FileNames],
+ %% If we didn't recover the msg location index then we need to
+ %% rebuild it now.
{Offset, State1 = #msstate { current_file = CurFile }} =
- build_index(FileSummaryRecovered, Files, State),
+ build_index(CleanShutdown, StartupFunState, State),
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
@@ -606,11 +614,6 @@ handle_call({new_client_state, CRef}, _From,
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({delete_client, CRef}, _From,
- State = #msstate { client_refs = ClientRefs }) ->
- reply(ok,
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
-
handle_call({register_sync_callback, Fun}, {Pid, _},
State = #msstate { pid_to_fun = PTF }) ->
erlang:monitor(process, Pid),
@@ -618,7 +621,7 @@ handle_call({register_sync_callback, Fun}, {Pid, _},
State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) }).
-handle_cast({write, Pid, Guid, Msg},
+handle_cast({write, Pid, Guid},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
@@ -628,6 +631,7 @@ handle_cast({write, Pid, Guid, Msg},
pid_to_fun = PTF,
pid_to_guids = PTG}) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
+ [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
case index_lookup(Guid, State) of
not_found ->
%% New message, lots to do
@@ -736,7 +740,12 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State).
+ noreply(State);
+
+handle_cast({delete_client, CRef},
+ State = #msstate { client_refs = ClientRefs }) ->
+ noreply(
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
handle_info(timeout, State) ->
noreply(internal_sync(State));
@@ -1082,9 +1091,9 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-sort_file_names(FileNames) ->
+list_sorted_file_names(Dir, Ext) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
- FileNames).
+ filelib:wildcard("*" ++ Ext, Dir)).
%%----------------------------------------------------------------------------
%% message cache helper functions
@@ -1164,15 +1173,15 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% shutdown and recovery
%%----------------------------------------------------------------------------
-recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) ->
- ok = rabbit_misc:recursive_delete([Dir]),
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
+ {false, IndexModule:new(Dir), sets:new()};
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
+ rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
{false, IndexModule:new(Dir), sets:new()};
-recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
- rabbit_log:warning("~w: " ++ ErrorMsg ++
- "~nrebuilding indices from scratch~n",
+ rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
+ "rebuilding indices from scratch~n",
[Server | ErrorArgs]),
{false, IndexModule:new(Dir), sets:new()}
end,
@@ -1186,12 +1195,12 @@ recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
andalso IndexModule =:= RecIndexModule) of
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
- ClientRefs1 = sets:from_list(ClientRefs),
- {true, IndexState1, ClientRefs1};
+ {true, IndexState1,
+ sets:from_list(ClientRefs)};
{error, Error} ->
Fresh("failed to recover index: ~p", [Error])
end;
- false -> Fresh("recovery terms differ from present", [])
+ false -> Fresh("recovery terms differ from present", [])
end
end.
@@ -1212,7 +1221,7 @@ store_file_summary(Tid, Dir) ->
ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
[{extended_info, [object_count]}]).
-recover_file_summary(false, _Dir, _Server) ->
+recover_file_summary(false, _Dir) ->
%% TODO: the only reason for this to be an *ordered*_set is so
%% that a) maybe_compact can start a traversal from the eldest
%% file, and b) build_index in fast recovery mode can easily
@@ -1222,15 +1231,12 @@ recover_file_summary(false, _Dir, _Server) ->
%% ditching the latter would be neater.
{false, ets:new(rabbit_msg_store_file_summary,
[ordered_set, public, {keypos, #file_summary.file}])};
-recover_file_summary(true, Dir, Server) ->
+recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
- {ok, Tid} -> file:delete(Path),
+ {ok, Tid} -> file:delete(Path),
{true, Tid};
- {error, Error} -> rabbit_log:warning(
- "~w: failed to recover file summary: ~p~n"
- "rebuilding~n", [Server, Error]),
- recover_file_summary(false, Dir, Server)
+ {error, _Error} -> recover_file_summary(false, Dir)
end.
count_msg_refs(Gen, Seed, State) ->
@@ -1243,6 +1249,7 @@ count_msg_refs(Gen, Seed, State) ->
ok = case index_lookup(Guid, State) of
not_found ->
index_insert(#msg_location { guid = Guid,
+ file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
@@ -1257,7 +1264,9 @@ count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Next, State)
end.
-recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
+recover_crashed_compactions(Dir) ->
+ FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
@@ -1266,103 +1275,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
ok = recover_crashed_compaction(
Dir, TmpFileName, NonTmpRelatedFileName)
end, TmpFileNames),
- ok.
+ TmpFileNames == [].
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
- {ok, UncorruptedMessagesTmp, GuidsTmp} =
- scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
- {ok, UncorruptedMessages, Guids} =
- scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
- %% 1) It's possible that everything in the tmp file is also in the
- %% main file such that the main file is (prefix ++
- %% tmpfile). This means that compaction failed immediately
- %% prior to the final step of deleting the tmp file. Plan: just
- %% delete the tmp file
- %% 2) It's possible that everything in the tmp file is also in the
- %% main file but with holes throughout (or just somthing like
- %% main = (prefix ++ hole ++ tmpfile)). This means that
- %% compaction wrote out the tmp file successfully and then
- %% failed. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 3) It's possible that everything in the tmp file is also in the
- %% main file but such that the main file does not end with tmp
- %% file (and there are valid messages in the suffix; main =
- %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
- %% means that compaction failed as we were writing out the tmp
- %% file. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 4) It's possible that there are messages in the tmp file which
- %% are not in the main file. This means that writing out the
- %% tmp file succeeded, but then we failed as we were copying
- %% them back over to the main file, after truncating the main
- %% file. As the main file has already been truncated, it should
- %% consist only of valid messages. Plan: Truncate the main file
- %% back to before any of the files in the tmp file and copy
- %% them over again
- TmpPath = form_filename(Dir, TmpFileName),
- case is_sublist(GuidsTmp, Guids) of
- true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
- %% note this also catches the case when the tmp file
- %% is empty
- ok = file:delete(TmpPath);
- false ->
- %% We're in case 4 above. We only care about the inital
- %% msgs in main file that are not in the tmp file. If
- %% there are no msgs in the tmp file then we would be in
- %% the 'true' branch of this case, so we know the
- %% lists:last call is safe.
- EldestTmpGuid = lists:last(GuidsTmp),
- {Guids1, UncorruptedMessages1}
- = case lists:splitwith(
- fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of
- {_Guids, []} -> %% no msgs from tmp in main
- {Guids, UncorruptedMessages};
- {Dropped, [EldestTmpGuid | Rest]} ->
- %% Msgs in Dropped are in tmp, so forget them.
- %% *cry*. Lists indexed from 1.
- {Rest, lists:sublist(UncorruptedMessages,
- 2 + length(Dropped),
- length(Rest))}
- end,
- %% The main file prefix should be contiguous
- {Top, Guids1} = find_contiguous_block_prefix(
- lists:reverse(UncorruptedMessages1)),
- %% we should have that none of the messages in the prefix
- %% are in the tmp file
- true = is_disjoint(Guids1, GuidsTmp),
- %% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- [read | ?WRITE_MODE]),
- %% Wipe out any rubbish at the end of the file. Remember
- %% the head of the list will be the highest entry in the
- %% file.
- [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize,
- %% Extend the main file as big as necessary in a single
- %% move. If we run out of disk space, this truncate could
- %% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
- {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
- ok = file_handle_cache:close(MainHdl),
- ok = file_handle_cache:delete(TmpHdl),
-
- {ok, _MainMessages, GuidsMain} =
- scan_file_for_valid_messages_and_guids(
- Dir, NonTmpRelatedFileName),
- %% check that everything in Guids1 is in GuidsMain
- true = is_sublist(Guids1, GuidsMain),
- %% check that everything in GuidsTmp is in GuidsMain
- true = is_sublist(GuidsTmp, GuidsMain)
- end,
+ %% Because a msg can legitimately appear multiple times in the
+ %% same file, identifying the contents of the tmp file and where
+ %% they came from is non-trivial. If we are recovering a crashed
+ %% compaction then we will be rebuilding the index, which can cope
+ %% with duplicates appearing. Thus the simplest and safest thing
+ %% to do is to append the contents of the tmp file to its main
+ %% file.
+ {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
+ {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
+ ?READ_MODE ++ ?WRITE_MODE),
+ {ok, _End} = file_handle_cache:position(MainHdl, eof),
+ Size = filelib:file_size(form_filename(Dir, TmpFileName)),
+ {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
+ ok = file_handle_cache:close(MainHdl),
+ ok = file_handle_cache:delete(TmpHdl),
ok.
-is_sublist(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL).
-
-is_disjoint(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
@@ -1376,10 +1308,6 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_file_for_valid_messages_and_guids(Dir, FileName) ->
- {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
- {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
-
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -1394,8 +1322,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
{ExpectedOffset, Guids}.
-build_index(true, _Files, State = #msstate {
- file_summary_ets = FileSummaryEts }) ->
+build_index(true, _StartupFunState,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
ets:foldl(
fun (#file_summary { valid_total_size = ValidTotalSize,
file_size = FileSize,
@@ -1407,12 +1335,17 @@ build_index(true, _Files, State = #msstate {
sum_file_size = SumFileSize + FileSize,
current_file = File }}
end, {0, State}, FileSummaryEts);
-build_index(false, Files, State) ->
+build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
+ State = #msstate { dir = Dir }) ->
+ ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
{ok, Pid} = gatherer:start_link(),
- case Files of
- [] -> build_index(Pid, undefined, [State #msstate.current_file], State);
- _ -> {Offset, State1} = build_index(Pid, undefined, Files, State),
- {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}
+ case [filename_to_num(FileName) ||
+ FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
+ [] -> build_index(Pid, undefined, [State #msstate.current_file],
+ State);
+ Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
+ {Offset, lists:foldl(fun delete_file_if_empty/2,
+ State1, Files)}
end.
build_index(Gatherer, Left, [],
@@ -1453,14 +1386,14 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
lists:foldl(
fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case index_lookup(Guid, State) of
- not_found ->
- {VMAcc, VTSAcc};
- StoreEntry ->
+ #msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
State),
- {[Obj | VMAcc], VTSAcc + TotalSize}
+ {[Obj | VMAcc], VTSAcc + TotalSize};
+ _ ->
+ {VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
%% foldl reverses lists, find_contiguous_block_prefix needs
@@ -1715,9 +1648,10 @@ find_unremoved_messages_in_file(File,
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
- lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) ->
+ lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
- #msg_location { file = File } = Entry ->
+ #msg_location { file = File, total_size = TotalSize,
+ offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
_ ->
Acc
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index eaa411730c..c7948b7eb3 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -56,7 +56,7 @@
-ifdef(use_specs).
-spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) ->
- 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+ rabbit_types:ok_pid_or_error()).
-spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
-spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(stop/1 :: (pid()) -> 'ok').
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 3facef17f7..c7a5a60027 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -93,7 +93,14 @@ usage() ->
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
application:load(rabbit),
- NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")),
+ {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts(
+ getenv("RABBITMQ_NODENAME")),
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ throw({cannot_connect_to_epmd, NodeHost, EpmdReason});
+ {ok, _} ->
+ ok
+ end,
{NodePids, Running} =
case list_to_integer(NodeCount) of
1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 3a3357ba9d..f656e04cba 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -118,7 +118,7 @@ start() ->
{rabbit_tcp_client_sup,
{tcp_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
- {rabbit_reader,start_link,[]}]},
+ {rabbit_connection_sup,start_link,[]}]},
transient, infinity, supervisor, [tcp_client_sup]}),
ok.
@@ -204,10 +204,10 @@ on_node_down(Node) ->
ok = mnesia:dirty_delete(rabbit_listener, Node).
start_client(Sock, SockTransform) ->
- {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock, SockTransform},
- Child.
+ {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
+ ok = rabbit_net:controlling_process(Sock, Reader),
+ Reader ! {go, Sock, SockTransform},
+ Reader.
start_client(Sock) ->
start_client(Sock, fun (S) -> {ok, S} end).
@@ -230,8 +230,9 @@ start_ssl_client(SslOpts, Sock) ->
end).
connections() ->
- [Pid || {_, Pid, _, _} <- supervisor:which_children(
- rabbit_tcp_client_sup)].
+ [rabbit_connection_sup:reader(ConnSup) ||
+ {_, ConnSup, supervisor, _}
+ <- supervisor:which_children(rabbit_tcp_client_sup)].
connection_info_keys() -> rabbit_reader:info_keys().
@@ -242,8 +243,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
close_connection(Pid, Explanation) ->
- case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end,
- supervisor:which_children(rabbit_tcp_client_sup)) of
+ case lists:member(Pid, connections()) of
true -> rabbit_reader:shutdown(Pid, Explanation);
false -> throw({error, {not_a_connection_pid, Pid}})
end.
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index a427b13548..66e5cf6311 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -73,9 +73,8 @@
{deliver, pmsg()} |
{ack, pmsg()}).
--spec(start_link/1 ::
- ([rabbit_amqqueue:name()])
- -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/1 :: ([rabbit_amqqueue:name()]) ->
+ rabbit_types:ok_pid_or_error()).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
-spec(extend_transaction/2 ::
({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()])
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index a170fb1da8..26274a365e 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -51,6 +51,7 @@
%%----------------------------------------------------------------------------
start() ->
+ io:format("Activating RabbitMQ plugins ..."),
%% Ensure Rabbit is loaded so we can access it's environment
application:load(rabbit),
@@ -76,7 +77,7 @@ start() ->
AppList
end,
AppVersions = [determine_version(App) || App <- AllApps],
- {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
+ RabbitVersion = proplists:get_value(rabbit, AppVersions),
%% Build the overall release descriptor
RDesc = {release,
@@ -129,8 +130,10 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
- io:format("~n~w plugins activated.~n~n", [length(PluginApps)]),
- [io:format("* ~w~n", [App]) || App <- PluginApps],
+ io:format("~n~w plugins activated:~n", [length(PluginApps)]),
+ [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
+ || App <- PluginApps],
+ io:nl(),
halt(),
ok.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 9257ec82ab..0a49b94d09 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server).
--export([start_link/0, register/2, delete_all/1, shutdown/1]).
+-export([start_link/0, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -46,10 +46,9 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok(pid())).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
--spec(shutdown/1 :: (pid()) -> 'ok').
-endif.
@@ -64,9 +63,6 @@ register(CollectorPid, Q) ->
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
-shutdown(CollectorPid) ->
- gen_server:cast(CollectorPid, shutdown).
-
%%----------------------------------------------------------------------------
init([]) ->
@@ -90,8 +86,8 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.
-handle_cast(shutdown, State) ->
- {stop, normal, State}.
+handle_cast(Msg, State) ->
+ {stop, {unhandled_cast, Msg}, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
State = #state{queues = Queues}) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a82da57ae7..fbc3f8aa42 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -259,8 +259,8 @@ publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
- State2 = State#qistate { unsynced_guids =
- [Guid | State#qistate.unsynced_guids] },
+ State2 = State1 #qistate { unsynced_guids =
+ [Guid | State1#qistate.unsynced_guids] },
maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)).
deliver(SeqIds, State) ->
@@ -273,7 +273,7 @@ sync([], State) ->
State;
sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
State;
-sync(SeqIds, State = #qistate { journal_handle = JournalHdl,
+sync(_SeqIds, State = #qistate { journal_handle = JournalHdl,
on_sync = OnSyncFun }) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index d5ade90f6f..09ada1c0c8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,11 +33,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/2]).
+-export([init/4, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -46,7 +46,6 @@
-export([emit_stats/1]).
-import(gen_tcp).
--import(fprof).
-import(inet).
-import(prim_inet).
@@ -60,7 +59,8 @@
%---------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
- connection_state, queue_collector, heartbeater, stats_timer}).
+ connection_state, queue_collector, heartbeater, stats_timer,
+ channel_sup_sup_pid, start_heartbeat_fun}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -160,6 +160,12 @@
-ifdef(use_specs).
+-type(start_heartbeat_fun() ::
+ fun ((rabbit_networking:socket(), non_neg_integer()) ->
+ rabbit_heartbeat:heartbeaters())).
+
+-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+ rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
@@ -168,21 +174,33 @@
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(start_connection/7 ::
+ (pid(), pid(), pid(), start_heartbeat_fun(), any(),
+ rabbit_networking:socket(),
+ fun ((rabbit_networking:socket()) ->
+ rabbit_types:ok_or_error2(
+ rabbit_networking:socket(), any()))) -> no_return()).
+
-endif.
%%--------------------------------------------------------------------------
-start_link() ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+ Collector, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent) ->
+init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(Parent, Deb, Sock, SockTransform)
+ start_connection(
+ Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -208,33 +226,6 @@ info(Pid, Items) ->
emit_stats(Pid) ->
gen_server:cast(Pid, emit_stats).
-setup_profiling() ->
- Value = rabbit_misc:get_config(profiling_enabled, false),
- case Value of
- once ->
- rabbit_log:info("Enabling profiling for this connection, "
- "and disabling for subsequent.~n"),
- rabbit_misc:set_config(profiling_enabled, false),
- fprof:trace(start);
- true ->
- rabbit_log:info("Enabling profiling for this connection.~n"),
- fprof:trace(start);
- false ->
- ok
- end,
- Value.
-
-teardown_profiling(Value) ->
- case Value of
- false ->
- ok;
- _ ->
- rabbit_log:info("Completing profiling for this connection.~n"),
- fprof:trace(stop),
- fprof:profile(),
- fprof:analyse([{dest, []}, {cols, 100}])
- end.
-
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -261,7 +252,8 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, Deb, Sock, SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+ Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -270,28 +262,29 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
- ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Deb, switch_callback(
- #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
- connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
- rabbit_event:init_stats_timer()},
- handshake, 8))
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer(),
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = StartHeartbeatFun
+ },
+ handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -308,9 +301,6 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue),
- rabbit_misc:unlink_and_capture_exit(Collector),
- rabbit_queue_collector:shutdown(Collector),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
@@ -347,10 +337,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_channel_exit(Channel, Reason, State));
- {'EXIT', Pid, Reason} ->
- mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
+ {channel_exit, ChannelOrFrPid, Reason} ->
+ mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
+ {'EXIT', ChSupPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -443,33 +433,45 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
- {channel, Channel} = get({chpid, ChPid}),
+handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
+ {channel, Channel} = get({ch_fr_pid, ChFrPid}),
handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, normal, State) ->
- erase({chpid, Pid}),
- maybe_close(State);
-handle_dependent_exit(Pid, Reason, State) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+handle_dependent_exit(ChSupPid, Reason, State) ->
+ case termination_kind(Reason) of
+ controlled ->
+ case erase({ch_sup_pid, ChSupPid}) of
+ undefined -> ok;
+ {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
+ end,
+ maybe_close(State);
+ uncontrolled ->
+ case channel_cleanup(ChSupPid) of
+ undefined ->
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
+ Channel ->
+ maybe_close(handle_exception(State, Channel, Reason))
+ end
end.
-channel_cleanup(Pid) ->
- case get({chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} -> erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+channel_cleanup(ChSupPid) ->
+ case get({ch_sup_pid, ChSupPid}) of
+ undefined -> undefined;
+ {{channel, Channel}, ChFr} -> erase({channel, Channel}),
+ erase(ChFr),
+ erase({ch_sup_pid, ChSupPid}),
+ Channel
end.
-all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
+all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
+ {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, normal) || Pid <- all_channels()]),
+ NChannels =
+ length([rabbit_framing_channel:shutdown(ChFrPid)
+ || ChFrPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -487,14 +489,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'EXIT', Pid, Reason} ->
- case channel_cleanup(Pid) of
+ {'EXIT', ChSupPid, Reason} ->
+ case channel_cleanup(ChSupPid) of
undefined ->
- exit({abnormal_dependent_exit, Pid, Reason});
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
Channel ->
- case Reason of
- normal -> ok;
- _ ->
+ case termination_kind(Reason) of
+ controlled ->
+ ok;
+ uncontrolled ->
rabbit_log:error(
"connection ~p, channel ~p - "
"error while terminating:~n~p~n",
@@ -519,6 +522,11 @@ maybe_close(State = #v1{connection_state = closing,
maybe_close(State) ->
State.
+termination_kind(normal) -> controlled;
+termination_kind(shutdown) -> controlled;
+termination_kind({shutdown, _Term}) -> controlled;
+termination_kind(_) -> uncontrolled.
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -548,8 +556,8 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
- {chpid, ChPid} ->
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
+ {ch_fr_pid, ChFrPid} ->
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
@@ -732,7 +740,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock}) ->
+ sock = Sock,
+ start_heartbeat_fun = SHF}) ->
if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w < ~w min size",
@@ -742,8 +751,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = rabbit_heartbeat:start_heartbeat(
- Sock, ClientHeartbeat),
+ Heartbeater = SHF(Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -761,9 +769,10 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- State1 = State#v1{connection_state = running,
- connection = NewConnection},
+ State1 = internal_conserve_memory(
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State#v1{connection_state = running,
+ connection = NewConnection}),
rabbit_event:notify(
connection_created,
[{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -848,21 +857,21 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
- #v1{sock = Sock, connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost,
- protocol = Protocol}} = State,
- {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
- {ok, ChPid} = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector],
- Protocol),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+send_to_new_channel(Channel, AnalyzedFrame, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ {ok, ChSupPid, ChFrPid} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {Protocol, Sock, Channel, FrameMax,
+ self(), Username, VHost, Collector}),
+ put({channel, Channel}, {ch_fr_pid, ChFrPid}),
+ put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
+ put({ch_fr_pid, ChFrPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c07055af40..d783c43e46 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,9 +35,6 @@
-export([all_tests/0, test_parsing/0]).
-%% Exported so the hook mechanism can call back
--export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]).
-
-import(lists).
-include("rabbit.hrl").
@@ -55,6 +52,8 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
+ ok = file_handle_cache:set_limit(10),
+ passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
passed = test_bpqueue(),
@@ -1020,7 +1019,8 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self()),
+ <<"user">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1061,67 +1061,23 @@ test_server_status() ->
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+
+ unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
passed.
-test_hooks() ->
- %% Firing of hooks calls all hooks in an isolated manner
- rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:trigger(test_hook, [arg1, arg2]),
- [arg1, arg2] = get(test_hook_test_fired),
- [arg1, arg2] = get(test_hook_test2_fired),
- undefined = get(test_hook2_test2_fired),
-
- %% Hook Deletion works
- put(test_hook_test_fired, undefined),
- put(test_hook_test2_fired, undefined),
- rabbit_hooks:unsubscribe(test_hook, test),
- rabbit_hooks:trigger(test_hook, [arg3, arg4]),
- undefined = get(test_hook_test_fired),
- [arg3, arg4] = get(test_hook_test2_fired),
- undefined = get(test_hook2_test2_fired),
-
- %% Catches exceptions from bad hooks
- rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}),
- ok = rabbit_hooks:trigger(test_hook3, []),
-
- %% Passing extra arguments to hooks
- rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}),
- rabbit_hooks:trigger(arg_hook, [arg1, arg2]),
- {[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
-
- %% Invoking Pids
- Remote = fun () ->
- receive
- {rabbitmq_hook,[remote_test,test,[],Target]} ->
- Target ! invoked
- end
- end,
- P = spawn(Remote),
- rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}),
- rabbit_hooks:trigger(remote_test, []),
- receive
- invoked -> ok
- after 100 ->
- io:format("Remote hook not invoked"),
- throw(timeout)
- end,
- passed.
-
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
- <<"/">>, self()),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
+ <<"guest">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
- MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
- {Writer, Ch, MRef}.
+ {Writer, Ch}.
test_statistics_receiver(Pid) ->
receive
@@ -1160,7 +1116,7 @@ test_statistics() ->
%% by far the most complex code though.
%% Set up a channel and queue
- {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1),
rabbit_channel:do(Ch, #'queue.declare'{}),
QName = receive #'queue.declare_ok'{queue = Q0} ->
Q0
@@ -1404,17 +1360,35 @@ delete_log_handlers(Handlers) ->
Handler <- Handlers],
ok.
-handle_hook(HookName, Handler, Args) ->
- A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired",
- put(list_to_atom(A), Args).
-bad_handle_hook(_, _, _) ->
- exit(bad_handle_hook_called).
-extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
- handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
-
test_supervisor_delayed_restart() ->
test_sup:test_supervisor_delayed_restart().
+test_file_handle_cache() ->
+ %% test copying when there is just one spare handle
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores
+ TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
+ ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")),
+ Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open(
+ filename:join(TmpDir, "file3"),
+ [write], []),
+ receive close -> ok end,
+ file_handle_cache:delete(Hdl)
+ end),
+ Src = filename:join(TmpDir, "file1"),
+ Dst = filename:join(TmpDir, "file2"),
+ Content = <<"foo">>,
+ ok = file:write_file(Src, Content),
+ {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
+ {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
+ Size = size(Content),
+ {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
+ ok = file_handle_cache:delete(SrcHdl),
+ file_handle_cache:delete(DstHdl),
+ Pid ! close,
+ ok = file_handle_cache:set_limit(Limit),
+ passed.
+
test_backing_queue() ->
case application:get_env(rabbit, backing_queue_module) of
{ok, rabbit_variable_queue} ->
@@ -1624,7 +1598,8 @@ init_test_queue() ->
test_queue(), true, false,
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
- end).
+ end,
+ fun (_Guids) -> ok end).
restart_test_queue(Qi) ->
_ = rabbit_queue_index:terminate([], Qi),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index a9313503a1..47e8bb0161 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -40,8 +40,8 @@
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
- user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1,
- channel_exit/0, connection_exit/0]).
+ user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
+ ok_pid_or_error/0, channel_exit/0, connection_exit/0]).
-type(channel_exit() :: no_return()).
-type(connection_exit() :: no_return()).
@@ -147,5 +147,6 @@
-type(error(A) :: {'error', A}).
-type(ok_or_error(A) :: 'ok' | error(A)).
-type(ok_or_error2(A, B) :: ok(A) | error(B)).
+-type(ok_pid_or_error() :: ok_or_error2(pid(), any())).
-endif. % use_specs
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f90ee734a8..fd5b5ba5f0 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,9 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/4, start_link/4, shutdown/1, mainloop/1]).
--export([send_command/2, send_command/3, send_command_and_signal_back/3,
- send_command_and_signal_back/4, send_command_and_notify/5]).
+-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([send_command/2, send_command/3, send_command_sync/2,
+ send_command_sync/3, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
@@ -48,24 +48,23 @@
-ifdef(use_specs).
--spec(start/4 ::
+-spec(start/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
--spec(start_link/4 ::
+-spec(start_link/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
--spec(send_command_and_signal_back/3 ::
- (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok').
--spec(send_command_and_signal_back/4 ::
- (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid())
- -> 'ok').
+-spec(send_command_sync/2 ::
+ (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(send_command_sync/3 ::
+ (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
@@ -84,23 +83,35 @@
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}])}.
-
-start_link(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}])}.
-mainloop(State) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
+mainloop(ReaderPid, State) ->
+ try
+ mainloop1(ReaderPid, State)
+ catch
+ exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ end,
+ done.
+
+mainloop1(ReaderPid, State) ->
receive
- Message -> ?MODULE:mainloop(handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [State])
+ erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
end.
handle_message({send_command, MethodRecord},
@@ -116,20 +127,20 @@ handle_message({send_command, MethodRecord, Content},
ok = internal_send_command_async(Sock, Channel, MethodRecord,
Content, FrameMax, Protocol),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Parent},
+handle_message({send_command_sync, From, MethodRecord},
State = #wstate{sock = Sock, channel = Channel,
protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+ gen_server:reply(From, ok),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
+handle_message({send_command_sync, From, {MethodRecord, Content}},
State = #wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
Content, FrameMax, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+ gen_server:reply(From, ok),
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
@@ -144,8 +155,6 @@ handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
-handle_message(shutdown, _State) ->
- exit(normal);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
@@ -159,22 +168,21 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
-send_command_and_signal_back(W, MethodRecord, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Parent},
- ok.
+send_command_sync(W, MethodRecord) ->
+ call(W, send_command_sync, MethodRecord).
-send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
- ok.
+send_command_sync(W, MethodRecord, Content) ->
+ call(W, send_command_sync, {MethodRecord, Content}).
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-shutdown(W) ->
- W ! shutdown,
- rabbit_misc:unlink_and_capture_exit(W),
- ok.
+%---------------------------------------------------------------------------
+
+call(Pid, Label, Msg) ->
+ {ok, Res} = gen:call(Pid, Label, Msg, infinity),
+ Res.
%---------------------------------------------------------------------------
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index ecb2655f60..4a1c5832b3 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -31,6 +31,11 @@
%% the MaxT and MaxR parameters to permit the child to be
%% restarted. This may require waiting for longer than Delay.
%%
+%% 4) Added an 'intrinsic' restart type. Like the transient type, this
+%% type means the child should only be restarted if the child exits
+%% abnormally. Unlike the transient type, if the child exits
+%% normally, the supervisor itself also exits normally.
+%%
%% All modifications are (C) 2010 Rabbit Technologies Ltd.
%%
%% %CopyrightBegin%
@@ -58,7 +63,7 @@
-export([start_link/2,start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1,
+ which_children/1, find_child/2,
check_childspecs/1]).
-export([behaviour_info/1]).
@@ -133,6 +138,10 @@ terminate_child(Supervisor, Name) ->
which_children(Supervisor) ->
call(Supervisor, which_children).
+find_child(Supervisor, Name) ->
+ [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
+ Name1 =:= Name].
+
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
@@ -534,13 +543,16 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
+do_restart(intrinsic, normal, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
do_restart(_, normal, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState};
do_restart(_, shutdown, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState};
-do_restart(transient, Reason, Child, State) ->
+do_restart(Type, Reason, Child, State) when Type =:= transient orelse
+ Type =:= intrinsic ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
do_restart(temporary, Reason, Child, State) ->
@@ -641,14 +653,22 @@ terminate_simple_children(Child, Dynamics, SupName) ->
ok.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
- case shutdown(Child#child.pid,
- Child#child.shutdown) of
- ok ->
- Child#child{pid = undefined};
- {error, OtherReason} ->
- report_error(shutdown_error, OtherReason, Child, SupName),
- Child#child{pid = undefined}
- end;
+ ReportError = fun (Reason) ->
+ report_error(shutdown_error, Reason, Child, SupName)
+ end,
+ case shutdown(Child#child.pid, Child#child.shutdown) of
+ ok ->
+ ok;
+ {error, normal} ->
+ case Child#child.restart_type of
+ permanent -> ReportError(normal);
+ {permanent, _Delay} -> ReportError(normal);
+ _ -> ok
+ end;
+ {error, OtherReason} ->
+ ReportError(OtherReason)
+ end,
+ Child#child{pid = undefined};
do_terminate(Child, _SupName) ->
Child.
@@ -834,7 +854,7 @@ supname(N,_) -> N.
%%% where Name is an atom
%%% Func is {Mod, Fun, Args} == {atom, atom, list}
%%% RestartType is permanent | temporary | transient |
-%%% {permanent, Delay} |
+%%% intrinsic | {permanent, Delay} |
%%% {transient, Delay} where Delay >= 0
%%% Shutdown = integer() | infinity | brutal_kill
%%% ChildType = supervisor | worker
@@ -884,6 +904,7 @@ validFunc(Func) -> throw({invalid_mfa, Func}).
validRestartType(permanent) -> true;
validRestartType(temporary) -> true;
validRestartType(transient) -> true;
+validRestartType(intrinsic) -> true;
validRestartType({permanent, Delay}) -> validDelay(Delay);
validRestartType({transient, Delay}) -> validDelay(Delay);
validRestartType(RestartType) -> throw({invalid_restart_type,
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index cc4982c9cb..c9809ace61 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,6 +55,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
+ ok = file_handle_cache:obtain(),
accept(State);
handle_cast(_Msg, State) ->
@@ -83,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain()
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -92,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% accept more
accept(State);
+
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
+
handle_info(_Info, State) ->
{noreply, State}.
@@ -111,7 +115,6 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) ->
- ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl
index 1b78584384..02d7e0e40d 100644
--- a/src/tcp_client_sup.erl
+++ b/src/tcp_client_sup.erl
@@ -31,19 +31,19 @@
-module(tcp_client_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
-export([start_link/1, start_link/2]).
-export([init/1]).
start_link(Callback) ->
- supervisor:start_link(?MODULE, Callback).
+ supervisor2:start_link(?MODULE, Callback).
start_link(SupName, Callback) ->
- supervisor:start_link(SupName, ?MODULE, Callback).
+ supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one, 10, 10},
+ {ok, {{simple_one_for_one_terminate, 10, 10},
[{tcp_client, {M,F,A},
- temporary, brutal_kill, worker, [M]}]}}.
+ temporary, infinity, supervisor, [M]}]}}.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 3cbc80f819..e658f005a3 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -72,10 +72,7 @@
-ifdef(use_specs).
--spec(start_link/1 ::
- (float()) -> 'ignore' |
- rabbit_types:error(any()) |
- rabbit_types:ok(pid())).
+-spec(start_link/1 :: (float()) -> {'ok', pid()} | {'error', any()}).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 01ce3535d8..595884e033 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -52,7 +52,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/1 ::
(fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index afa21164be..177a14533b 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -41,9 +41,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
--spec(start_link/1 :: (non_neg_integer()) ->
- 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-endif.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index a61e4cc372..42049d5068 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -44,8 +44,7 @@
-ifdef(use_specs).
--spec(start_link/1 ::
- (any()) -> {'ok', pid()} | 'ignore' | rabbit_types:error(any())).
+-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/2 ::
(pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').