summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-24 18:27:38 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-24 18:27:38 +0000
commit60a09e95e6a3bb0f3383097e108a10d5269f0e2d (patch)
treecff4001e2d7dcded69dd810989a035c579ff2b62
parent2142d1b62ef070cb413222920dac858ad53c33df (diff)
parentc177658c5547d4648f47b6dbb92e5e86dd84cb2b (diff)
downloadrabbitmq-server-git-60a09e95e6a3bb0f3383097e108a10d5269f0e2d.tar.gz
merge default into bug20337
-rw-r--r--Makefile5
-rw-r--r--docs/rabbitmq-service.xml3
-rw-r--r--docs/rabbitmqctl.1.xml7
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/macports/Portfile.in6
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmq-service.bat4
-rw-r--r--src/file_handle_cache.erl11
-rw-r--r--src/gen_server2.erl18
-rw-r--r--src/gm.erl42
-rw-r--r--src/mirrored_supervisor.erl65
-rw-r--r--src/mirrored_supervisor_tests.erl23
-rw-r--r--src/rabbit.erl40
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_amqqueue.erl68
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_basic.erl6
-rw-r--r--src/rabbit_binding.erl22
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_client_sup.erl5
-rw-r--r--src/rabbit_control.erl14
-rw-r--r--src/rabbit_exchange_type_topic.erl120
-rw-r--r--src/rabbit_memory_monitor.erl40
-rw-r--r--src/rabbit_mirror_queue_slave.erl19
-rw-r--r--src/rabbit_misc.erl19
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_networking.erl160
-rw-r--r--src/rabbit_plugins.erl34
-rw-r--r--src/rabbit_queue_index.erl30
-rw-r--r--src/rabbit_reader.erl66
-rw-r--r--src/rabbit_restartable_sup.erl3
-rw-r--r--src/rabbit_router.erl60
-rw-r--r--src/rabbit_ssl.erl26
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_types.erl14
-rw-r--r--src/rabbit_upgrade_functions.erl8
-rw-r--r--src/supervisor2.erl4
-rw-r--r--src/tcp_acceptor.erl7
-rw-r--r--src/tcp_acceptor_sup.erl6
-rw-r--r--src/tcp_listener.erl9
-rw-r--r--src/tcp_listener_sup.erl12
-rw-r--r--src/worker_pool.erl7
-rw-r--r--src/worker_pool_worker.erl10
47 files changed, 596 insertions, 447 deletions
diff --git a/Makefile b/Makefile
index d46b43489b..31c71dd44b 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
-VERSION=0.0.0
+VERSION?=0.0.0
PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
PLUGINS_DIR=plugins
TARBALL_NAME=rabbitmq-server-$(VERSION)
@@ -246,7 +246,8 @@ stop-cover: all
srcdist: distclean
mkdir -p $(TARGET_SRC_DIR)/codegen
cp -r ebin src include LICENSE LICENSE-MPL-RabbitMQ INSTALL README $(TARGET_SRC_DIR)
- sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
+ sed 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in > $(TARGET_SRC_DIR)/ebin/rabbit_app.in.tmp && \
+ mv $(TARGET_SRC_DIR)/ebin/rabbit_app.in.tmp $(TARGET_SRC_DIR)/ebin/rabbit_app.in
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR)
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
index 3368960b80..a4bd158087 100644
--- a/docs/rabbitmq-service.xml
+++ b/docs/rabbitmq-service.xml
@@ -66,7 +66,8 @@ Display usage information.
<para>
Install the service. The service will not be started.
Subsequent invocations will update the service parameters if
-relevant environment variables were modified.
+relevant environment variables were modified or if the active
+plugins were changed.
</para>
</listitem>
</varlistentry>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 157550387d..7268f09037 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1273,9 +1273,10 @@
<para>
Displays broker status information such as the running
applications on the current Erlang node, RabbitMQ and
- Erlang versions, OS name and memory statistics. (See
- the <command>cluster_status</command> command to find
- out which nodes are clustered and running.)
+ Erlang versions, OS name, memory and file descriptor
+ statistics. (See the <command>cluster_status</command>
+ command to find out which nodes are clustered and
+ running.)
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl status</screen>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 5ead105145..9301af6bdc 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -42,5 +42,6 @@
{reuseaddr, true},
{backlog, 128},
{nodelay, true},
+ {linger, {true, 0}},
{exit_on_close, false}]}
]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index a603886c4b..d81b82dbbc 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -56,9 +56,11 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_node, {trie_node, edge_count, binding_count}).
-record(topic_trie_edge, {trie_edge, node_id}).
-record(topic_trie_binding, {trie_binding, value = const}).
+-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
-record(trie_binding, {exchange_name, node_id, destination}).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 96d3974fb3..a6899005b3 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -121,6 +121,9 @@ done
rm -rf %{buildroot}
%changelog
+* Fri Dec 16 2011 steve@rabbitmq.com 2.7.1-1
+- New Upstream Release
+
* Tue Nov 8 2011 steve@rabbitmq.com 2.7.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index ceb08ed092..b3743c39e5 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.7.1-1) natty; urgency=low
+
+ * New Upstream Release
+
+ -- Steve Powell <steve@rabbitmq.com> Fri, 16 Dec 2011 12:12:36 +0000
+
rabbitmq-server (2.7.0-1) natty; urgency=low
* New Upstream Release
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index b6dad35792..360fb394eb 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -60,6 +60,12 @@ use_configure no
use_parallel_build yes
+build.env-append HOME=${workpath}
+
+build.env-append VERSION=${version}
+
+destroot.env-append VERSION=${version}
+
destroot.target install_bin
destroot.destdir \
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 1831f87699..39a68c8e0f 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -117,7 +117,7 @@ exec erl \
-sasl sasl_error_logger false \
-rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
- -os_mon start_cpu_sup true \
+ -os_mon start_cpu_sup false \
-os_mon start_disksup false \
-os_mon start_memsup false \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index c27b418adb..44ce1ce148 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -139,7 +139,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 4d3fce49b0..1582bfb142 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -145,7 +145,7 @@ if not exist "!RABBITMQ_BASE!" (
"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" list !RABBITMQ_SERVICENAME! 2>NUL 1>NUL
if errorlevel 1 (
- "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" add !RABBITMQ_SERVICENAME!
+ "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" add !RABBITMQ_SERVICENAME! -internalservicename !RABBITMQ_SERVICENAME!
) else (
echo !RABBITMQ_SERVICENAME! service is already present - only updating service parameters
)
@@ -204,7 +204,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6c3f1b5f50..c11fb54bcb 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -125,8 +125,7 @@
%% requesting process is considered to 'own' one more
%% descriptor. release/0 is the inverse operation and releases a
%% previously obtained descriptor. transfer/1 transfers ownership of a
-%% file descriptor between processes. It is non-blocking. Obtain is
-%% used to obtain permission to accept file descriptors. Obtain has a
+%% file descriptor between processes. It is non-blocking. Obtain has a
%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
%% the entire limit, but will be evicted by obtain calls up to the
%% point at which no more obtain calls can be satisfied by the obtains
@@ -262,7 +261,7 @@
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [obtain_count, obtain_limit]).
+-define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]).
%%----------------------------------------------------------------------------
%% Public API
@@ -790,8 +789,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(obtain_count, #fhc_state{obtain_count = Count}) -> Count;
-i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(total_limit, #fhc_state{limit = Limit}) -> Limit;
+i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2;
+i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(sockets_used, #fhc_state{obtain_count = Count}) -> Count;
i(Item, _) -> throw({bad_argument, Item}).
%%----------------------------------------------------------------------------
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ab6c4e64f3..49913d2694 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1079,7 +1079,7 @@ get_proc_name({local, Name}) ->
exit(process_not_registered)
end;
get_proc_name({global, Name}) ->
- case global:safe_whereis_name(Name) of
+ case whereis_name(Name) of
undefined ->
exit(process_not_registered_globally);
Pid when Pid =:= self() ->
@@ -1101,7 +1101,7 @@ get_parent() ->
name_to_pid(Name) ->
case whereis(Name) of
undefined ->
- case global:safe_whereis_name(Name) of
+ case whereis_name(Name) of
undefined ->
exit(could_not_find_registerd_name);
Pid ->
@@ -1111,6 +1111,20 @@ name_to_pid(Name) ->
Pid
end.
+whereis_name(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] ->
+ if node(Pid) == node() ->
+ case is_process_alive(Pid) of
+ true -> Pid;
+ false -> undefined
+ end;
+ true ->
+ Pid
+ end;
+ [] -> undefined
+ end.
+
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
PrioriCall = function_exported_or_default(
Mod, 'prioritise_call', 3,
diff --git a/src/gm.erl b/src/gm.erl
index 8c838a7056..6c89912213 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -386,6 +386,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -515,8 +516,8 @@ group_members(Server) ->
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
+ Self = make_member(GroupName),
gen_server2:cast(self(), join),
- Self = self(),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
@@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
@@ -604,7 +606,8 @@ handle_cast({broadcast, Msg},
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ State});
handle_cast({broadcast, Msg}, State) ->
internal_broadcast(Msg, none, State);
@@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self,
State1 = check_neighbours(State #state { view = View,
members_state = MembersState }),
handle_callback_result(
- {Module:joined(Args, all_known_members(View)), State1});
+ {Module:joined(Args, get_pids(all_known_members(View))), State1});
handle_cast(leave, State) ->
{stop, normal, State}.
@@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
- Result = Module:handle_msg(Args, Self, Msg),
+ Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
@@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
end,
try
case gen_server2:call(
- Left, {add_on_right, Self}, infinity) of
+ get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
not_ready -> join_group(Self, GroupName)
end
@@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) ->
mnesia:sync_transaction(
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
- version = 0 },
+ version = ?VERSION_START },
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
@@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
{Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
- ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
+ ok = gen_server2:cast(get_pid(RealNeighbour),
+ {?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
true = erlang:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
- ok = gen_server2:cast(RealNeighbour, Msg),
+ ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
ok = case Neighbour of
Self -> ok;
- _ -> gen_server2:cast(Neighbour, Msg)
+ _ -> gen_server2:cast(get_pid(Neighbour), Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor(Self, Self) ->
undefined;
maybe_monitor(Other, _Self) ->
- erlang:monitor(process, Other).
+ erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) ->
build_members_state(MembersStateList) ->
?DICT:from_list(MembersStateList).
+make_member(GroupName) ->
+ {case read_group(GroupName) of
+ #gm_group { version = Version } -> Version;
+ {error, not_found} -> ?VERSION_START
+ end, self()}.
+
+get_pid({_Version, Pid}) -> Pid.
+
+get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% ---------------------------------------------------------------------------
%% Activity assembly
@@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, {activity, Self, Activity}).
send_right(Right, View, Msg) ->
- ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
+ ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
lists:foldl(
fun ({Id, Pubs, _Acks}, ok) ->
lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, Id, Pub);
+ Module:handle_msg(Args, get_pid(Id), Pub);
(_, Error) ->
Error
end, ok, Pubs);
@@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) ->
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> ok;
- _ -> Module:members_changed(Args, Births, Deaths)
+ _ -> Module:members_changed(Args, get_pids(Births),
+ get_pids(Deaths))
end.
handle_callback_result({Result, State}) ->
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 8dfe39f890..3ba8f50d2e 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -144,32 +144,17 @@
-type child() :: pid() | 'undefined'.
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}.
-type modules() :: [module()] | 'dynamic'.
--type restart() :: 'permanent' | 'transient' | 'temporary'.
--type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
--type child_spec() :: {Id :: child_id(),
- StartFunc :: mfargs(),
- Restart :: restart(),
- Shutdown :: shutdown(),
- Type :: worker(),
- Modules :: modules()}.
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
--type startchild_err() :: 'already_present'
- | {'already_started', Child :: child()} | term().
--type startchild_ret() :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', startchild_err()}.
-
-type group_name() :: any().
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
@@ -183,9 +168,9 @@
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> startchild_ret() when
+-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
SupRef :: sup_ref(),
- ChildSpec :: child_spec() | (List :: [term()]).
+ ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
@@ -215,12 +200,12 @@
Modules :: modules().
-spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: 'ok' | {'error', Error :: term()}.
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: startlink_ret().
-spec create_tables() -> Result when
@@ -242,8 +227,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) ->
start_link0(Prefix, Group, Init) ->
case apply(?SUPERVISOR, start_link,
Prefix ++ [?MODULE, {overall, Group, Init}]) of
- {ok, Pid} -> call(Pid, {init, Pid}),
- {ok, Pid};
+ {ok, Pid} -> case catch call(Pid, {init, Pid}) of
+ ok -> {ok, Pid};
+ E -> E
+ end;
Other -> Other
end.
@@ -346,13 +333,20 @@ handle_call({init, Overall}, _From,
end || Pid <- Rest],
Delegate = child(Overall, delegate),
erlang:monitor(process, Delegate),
- [maybe_start(Group, Delegate, S) || S <- ChildSpecs],
- {reply, ok, State#state{overall = Overall, delegate = Delegate}};
+ State1 = State#state{overall = Overall, delegate = Delegate},
+ case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ true -> {reply, ok, State1};
+ false -> {stop, shutdown, State1}
+ end;
handle_call({start_child, ChildSpec}, _From,
State = #state{delegate = Delegate,
group = Group}) ->
- {reply, maybe_start(Group, Delegate, ChildSpec), State};
+ {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ already_in_mnesia -> {error, already_present};
+ {already_in_mnesia, Pid} -> {error, {already_started, Pid}};
+ Else -> Else
+ end, State};
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
group = Group}) ->
@@ -400,13 +394,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
Self = self(),
- case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> ok
- end,
- {noreply, State};
+ R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
+ [Self | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(fun() -> update_all(Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
+ end,
+ case all_started(R) of
+ true -> {noreply, State};
+ false -> {stop, shutdown, State}
+ end;
handle_info(Info, State) ->
{stop, {unexpected_info, Info}, State}.
@@ -428,8 +425,8 @@ maybe_start(Group, Delegate, ChildSpec) ->
check_start(Group, Delegate, ChildSpec)
end) of
{atomic, start} -> start(Delegate, ChildSpec);
- {atomic, undefined} -> {error, already_present};
- {atomic, Pid} -> {error, {already_started, Pid}};
+ {atomic, undefined} -> already_in_mnesia;
+ {atomic, Pid} -> {already_in_mnesia, Pid};
%% If we are torn down while in the transaction...
{aborted, E} -> {error, E}
end.
@@ -499,6 +496,8 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
+all_started(Results) -> [] =:= [R || R = {error, _} <- Results].
+
%%----------------------------------------------------------------------------
create_tables() ->
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 6c91bc4fd5..d48a9ca51a 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -43,6 +43,7 @@ all_tests() ->
passed = test_start_idempotence(),
passed = test_unsupported(),
passed = test_ignore(),
+ passed = test_startup_failure(),
passed.
%% Simplest test
@@ -195,6 +196,22 @@ test_ignore() ->
{sup, fake_strategy_for_ignore, []}),
passed.
+test_startup_failure() ->
+ [test_startup_failure(F) || F <- [want_error, want_exit]],
+ passed.
+
+test_startup_failure(Fail) ->
+ process_flag(trap_exit, true),
+ ?MS:start_link(get_group(group), ?MODULE,
+ {sup, one_for_one, [childspec(Fail)]}),
+ receive
+ {'EXIT', _, shutdown} ->
+ ok
+ after 1000 ->
+ exit({did_not_exit, Fail})
+ end,
+ process_flag(trap_exit, false).
+
%% ---------------------------------------------------------------------------
with_sups(Fun, Sups) ->
@@ -228,6 +245,12 @@ start_sup0(Name, Group, ChildSpecs) ->
childspec(Id) ->
{Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.
+start_gs(want_error) ->
+ {error, foo};
+
+start_gs(want_exit) ->
+ exit(foo);
+
start_gs(Id) ->
gen_server:start_link({local, Id}, ?MODULE, server, []).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0a2681a219..9609eb0400 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -132,7 +132,7 @@
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
- {requires, empty_db_check},
+ {requires, core_initialized},
{enables, routing_ready}]}).
-rabbit_boot_step({mirror_queue_slave_sup,
@@ -158,8 +158,9 @@
{enables, networking}]}).
-rabbit_boot_step({direct_client,
- [{mfa, {rabbit_direct, boot, []}},
- {requires, log_relay}]}).
+ [{description, "direct client"},
+ {mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
@@ -308,17 +309,28 @@ stop_and_halt() ->
ok.
status() ->
- [{pid, list_to_integer(os:getpid())},
- {running_applications, application:which_applications(infinity)},
- {os, os:type()},
- {erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}] ++
- rabbit_misc:filter_exit_map(
- fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
- [{vm_memory_high_watermark, {vm_memory_monitor,
- get_vm_memory_high_watermark, []}},
- {vm_memory_limit, {vm_memory_monitor,
- get_memory_limit, []}}]).
+ S1 = [{pid, list_to_integer(os:getpid())},
+ {running_applications, application:which_applications(infinity)},
+ {os, os:type()},
+ {erlang_version, erlang:system_info(system_version)},
+ {memory, erlang:memory()}],
+ S2 = rabbit_misc:filter_exit_map(
+ fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
+ [{vm_memory_high_watermark, {vm_memory_monitor,
+ get_vm_memory_high_watermark, []}},
+ {vm_memory_limit, {vm_memory_monitor,
+ get_memory_limit, []}}]),
+ S3 = rabbit_misc:with_exit_handler(
+ fun () -> [] end,
+ fun () -> [{file_descriptors, file_handle_cache:info()}] end),
+ S4 = [{processes, [{limit, erlang:system_info(process_limit)},
+ {used, erlang:system_info(process_count)}]},
+ {run_queue, erlang:statistics(run_queue)},
+ {uptime, begin
+ {T,_} = erlang:statistics(wall_clock),
+ T div 1000
+ end}],
+ S1 ++ S2 ++ S3 ++ S4.
is_running() -> is_running(node()).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index fd03ca85b3..517dd4ec60 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -31,10 +31,9 @@
-ifdef(use_specs).
--type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 041f0b4a23..523a7bba2e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,17 +44,17 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, routing_result/0]).
-type(name() :: rabbit_types:r('queue')).
-
+-type(qpids() :: [pid()]).
-type(qlen() :: rabbit_types:ok(non_neg_integer())).
-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -69,7 +69,8 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
- rabbit_types:error('not_found')).
+ rabbit_types:error('not_found');
+ ([name()]) -> [rabbit_types:amqqueue()]).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
@@ -117,12 +118,13 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
+-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
--spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
+-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
@@ -134,7 +136,7 @@
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
+-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -244,7 +246,7 @@ determine_queue_nodes(Args) ->
case [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes] of
[Node] -> {Node, undefined};
- [First | Rest] -> {First, Rest}
+ [First | Rest] -> {First, [First | Rest]}
end;
{{_Type, <<"all">>}, _} ->
{node(), all};
@@ -264,6 +266,10 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup(Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_misc:dirty_read/1.
+ lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]);
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -432,14 +438,39 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver(QPid, Delivery = #delivery{immediate = true}) ->
- gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
-deliver(QPid, Delivery = #delivery{mandatory = true}) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity),
- true;
-deliver(QPid, Delivery) ->
- gen_server2:cast(QPid, {deliver, Delivery}),
- true.
+deliver([], #delivery{mandatory = false, immediate = false}) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
+ immediate = Immediate}) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -531,6 +562,9 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+qpids(Qs) -> lists:append([[QPid | SPids] ||
+ #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 019280f17a..b539ff628b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1061,9 +1061,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery}, _From, State) ->
- %% Synchronous, "immediate" delivery mode
- %%
+handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
%% FIXME: Is this correct semantics?
%%
%% I'm worried in particular about the case where an exchange has
@@ -1081,8 +1079,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
false -> discard_delivery(Delivery, State1)
end);
-handle_call({deliver, Delivery}, From, State) ->
- %% Synchronous, "mandatory" delivery mode. Reply asap.
+handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
gen_server2:reply(From, true),
noreply(deliver_or_enqueue(Delivery, State));
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index bc2f6ea994..4196db7dd4 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -30,7 +30,7 @@
-type(properties_input() ::
(rabbit_framing:amqp_property_record() | [{atom(), any()}])).
-type(publish_result() ::
- ({ok, rabbit_router:routing_result(), [pid()]}
+ ({ok, rabbit_amqqueue:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
@@ -89,8 +89,8 @@ publish(Delivery = #delivery{
end.
publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e625a427e4..655bbb7369 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -277,6 +277,7 @@ has_for_source(SrcName) ->
contains(rabbit_semi_durable_route, Match).
remove_for_source(SrcName) ->
+ lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
Routes = lists:usort(
mnesia:match_object(rabbit_route, Match, write) ++
@@ -351,7 +352,28 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+%% For bulk operations we lock the tables we are operating on in order
+%% to reduce the time complexity. Without the table locks we end up
+%% with num_tables*num_bulk_bindings row-level locks. Taking each lock
+%% takes time proportional to the number of existing locks, thus
+%% resulting in O(num_bulk_bindings^2) complexity.
+%%
+%% The locks need to be write locks since ultimately we end up
+%% removing all these rows.
+%%
+%% The downside of all this is that no other binding operations except
+%% lookup/routing (which uses dirty ops) can take place
+%% concurrently. However, that is the case already since the bulk
+%% operations involve mnesia:match_object calls with a partial key,
+%% which entails taking a table lock.
+lock_route_tables() ->
+ [mnesia:lock({table, T}, write) || T <- [rabbit_route,
+ rabbit_reverse_route,
+ rabbit_semi_durable_route,
+ rabbit_durable_route]].
+
remove_for_destination(DstName, DeleteFun) ->
+ lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9df862c635..4c4ba230bc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1358,7 +1358,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
QNames}, State) ->
- {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
+ {RoutingRes, DeliveredQPids} =
+ rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery),
State1 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State),
maybe_incr_stats([{XName, 1} |
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dfb400e37f..4ba01b4f4d 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -28,8 +28,9 @@
-ifdef(use_specs).
--spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
--spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+-spec(start_link/1 :: (rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 20486af52b..22b57b1ae9 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -79,6 +79,12 @@ start() ->
io:format(Format ++ " ...~n", Args1)
end
end,
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
+
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
case catch action(Command, Node, Args, Opts, Inform) of
@@ -88,9 +94,11 @@ start() ->
false -> io:format("...done.~n")
end,
rabbit_misc:quit(0);
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15
+ PrintInvalidCommandError(),
usage();
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 348655b101..91c7b5d3ef 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -52,6 +52,7 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
+ trie_remove_all_nodes(X),
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
@@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) ->
add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(transaction, #exchange{name = X}, Bs) ->
- %% The remove process is split into two distinct phases. In the
- %% first phase we gather the lists of bindings and edges to
- %% delete, then in the second phase we process all the
- %% deletions. This is to prevent interleaving of read/write
- %% operations in mnesia that can adversely affect performance.
- {ToDelete, Paths} =
- lists:foldl(
- fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
- Path = [{FinalNode, _} | _] =
- follow_down_get_path(S, split_topic_key(K)),
- {[{FinalNode, D} | Acc],
- decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
- end, {[], gb_trees:empty()}, Bs),
-
- [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
- [trie_remove_edge(X, Parent, Node, W) ||
- {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
+remove_bindings(transaction, _X, Bs) ->
+ %% See rabbit_binding:lock_route_tables for the rationale for
+ %% taking table locks.
+ case Bs of
+ [_] -> ok;
+ _ -> [mnesia:lock({table, T}, write) ||
+ T <- [rabbit_topic_trie_node,
+ rabbit_topic_trie_edge,
+ rabbit_topic_trie_binding]]
+ end,
+ [begin
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path)
+ end || #binding{source = X, key = K, destination = D} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
-maybe_add_path(_X, [{root, none}], PathAcc) ->
- PathAcc;
-maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
- case gb_trees:is_defined(Node, PathAcc) of
- true -> PathAcc;
- false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
- trie_child_count(X, Node)}},
- PathAcc)
- end.
-
-decrement_bindings(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
- Path, PathAcc).
-
-decrement_edges(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
- Path, PathAcc).
-
-with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
- PathAcc;
-with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
- {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
- NewCounts = Fun(Counts),
- NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
- case NewCounts of
- {0, 0} -> decrement_edges(X, ParentPath,
- maybe_add_path(X, ParentPath, NewPathAcc));
- _ -> NewPathAcc
- end.
-
-
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
@@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
error -> {error, Acc, Words}
end.
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X, node_id = Node}, write) of
+ [] -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath);
+ _ -> ok
+ end.
+
trie_child(X, Node, Word) ->
case mnesia:read({rabbit_topic_trie_edge,
#trie_edge{exchange_name = X,
@@ -199,10 +177,30 @@ trie_bindings(X, Node) ->
destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+trie_update_node_counts(X, Node, Field, Delta) ->
+ E = case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X,
+ node_id = Node}, write) of
+ [] -> #topic_trie_node{trie_node = #trie_node{
+ exchange_name = X,
+ node_id = Node},
+ edge_count = 0,
+ binding_count = 0};
+ [E0] -> E0
+ end,
+ case setelement(Field, E, element(Field, E) + Delta) of
+ #topic_trie_node{edge_count = 0, binding_count = 0} ->
+ ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
+ EN ->
+ ok = mnesia:write(rabbit_topic_trie_node, EN, write)
+ end.
+
trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
trie_edge_op(X, FromNode, ToNode, W, Op) ->
@@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
write).
trie_add_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
trie_binding_op(X, Node, D, fun mnesia:write/3).
trie_remove_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
trie_binding_op(X, Node, D, Op) ->
@@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_child_count(X, Node) ->
- count(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-trie_binding_count(X, Node) ->
- count(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-count(Table, Match) ->
- length(mnesia:match_object(Table, Match, read)).
+trie_remove_all_nodes(X) ->
+ remove_all(rabbit_topic_trie_node,
+ #topic_trie_node{trie_node = #trie_node{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 02f3158f3b..c25a177b06 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -178,11 +178,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%----------------------------------------------------------------------------
-zero_clamp(Sum) ->
- case Sum < ?EPSILON of
- true -> 0.0;
- false -> Sum
- end.
+zero_clamp(Sum) when Sum < ?EPSILON -> 0.0;
+zero_clamp(Sum) -> Sum.
internal_deregister(Pid, Demonitor,
State = #state { queue_duration_sum = Sum,
@@ -240,26 +237,21 @@ internal_update(State = #state { queue_durations = Durations,
fun (Proc = #process { reported = QueueDuration,
sent = PrevSendDuration,
callback = {M, F, A} }, true) ->
- case (case {QueueDuration, PrevSendDuration} of
- {infinity, infinity} ->
- true;
- {infinity, D} ->
- DesiredDurationAvg1 < D;
- {D, infinity} ->
- DesiredDurationAvg1 < D;
- {D1, D2} ->
- DesiredDurationAvg1 <
- lists:min([D1,D2])
- end) of
- true ->
- ok = erlang:apply(
- M, F, A ++ [DesiredDurationAvg1]),
- ets:insert(
- Durations,
- Proc #process {sent = DesiredDurationAvg1});
- false ->
- true
+ case should_send(QueueDuration, PrevSendDuration,
+ DesiredDurationAvg1) of
+ true -> ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg1]),
+ ets:insert(
+ Durations,
+ Proc #process {
+ sent = DesiredDurationAvg1});
+ false -> true
end
end, true, Durations)
end,
State1.
+
+should_send(infinity, infinity, _) -> true;
+should_send(infinity, D, DD) -> DD < D;
+should_send(D, infinity, DD) -> DD < D;
+should_send(D1, D2, DD) -> DD < lists:min([D1, D2]).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a8c2006d14..db1f056f4c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -148,9 +148,8 @@ init([#amqqueue { name = QueueName } = Q]) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
- %% Synchronous, "immediate" delivery mode
-
+handle_call({deliver, Delivery = #delivery { immediate = true }},
+ From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the
%% msg via gm, or b) the master dies before we receive the msg via
%% gm. In the case of (a), we will eventually receive the msg via
@@ -166,8 +165,8 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
gen_server2:reply(From, false), %% master may deliver it, not us
noreply(maybe_enqueue_message(Delivery, false, State));
-handle_call({deliver, Delivery = #delivery {}}, From, State) ->
- %% Synchronous, "mandatory" delivery mode
+handle_call({deliver, Delivery = #delivery { mandatory = true }},
+ From, State) ->
gen_server2:reply(From, true), %% amqqueue throws away the result anyway
noreply(maybe_enqueue_message(Delivery, true, State));
@@ -526,9 +525,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, SS, MonitoringPids),
- MTC = dict:from_list(
- [{MsgId, {ChPid, MsgSeqNo}} ||
- {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MSList),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
@@ -725,7 +726,7 @@ process_instruction(
never ->
{MQ2, PendingCh, MS};
eventually ->
- {MQ2, sets:add_element(MsgId, PendingCh),
+ {MQ2, PendingCh,
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a9a9a4c418..2676ddc63b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -253,18 +253,23 @@ assert_args_equivalence(Orig, New, Name, Keys) ->
ok.
assert_args_equivalence1(Orig, New, Name, Key) ->
- case {table_lookup(Orig, Key), table_lookup(New, Key)} of
+ {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)},
+ FailureFun = fun () ->
+ protocol_error(precondition_failed, "inequivalent arg '~s'"
+ "for ~s: received ~s but current is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
+ end,
+ case {Orig1, New1} of
{Same, Same} ->
ok;
- {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} ->
+ {{OrigType, OrigVal}, {NewType, NewVal}} ->
case type_class(OrigType) == type_class(NewType) andalso
OrigVal == NewVal of
true -> ok;
- false -> protocol_error(precondition_failed, "inequivalent arg"
- " '~s' for ~s: received ~s but current"
- " is ~s",
- [Key, rs(Name), val(New1), val(Orig1)])
- end
+ false -> FailureFun()
+ end;
+ {_, _} ->
+ FailureFun()
end.
val(undefined) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c8c18843a8..bf997a6f8b 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -268,6 +268,11 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
{rabbit_topic_trie_edge,
[{record_name, topic_trie_edge},
{attributes, record_info(fields, topic_trie_edge)},
@@ -314,12 +319,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_node_match() ->
+ #trie_node{ exchange_name = exchange_name_match(), _='_'}.
trie_edge_match() ->
- #trie_edge{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
trie_binding_match() ->
- #trie_binding{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2c0912dfd6..e81f8134f8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -24,7 +24,7 @@
close_connection/2, force_connection_event_refresh/0]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/2,
+-export([tcp_listener_addresses/1, tcp_listener_spec/6,
ensure_ssl/0, ssl_transform_fun/1]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
@@ -47,12 +47,16 @@
-export_type([ip_port/0, hostname/0]).
-type(hostname() :: inet:hostname()).
--type(ip_port() :: inet:ip_port()).
+-type(ip_port() :: inet:port_number()).
-type(family() :: atom()).
-type(listener_config() :: ip_port() |
{hostname(), ip_port()} |
{hostname(), ip_port(), family()}).
+-type(address() :: {inet:ip_address(), ip_port(), family()}).
+-type(name_prefix() :: atom()).
+-type(protocol() :: atom()).
+-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
@@ -76,8 +80,10 @@
-spec(force_connection_event_refresh/0 :: () -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]).
+-spec(tcp_listener_spec/6 ::
+ (name_prefix(), address(), [gen_tcp:listen_option()], protocol(),
+ label(), rabbit_types:mfargs()) -> supervisor:child_spec()).
-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
-spec(ssl_transform_fun/1 ::
(rabbit_types:infos())
@@ -140,39 +146,6 @@ start() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
-%% inet_parse:address takes care of ip string, like "0.0.0.0"
-%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
-%% and runs 'inet_gethost' port process for dns lookups.
-%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
-
-getaddr(Host, Family) ->
- case inet_parse:address(Host) of
- {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
- {error, _} -> gethostaddr(Host, Family)
- end.
-
-gethostaddr(Host, auto) ->
- Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
- case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
- [] -> host_lookup_error(Host, Lookups);
- IPs -> IPs
- end;
-
-gethostaddr(Host, Family) ->
- case inet:getaddr(Host, Family) of
- {ok, IPAddress} -> [{IPAddress, Family}];
- {error, Reason} -> host_lookup_error(Host, Reason)
- end.
-
-host_lookup_error(Host, Reason) ->
- error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}}).
-
-resolve_family({_,_,_,_}, auto) -> inet;
-resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
-resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
-resolve_family(_, F) -> F.
-
ensure_ssl() ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
@@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) ->
end
end.
-check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
+tcp_listener_addresses(Port) when is_integer(Port) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {Host, Port}) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
- check_tcp_listener_address(NamePrefix, {Host, Port, auto});
-
-check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
- if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
- true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
- [Port]),
- throw({error, {invalid_port, Port}})
- end,
- [{IPAddress, Port, Family,
- rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
- {IPAddress, Family} <- getaddr(Host, Family0)].
-
-check_tcp_listener_address_auto(NamePrefix, Port) ->
- lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
+ tcp_listener_addresses({Host, Port, auto});
+tcp_listener_addresses({Host, Port, Family0})
+ when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
+ [{IPAddress, Port, Family} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)];
+tcp_listener_addresses({_Host, Port, _Family0}) ->
+ error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
+ throw({error, {invalid_port, Port}}).
+
+tcp_listener_addresses_auto(Port) ->
+ lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
+tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
+ Protocol, Label, OnConnect) ->
+ {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
+ {tcp_listener_sup, start_link,
+ [IPAddress, Port, [Family | SocketOpts],
+ {?MODULE, tcp_listener_started, [Protocol]},
+ {?MODULE, tcp_listener_stopped, [Protocol]},
+ OnConnect, Label]},
+ transient, infinity, supervisor, [tcp_listener_sup]}.
+
start_tcp_listener(Listener) ->
start_listener(Listener, amqp, "TCP Listener",
{?MODULE, start_client, []}).
@@ -235,27 +213,21 @@ start_ssl_listener(Listener, SslOpts) ->
{?MODULE, start_ssl_client, [SslOpts]}).
start_listener(Listener, Protocol, Label, OnConnect) ->
- [start_listener0(Spec, Protocol, Label, OnConnect) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [start_listener0(Address, Protocol, Label, OnConnect) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
- {ok,_} = supervisor:start_child(
- rabbit_sup,
- {Name,
- {tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | tcp_opts()],
- {?MODULE, tcp_listener_started, [Protocol]},
- {?MODULE, tcp_listener_stopped, [Protocol]},
- OnConnect, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}).
+start_listener0(Address, Protocol, Label, OnConnect) ->
+ Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
+ Protocol, Label, OnConnect),
+ {ok,_} = supervisor:start_child(rabbit_sup, Spec).
stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Spec) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [stop_tcp_listener0(Address) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
+stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
@@ -307,9 +279,15 @@ connections() ->
rabbit_networking, connections_local, []).
connections_local() ->
- [rabbit_connection_sup:reader(ConnSup) ||
+ [Reader ||
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children(rabbit_tcp_client_sup),
+ Reader <- [try
+ rabbit_connection_sup:reader(ConnSup)
+ catch exit:{noproc, _} ->
+ noproc
+ end],
+ Reader =/= noproc].
connection_info_keys() -> rabbit_reader:info_keys().
@@ -357,6 +335,38 @@ tcp_opts() ->
{ok, Opts} = application:get_env(rabbit, tcp_listen_options),
Opts.
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+getaddr(Host, Family) ->
+ case inet_parse:address(Host) of
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
+ end.
+
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
+ end.
+
+host_lookup_error(Host, Reason) ->
+ error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 0862f1b2c7..de2ba8adbb 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -55,13 +55,20 @@ start() ->
CmdArgsAndOpts -> CmdArgsAndOpts
end,
Command = list_to_atom(Command0),
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
+ PrintInvalidCommandError(),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
@@ -111,8 +118,7 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
NewImplicitlyEnabled -- ImplicitlyEnabled),
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n")
+ report_change()
end;
action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
@@ -140,8 +146,7 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
print_list("The following plugins have been disabled:",
ImplicitlyEnabled -- NewImplicitlyEnabled),
write_enabled_plugins(PluginsFile, NewEnabled),
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n")
+ report_change()
end.
%%----------------------------------------------------------------------------
@@ -327,6 +332,9 @@ lookup_plugins(Names, AllPlugins) ->
read_enabled_plugins(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
+ {ok, []} -> [];
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
PluginsFile, Reason}})
@@ -374,3 +382,17 @@ maybe_warn_mochiweb(Enabled) ->
false ->
ok
end.
+
+report_change() ->
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n"),
+ case os:type() of
+ {win32, _OsName} ->
+ io:format("If you have RabbitMQ running as a service then you must"
+ " reinstall by running~n rabbitmq-service.bat stop~n"
+ " rabbitmq-service.bat install~n"
+ " rabbitmq-service.bat start~n~n");
+ _ ->
+ ok
+ end.
+
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4b54546619..f03c1d1c76 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
- fun () -> queue_index_walker_reader(QueueName, Gatherer)
+ fun () -> link(Gatherer),
+ ok = queue_index_walker_reader(QueueName, Gatherer),
+ unlink(Gatherer),
+ ok
end)
end || QueueName <- DurableQueues],
queue_index_walker({next, Gatherer});
@@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
+ Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
- false -> {array_new(), 0};
+ false -> Empty;
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {ok, SegData} = file_handle_cache:read(
- Hdl, ?SEGMENT_TOTAL_SIZE),
- Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
+ Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
+ {ok, SegData} -> load_segment_entries(
+ KeepAcked, SegData, Empty);
+ eof -> Empty
+ end,
ok = file_handle_cache:close(Hdl),
Res
end.
@@ -853,15 +859,15 @@ load_segment_entries(KeepAcked,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1);
+ load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
load_segment_entries(KeepAcked,
<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
@@ -871,10 +877,10 @@ load_segment_entries(KeepAcked,
{_Pub, del, no_ack} ->
{-1, array:reset(RelSeq, SegEntries)}
end,
- load_segment_entries(KeepAcked, SegData, SegEntries1,
- UnackedCount + UnackedCountDelta);
-load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
- {SegEntries, UnackedCount}.
+ load_segment_entries(KeepAcked, SegData,
+ {SegEntries1, UnackedCount + UnackedCountDelta});
+load_segment_entries(_KeepAcked, _SegData, Res) ->
+ Res.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 694abd9e49..fce61129fa 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -31,7 +31,7 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
--define(CLOSING_TIMEOUT, 1).
+-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
@@ -329,10 +329,14 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
catch Error -> {error, Error}
end),
mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
mainloop(Deb, State);
+handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ %% Ignore, we will emit a created event once we start running.
+ mainloop(Deb, State);
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
@@ -341,10 +345,6 @@ handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
-switch_callback(State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}, Callback, Length) ->
- ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_len = Length};
switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
@@ -376,28 +376,22 @@ close_connection(State = #v1{queue_collector = Collector,
rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
- TimeoutMillisec =
- 1000 * if TimeoutSec > 0 andalso
- TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
- true -> ?CLOSING_TIMEOUT
- end,
- erlang:send_after(TimeoutMillisec, self(), terminate_connection),
+ erlang:send_after((if TimeoutSec > 0 andalso
+ TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
+ true -> ?CLOSING_TIMEOUT
+ end) * 1000, self(), terminate_connection),
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case termination_kind(Reason) of
- controlled ->
- channel_cleanup(ChPid),
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, uncontrolled} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_Channel, controlled} ->
maybe_close(State);
- uncontrolled ->
- case channel_cleanup(ChPid) of
- undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> rabbit_log:error(
- "connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(
- handle_exception(State, Channel, Reason))
- end
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(ChPid) ->
@@ -432,19 +426,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case channel_cleanup(ChPid) of
- undefined ->
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, _} ->
exit({abnormal_dependent_exit, ChPid, Reason});
- Channel ->
- case termination_kind(Reason) of
- controlled ->
- ok;
- uncontrolled ->
- rabbit_log:error(
- "connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason])
- end,
+ {_Channel, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef);
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
@@ -521,7 +511,9 @@ post_process_frame({method, MethodName, _}, _ChPid,
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
case State#v1.connection_state of
- blocking -> State#v1{connection_state = blocked};
+ blocking -> ok = rabbit_heartbeat:pause_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = blocked};
_ -> State
end;
false -> State
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index cda3ccbe0f..1a08efed41 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -28,7 +28,8 @@
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 31f5ad14ea..219833b726 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -18,21 +18,17 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2, match_bindings/2, match_routing_key/2]).
+-export([match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([routing_key/0, routing_result/0, match_result/0]).
+-export_type([routing_key/0, match_result/0]).
-type(routing_key() :: binary()).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
--type(qpids() :: [pid()]).
-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
- {routing_result(), qpids()}).
-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
@@ -44,38 +40,6 @@
%%----------------------------------------------------------------------------
-deliver([], #delivery{mandatory = false,
- immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(QNames, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = lookup_qpids(QNames),
- delegate:invoke_no_result(
- QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {routed, QPids};
-
-deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = lookup_qpids(QNames),
- {Success, _} =
- delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
- end),
- {Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- check_delivery(Mandatory, Immediate, {Routed, Handled}).
-
-
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source
match_bindings(SrcName, Match) ->
@@ -104,26 +68,6 @@ match_routing_key(SrcName, [_|_] = RoutingKeys) ->
%%--------------------------------------------------------------------
-fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
-fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
-
-%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {unroutable, []};
-check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
-check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
-
-%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
-%% expensive for the reasons explained in rabbit_misc:dirty_read/1.
-lookup_qpids(QNames) ->
- lists:foldl(fun (QName, QPids) ->
- case ets:lookup(rabbit_queue, QName) of
- [#amqqueue{pid = QPid, slave_pids = SPids}] ->
- [QPid | SPids ++ QPids];
- [] ->
- QPids
- end
- end, [], QNames).
-
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
%% expensive for the same reasons as above, and, additionally, due to
%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e0defa9e96..e524446ea8 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -21,7 +21,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
--export([peer_cert_subject_item/2]).
+-export([peer_cert_subject_items/2]).
%%--------------------------------------------------------------------------
@@ -34,8 +34,8 @@
-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
-spec(peer_cert_subject/1 :: (certificate()) -> string()).
-spec(peer_cert_validity/1 :: (certificate()) -> string()).
--spec(peer_cert_subject_item/2 ::
- (certificate(), tuple()) -> string() | 'not_found').
+-spec(peer_cert_subject_items/2 ::
+ (certificate(), tuple()) -> [string()] | 'not_found').
-endif.
@@ -59,8 +59,8 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
-%% Return a part of the certificate's subject.
-peer_cert_subject_item(Cert, Type) ->
+%% Return the parts of the certificate's subject.
+peer_cert_subject_items(Cert, Type) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
subject = Subject }}) ->
@@ -89,8 +89,8 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
<- lists:flatten(RDNs),
T == Type] of
- [Val] -> format_asn1_value(Val);
- [] -> not_found
+ [] -> not_found;
+ L -> [format_asn1_value(V) || V <- L]
end.
%%--------------------------------------------------------------------------
@@ -150,9 +150,11 @@ escape_rdn_value([$ ], middle) ->
escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
C =:= $<; C =:= $>; C =:= $\\ ->
[$\\, C | escape_rdn_value(S, middle)];
-escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 ->
- %% only U+0000 needs escaping, but for display purposes it's handy
- %% to escape all non-printable chars
+escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
+ %% Of ASCII characters only U+0000 needs escaping, but for display
+ %% purposes it's handy to escape all non-printable chars. All non-ASCII
+ %% characters get converted to UTF-8 sequences and then escaped. We've
+ %% already got a UTF-8 sequence here, so just escape it.
lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
escape_rdn_value(S, middle);
escape_rdn_value([C | S], middle) ->
@@ -167,6 +169,10 @@ format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+%% We appear to get an untagged value back for an ia5string
+%% (e.g. domainComponent).
+format_asn1_value(V) when is_list(V) ->
+ V;
format_asn1_value(V) ->
io_lib:format("~p", [V]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2f03b2fb0b..ff55194707 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2232,7 +2232,7 @@ with_fresh_variable_queue(Fun) ->
_ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
-publish_and_confirm(QPid, Payload, Count) ->
+publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2240,7 +2240,7 @@ publish_and_confirm(QPid, Payload, Count) ->
Payload),
Delivery = #delivery{mandatory = false, immediate = false,
sender = self(), message = Msg, msg_seq_no = Seq},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
+ {routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
@@ -2484,7 +2484,7 @@ test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- publish_and_confirm(QPid, <<>>, Count),
+ publish_and_confirm(Q, <<>>, Count),
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
@@ -2514,7 +2514,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- publish_and_confirm(QPid, Payload, Count),
+ publish_and_confirm(Q, Payload, Count),
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 2db960acc9..ae2b5d3f46 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -28,12 +28,9 @@
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, user/0, internal_user/0,
- username/0, password/0, password_hash/0, ok/1, error/1,
- ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
- connection_exit/0]).
-
--type(channel_exit() :: no_return()).
--type(connection_exit() :: no_return()).
+ username/0, password/0, password_hash/0,
+ ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
+ channel_exit/0, connection_exit/0, mfargs/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +153,9 @@
-type(ok_or_error2(A, B) :: ok(A) | error(B)).
-type(ok_pid_or_error() :: ok_or_error2(pid(), any())).
+-type(channel_exit() :: no_return()).
+-type(connection_exit() :: no_return()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-endif. % use_specs
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index e0ca8cbb72..f164035ee5 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -35,6 +35,7 @@
-rabbit_upgrade({gm, mnesia, []}).
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
+-rabbit_upgrade({topic_trie_node, mnesia, []}).
%% -------------------------------------------------------------------
@@ -54,6 +55,7 @@
-spec(gm/0 :: () -> 'ok').
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
+-spec(topic_trie_node/0 :: () -> 'ok').
-endif.
@@ -177,6 +179,12 @@ mirrored_supervisor() ->
[{record_name, mirrored_sup_childspec},
{attributes, [key, mirroring_pid, childspec]}]).
+topic_trie_node() ->
+ create(rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, [trie_node, edge_count, binding_count]},
+ {type, ordered_set}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index f75da87221..26ea502ced 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
ok;
{error, normal} ->
case Child#child.restart_type of
- permanent -> ReportError(normal);
- {permanent, _Delay} -> ReportError(normal);
+ permanent -> ReportError(normal, Child);
+ {permanent, _Delay} -> ReportError(normal, Child);
_ -> ok
end;
{error, OtherReason} ->
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 0d50683db7..8678c2c9e0 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -86,6 +86,13 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
%% know this will fail.
{stop, normal, State};
+handle_info({inet_async, LSock, Ref, {error, Reason}},
+ State=#state{sock=LSock, ref=Ref}) ->
+ {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
+ [rabbit_misc:ntoab(Address), Port, Reason]),
+ accept(State);
+
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index 4c835598e0..cb3dd02c42 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -25,7 +25,11 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
+-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
+
-endif.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index ad2a0d02d0..9a82ac88c1 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -28,9 +28,14 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
- atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ integer(), atom(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
+
-endif.
%%--------------------------------------------------------------------
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 5bff5c2701..74297b6dce 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -26,12 +26,16 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/7 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), integer(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 456ff39f47..fcb07a16f4 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -37,10 +37,11 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
--spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/1 ::
- (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
-spec(idle/1 :: (any()) -> 'ok').
-endif.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 78ab4df3ea..b42530e269 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -29,12 +29,12 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
--spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/2 ::
- (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
--spec(run/1 :: (fun (() -> A)) -> A;
- ({atom(), atom(), [any()]}) -> any()).
+-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
+-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.