summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-24 16:46:58 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-24 16:46:58 +0100
commitc23c9583976ef7d1408f776e284ea083e3c75d1d (patch)
tree507a1867651b361b79addd808c5947e6e84aedbc
parent3125b4f25cb1f64ec9d27e3a910abcba31957100 (diff)
parent9099f9d241e3dfbe3ea5e0257881434e80553aa2 (diff)
downloadrabbitmq-server-git-c23c9583976ef7d1408f776e284ea083e3c75d1d.tar.gz
Merge bug26169
-rw-r--r--docs/rabbitmq-plugins.1.xml64
-rw-r--r--docs/rabbitmq.config.example16
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec6
-rw-r--r--packaging/debs/Debian/debian/changelog12
-rwxr-xr-xscripts/rabbitmq-env5
-rwxr-xr-xscripts/rabbitmq-server14
-rwxr-xr-xscripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmq-service.bat1
-rw-r--r--src/rabbit.erl27
-rw-r--r--src/rabbit_amqqueue.erl67
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_channel.erl26
-rw-r--r--src/rabbit_mirror_queue_misc.erl32
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_networking.erl11
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_plugins.erl29
-rw-r--r--src/rabbit_plugins_main.erl76
-rw-r--r--src/rabbit_prelaunch.erl4
-rw-r--r--src/rabbit_reader.erl54
-rw-r--r--src/rabbit_table.erl3
-rw-r--r--src/rabbit_upgrade_functions.erl18
24 files changed, 353 insertions, 134 deletions
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
index e891969fa4..f7be2d2995 100644
--- a/docs/rabbitmq-plugins.1.xml
+++ b/docs/rabbitmq-plugins.1.xml
@@ -63,6 +63,16 @@
enabled. Implicitly enabled plugins are automatically disabled again
when they are no longer required.
</para>
+
+ <para>
+ The <command>enable</command>, <command>disable</command> and
+ <command>set</command> commands will update the plugins file and
+ then attempt to connect to the broker and ensure it is running
+ all enabled plugins. By default if it is not possible to connect
+ to the running broker (for example if it is stopped) then a
+ warning is displayed. Specify <command>--online</command> or
+ <command>--offline</command> to change this behaviour.
+ </para>
</refsect1>
<refsect1>
@@ -150,14 +160,7 @@
</varlistentry>
</variablelist>
<para>
- Enables the specified plugins and all their
- dependencies. This will update the enabled plugins file
- and then attempt to connect to the broker and ensure it is
- running all enabled plugins. By default if it is not
- possible to connect to the running broker (for example if
- it is stopped) then a warning is displayed. Specify
- <command>--online</command> or
- <command>--offline</command> to change this.
+ Enables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -188,14 +191,7 @@
</varlistentry>
</variablelist>
<para>
- Disables the specified plugins and all their
- dependencies. This will update the enabled plugins file
- and then attempt to connect to the broker and ensure it is
- running all enabled plugins. By default if it is not
- possible to connect to the running broker (for example if
- it is stopped) then a warning is displayed. Specify
- <command>--online</command> or
- <command>--offline</command> to change this.
+ Disables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -206,6 +202,42 @@
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>set</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>plugin</term>
+ <listitem><para>Zero or more plugins to enable.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Enables the specified plugins and all their
+ dependencies. Unlike <command>rabbitmq-plugins
+ enable</command> this command ignores and overwrites any
+ existing enabled plugins. <command>rabbitmq-plugins
+ set</command> with no plugin arguments is a legal command
+ meaning "disable all plugins".
+ </para>
+
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-plugins set rabbitmq_management</screen>
+ <para role="example">
+ This command enables the <command>management</command>
+ plugin and its dependencies and disables everything else.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</refsect1>
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 26de71b70d..e8b5666098 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -27,6 +27,11 @@
%%
%% {ssl_listeners, [5671]},
+ %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
+ %% and SSL handshake), in milliseconds.
+ %%
+ %% {handshake_timeout, 10000},
+
%% Log levels (currently just used for connection logging).
%% One of 'info', 'warning', 'error' or 'none', in decreasing order
%% of verbosity. Defaults to 'info'.
@@ -103,6 +108,10 @@
%%
%% {ssl_cert_login_from, common_name},
+ %% SSL handshake timeout, in milliseconds.
+ %%
+ %% {ssl_handshake_timeout, 5000},
+
%%
%% Default User / VHost
%% ====================
@@ -213,7 +222,12 @@
%% Explicitly enable/disable hipe compilation.
%%
- %% {hipe_compile, true}
+ %% {hipe_compile, true},
+
+ %% Timeout used when waiting for Mnesia tables in a cluster to
+ %% become available.
+ %%
+ %% {mnesia_table_loading_timeout, 30000}
]},
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 7360208aad..f26e0f7709 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -39,12 +39,15 @@
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
+ {mnesia_table_loading_timeout, 30000},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
+ {ssl_handshake_timeout, 5000},
+ {handshake_timeout, 10000},
{reverse_dns_lookups, false},
{cluster_partition_handling, ignore},
{tcp_listen_options, [binary,
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c13868030a..7a40f9ebf0 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -52,6 +52,7 @@
arguments, %% immutable
pid, %% durable (just so we know home node)
slave_pids, sync_slave_pids, %% transient
+ down_slave_nodes, %% durable
policy, %% durable, implicit update as above
gm_pids, %% transient
decorators}). %% transient, recalculated as above
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index f9ecd457a5..324040579d 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,12 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Jun 24 2014 simon@rabbitmq.com 3.3.4-1
+- New Upstream Release
+
+* Mon Jun 16 2014 simon@rabbitmq.com 3.3.3-1
+- New Upstream Release
+
* Mon Jun 9 2014 simon@rabbitmq.com 3.3.2-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 7b28cd209c..d26991e437 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,15 @@
+rabbitmq-server (3.3.4-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Tue, 24 Jun 2014 12:50:29 +0100
+
+rabbitmq-server (3.3.3-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Mon, 16 Jun 2014 13:00:00 +0100
+
rabbitmq-server (3.3.2-1) unstable; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index b77416703d..69d5a9c9d0 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -15,10 +15,14 @@
## Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
##
+# We set +e here since since our test for "readlink -f" below needs to
+# be able to fail.
+set +e
# Determine where this script is really located (if this script is
# invoked from another script, this is the location of the caller)
SCRIPT_PATH="$0"
while [ -h "$SCRIPT_PATH" ] ; do
+ # Determine if readlink -f is supported at all. TODO clean this up.
FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null`
if [ "$?" != "0" ]; then
REL_PATH=`readlink $SCRIPT_PATH`
@@ -31,6 +35,7 @@ while [ -h "$SCRIPT_PATH" ] ; do
SCRIPT_PATH=$FULL_PATH
fi
done
+set -e
SCRIPT_DIR=`dirname $SCRIPT_PATH`
RABBITMQ_HOME="${SCRIPT_DIR}/.."
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 18d24542e3..2dbda42734 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -39,7 +39,7 @@ DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
-
+[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
@@ -87,6 +87,8 @@ esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
+set +e
+
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \
${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
@@ -98,13 +100,18 @@ RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \
-extra "${RABBITMQ_NODENAME}"
PRELAUNCH_RESULT=$?
-if [ ${PRELAUNCH_RESULT} = 1 ] ; then
- exit 1
+if [ ${PRELAUNCH_RESULT} = 2 ] ; then
+ # dist port is mentioned in config, so do not set it
+ true
elif [ ${PRELAUNCH_RESULT} = 0 ] ; then
# dist port is not mentioned in the config file, we can set it
RABBITMQ_DIST_ARG="-kernel inet_dist_listen_min ${RABBITMQ_DIST_PORT} -kernel inet_dist_listen_max ${RABBITMQ_DIST_PORT}"
+else
+ exit ${PRELAUNCH_RESULT}
fi
+set -e
+
RABBITMQ_CONFIG_ARG=
[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
@@ -124,6 +131,7 @@ exec ${ERL_DIR}erl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
+ ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \
${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
-sasl sasl_error_logger false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 043204faaa..e23124068e 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -147,6 +147,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 895561d4be..fb2703f25a 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -235,6 +235,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 4901ea17ed..4b7a9a1f3c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -201,6 +201,7 @@
%% practice 2 processes seems just as fast as any other number > 1,
%% and keeps the progress bar realistic-ish.
-define(HIPE_PROCESSES, 2).
+-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
%%----------------------------------------------------------------------------
@@ -491,6 +492,7 @@ start(normal, []) ->
true = register(rabbit, self()),
print_banner(),
log_banner(),
+ warn_if_kernel_config_dubious(),
run_boot_steps(),
{ok, SupPid};
Error ->
@@ -816,6 +818,31 @@ log_banner() ->
end || S <- Settings]),
error_logger:info_msg("~s", [Banner]).
+warn_if_kernel_config_dubious() ->
+ case erlang:system_info(kernel_poll) of
+ true -> ok;
+ false -> error_logger:warning_msg(
+ "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
+ "and CPU utilization may worsen.~n")
+ end,
+ AsyncThreads = erlang:system_info(thread_pool_size),
+ case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
+ true -> error_logger:warning_msg(
+ "Erlang VM is running with ~b I/O threads, "
+ "file I/O performance may worsen~n", [AsyncThreads]);
+ false -> ok
+ end,
+ IDCOpts = case application:get_env(kernel, inet_default_connect_options) of
+ undefined -> [];
+ {ok, Val} -> Val
+ end,
+ case proplists:get_value(nodelay, IDCOpts, false) of
+ false -> error_logger:warning_msg(
+ "Nagle's algorithm is enabled for sockets, "
+ "network I/O latency will be higher~n");
+ true -> ok
+ end.
+
home_dir() ->
case init:get_argument(home) of
{ok, [[Home]]} -> Home;
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8a1d162a7a..4e23dbd242 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
--export([on_node_down/1]).
+-export([on_node_up/1, on_node_down/1]).
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
@@ -174,6 +174,7 @@
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()).
@@ -257,15 +258,16 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
Q = rabbit_queue_decorator:set(
- rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []})),
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ down_slave_nodes = [],
+ gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
@@ -665,15 +667,23 @@ forget_all_durable(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
- [rabbit_binding:process_deletions(
- internal_delete1(Name)) ||
- #amqqueue{name = Name, pid = Pid} = Q <- Qs,
- node(Pid) =:= Node,
- rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
+ [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node],
ok
end),
ok.
+forget_node_for_queue(#amqqueue{name = Name,
+ down_slave_nodes = []}) ->
+ %% No slaves to recover from, queue is gone
+ rabbit_binding:process_deletions(internal_delete1(Name));
+
+forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
+ %% Promote a slave while down - it'll happily recover as a master
+ Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H),
+ down_slave_nodes = T},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write).
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
@@ -689,6 +699,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+on_node_up(Node) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_queue,
+ #amqqueue{_ = '_'}, write),
+ [case lists:member(Node, DSNs) of
+ true -> DSNs1 = DSNs -- [Node],
+ store_queue(
+ Q#amqqueue{down_slave_nodes = DSNs1});
+ false -> ok
+ end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs],
+ ok
+ end).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
@@ -724,12 +748,13 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
-immutable(Q) -> Q#amqqueue{pid = none,
- slave_pids = none,
- sync_slave_pids = none,
- gm_pids = none,
- policy = none,
- decorators = none}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ down_slave_nodes = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none}.
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 97206df350..4082c53d33 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -84,6 +84,7 @@
memory,
slave_pids,
synchronised_slave_pids,
+ down_slave_nodes,
backing_queue_status,
state
]).
@@ -810,6 +811,14 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(down_slave_nodes, #q{q = #amqqueue{name = Name,
+ durable = Durable}}) ->
+ {ok, Q = #amqqueue{down_slave_nodes = Nodes}} =
+ rabbit_amqqueue:lookup(Name),
+ case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> Nodes
+ end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4efee84a9a..738c4570ac 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -433,17 +433,22 @@ send(_Command, #ch{state = closing}) ->
send(Command, #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Command).
-handle_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ virtual_host = VHost,
+ user = User}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
{Channel, CloseMethod} ->
- rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n",
- [ConnPid, Channel, Reason]),
+ rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s',"
+ " user: '~s'), channel ~p:~n~p~n",
+ [ConnPid, ConnName, VHost, User#user.username,
+ Channel, Reason]),
ok = rabbit_writer:send_command(WriterPid, CloseMethod),
{noreply, State1};
{0, _} ->
@@ -996,7 +1001,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName,
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, Args, Owner),
- rabbit_amqqueue:stat(Q)
+ maybe_stat(NoWait, Q)
end) of
{ok, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
@@ -1052,7 +1057,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -1208,6 +1213,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E
end.
+maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q);
+maybe_stat(true, _Q) -> {ok, 0, 0}.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 7aec1ac81f..9e8c4a1891 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -78,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- gm_pids = GMPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ gm_pids = GMPids,
+ down_slave_nodes = DSNs}] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -89,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
+ DSNs1 = [node(Pid) ||
+ Pid <- SPids,
+ not lists:member(Pid, AlivePids)] ++ DSNs,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
@@ -97,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1),
%% If we add and remove nodes at the same time we
%% might tell the old master we need to sync and
@@ -109,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ ->
%% Master has changed, and we're not it.
%% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1)
end,
{ok, QPid1, DeadPids}
@@ -239,12 +245,16 @@ log(Level, QName, Fmt, Args) ->
rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
- sync_slave_pids = SSPids}) ->
+store_updated_slaves(Q = #amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids,
+ down_slave_nodes = DSNs}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
+ down_slave_nodes = DSNs1},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 6f353da574..fd4b7b1137 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -45,7 +45,7 @@
-export([with_local_io/1, local_info_msg/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
--export([pid_to_string/1, string_to_pid/1]).
+-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
@@ -193,6 +193,7 @@
(rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(node_to_fake_pid/1 :: (atom()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
@@ -709,6 +710,10 @@ string_to_pid(Str) ->
throw(Err)
end.
+%% node(node_to_fake_pid(Node)) =:= Node.
+node_to_fake_pid(Node) ->
+ string_to_pid(format("<~s.0.0.0>", [Node])).
+
version_compare(A, B, lte) ->
case version_compare(A, B) of
eq -> true;
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9082dbd353..96448f3227 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -37,8 +37,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(SSL_TIMEOUT, 5). %% seconds
-
-define(FIRST_TEST_BIND_PORT, 10000).
%%----------------------------------------------------------------------------
@@ -168,9 +166,14 @@ ensure_ssl() ->
end
end.
+ssl_timeout() ->
+ {ok, Val} = application:get_env(rabbit, ssl_handshake_timeout),
+ Val.
+
ssl_transform_fun(SslOpts) ->
fun (Sock) ->
- case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
+ Timeout = ssl_timeout(),
+ case catch ssl:ssl_accept(Sock, SslOpts, Timeout) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, timeout} ->
@@ -185,7 +188,7 @@ ssl_transform_fun(SslOpts) ->
%% form, according to the TLS spec). So we give
%% the ssl_connection a little bit of time to send
%% such alerts.
- timer:sleep(?SSL_TIMEOUT * 1000),
+ timer:sleep(Timeout),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 1496147848..1c971c1da8 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -415,6 +415,7 @@ ensure_ping_timer(State) ->
State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
handle_live_rabbit(Node) ->
+ ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 7817626c8b..9acaa1d418 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -130,19 +130,25 @@ dependencies(Reverse, Sources, AllPlugins) ->
true = digraph:delete(G),
Dests.
-%% Make sure we don't list OTP apps in here, and also that we create
-%% fake plugins for missing dependencies.
+%% Make sure we don't list OTP apps in here, and also that we detect
+%% missing dependencies.
ensure_dependencies(Plugins) ->
Names = plugin_names(Plugins),
NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins,
Dep <- Deps,
not lists:member(Dep, Names)],
- {OTP, Missing} = lists:partition(fun is_loadable/1, NotThere),
- Plugins1 = [P#plugin{dependencies = Deps -- OTP}
- || P = #plugin{dependencies = Deps} <- Plugins],
- Fake = [#plugin{name = Name,
- dependencies = []}|| Name <- Missing],
- Plugins1 ++ Fake.
+ {OTP, Missing} = lists:partition(fun is_loadable/1, lists:usort(NotThere)),
+ case Missing of
+ [] -> ok;
+ _ -> Blame = [Name || #plugin{name = Name,
+ dependencies = Deps} <- Plugins,
+ lists:any(fun (Dep) ->
+ lists:member(Dep, Missing)
+ end, Deps)],
+ throw({error, {missing_dependencies, Missing, Blame}})
+ end,
+ [P#plugin{dependencies = Deps -- OTP}
+ || P = #plugin{dependencies = Deps} <- Plugins].
is_loadable(App) ->
case application:load(App) of
@@ -162,13 +168,6 @@ prepare_plugins(Enabled) ->
ToUnpack = dependencies(false, Enabled, AllPlugins),
ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
- case Enabled -- plugin_names(ToUnpackPlugins) of
- [] -> ok;
- Missing -> error_logger:warning_msg(
- "The following enabled plugins were not found: ~p~n",
- [Missing])
- end,
-
case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 98418d8cf5..278fcf986d 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -44,6 +44,7 @@
[{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
{enable, [?OFFLINE_DEF, ?ONLINE_DEF]},
{disable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {set, [?OFFLINE_DEF, ?ONLINE_DEF]},
{sync, []}]).
%%----------------------------------------------------------------------------
@@ -86,6 +87,10 @@ start() ->
{'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
PrintInvalidCommandError(),
usage();
+ {error, {missing_dependencies, Missing, Blame}} ->
+ print_error("dependent plugins ~p not found; used by ~p.",
+ [Missing, Blame]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
@@ -137,18 +142,13 @@ action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) ->
ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
- MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing,
- case {Missing, MissingDeps} of
- {[], []} -> ok;
- {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)});
- {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)});
- {_, _} -> throw({error_string,
- fmt_missing("plugins", Missing) ++
- fmt_missing("dependencies", MissingDeps)})
- end,
write_enabled_plugins(PluginsFile, NewEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
@@ -158,6 +158,27 @@ action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) ->
action_change(
Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+action(set, Node, ToSet0, Opts, PluginsFile, PluginsDir) ->
+ ToSet = [list_to_atom(Name) || Name <- ToSet0],
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ Missing = ToSet -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ ToSet, AllPlugins),
+ write_enabled_plugins(PluginsFile, ToSet),
+ case NewImplicitlyEnabled of
+ [] -> io:format("All plugins are now disabled.~n");
+ _ -> print_list("The following plugins are now enabled:",
+ NewImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) ->
case ToDisable0 of
[] -> throw({error_string, "Not enough arguments for 'disable'"});
@@ -226,12 +247,9 @@ format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) ->
{badrpc, _} -> {"[failed to contact ~s - status not shown]", []};
Active -> {"* = running on ~s", Active}
end,
- Missing = [#plugin{name = Name, dependencies = []} ||
- Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
- plugin_names(AvailablePlugins))],
{ok, RE} = re:compile(Pattern),
Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or
@@ -244,25 +262,23 @@ format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) ->
case Format of
minimal -> ok;
_ -> io:format(" Configured: E = explicitly enabled; "
- "e = implicitly enabled; ! = missing~n"
+ "e = implicitly enabled~n"
" | Status: ~s~n"
" |/~n", [rabbit_misc:format(StatusMsg, [Node])])
end,
[format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running,
- plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
+ Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Running,
- Missing, Format, MaxWidth) ->
+ EnabledExplicitly, EnabledImplicitly, Running, Format,
+ MaxWidth) ->
EnabledGlyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly),
- lists:member(Name, Missing)} of
- {true, false, false} -> "E";
- {false, true, false} -> "e";
- {_, _, true} -> "!";
- _ -> " "
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "E";
+ {false, true} -> "e";
+ _ -> " "
end,
RunningGlyph = case lists:member(Name, Running) of
true -> "*";
@@ -292,8 +308,8 @@ fmt_list(Header, Plugins) ->
lists:flatten(
[Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
-fmt_missing(Desc, Missing) ->
- fmt_list("The following " ++ Desc ++ " could not be found:", Missing).
+fmt_missing(Missing) ->
+ fmt_list("The following plugins could not be found:", Missing).
usort_plugins(Plugins) ->
lists:usort(fun plugins_cmp/2, Plugins).
@@ -331,7 +347,7 @@ sync(Node, ForceOnline, PluginsFile) ->
rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]).
rpc_call(Node, Online, Mod, Fun, Args) ->
- io:format("Applying plugin configuration to ~s...", [Node]),
+ io:format("~nApplying plugin configuration to ~s...", [Node]),
case rpc:call(Node, Mod, Fun, Args) of
{ok, [], []} ->
io:format(" nothing to do.~n", []);
@@ -348,10 +364,10 @@ rpc_call(Node, Online, Mod, Fun, Args) ->
true -> Error;
false -> io:format(
" * Could not contact node ~s.~n"
- " * Changes will take effect at broker restart.~n"
- " * Specify --online for diagnostics and to treat "
- "this as a failure.~n"
- " * Specify --offline to disable changes to running "
+ " Changes will take effect at broker restart.~n"
+ " * Options: --online - fail if broker cannot be "
+ "contacted.~n"
+ " --offline - do not try to contact "
"broker.~n",
[Node])
end;
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 4cc9cd12f1..6a6a4ee680 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -90,9 +90,9 @@ dist_port_set_check() ->
{none, none} -> ok;
_ -> rabbit_misc:quit(?DIST_PORT_CONFIGURED)
end;
+ {ok, _} ->
+ ok;
{error, _} ->
- %% TODO can we present errors more nicely here
- %% than after -config has failed?
ok
end
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 906c4b6e2f..db6d1eb02a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,7 +27,6 @@
-export([conserve_resources/3, server_properties/1]).
--define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
@@ -189,10 +188,10 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
socket_error(Reason) when is_atom(Reason) ->
- log(error, "error on AMQP connection ~p: ~s~n",
+ log(error, "Error on AMQP connection ~p: ~s~n",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
- log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+ log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
exit(normal)
end,
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
+ {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(list_to_binary(Name)),
@@ -231,7 +231,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
peer_port = PeerPort,
protocol = none,
user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
+ timeout_sec = (HandshakeTimeout / 1000),
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
@@ -548,21 +548,27 @@ wait_for_channel_termination(0, TimerRef, State) ->
end;
_ -> State
end;
-wait_for_channel_termination(N, TimerRef, State) ->
+wait_for_channel_termination(N, TimerRef,
+ State = #v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
{Channel, State1} = channel_cleanup(ChPid, State),
case {Channel, termination_kind(Reason)} of
- {undefined, _} -> exit({abnormal_dependent_exit,
- ChPid, Reason});
- {_, controlled} -> wait_for_channel_termination(
- N-1, TimerRef, State1);
- {_, uncontrolled} -> log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(
- N-1, TimerRef, State1)
+ {undefined, _} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef, State1);
+ {_, uncontrolled} ->
+ log(error, "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:"
+ "error while terminating:~n~p~n",
+ [self(), ConnName, VHost, User#user.username,
+ CS, Channel, Reason]),
+ wait_for_channel_termination(N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
@@ -581,16 +587,24 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+log_hard_error(#v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}, Channel, Reason) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:~n~p~n",
+ [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]).
+
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), closed, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
State;
handle_exception(State = #v1{connection = #connection{protocol = Protocol},
connection_state = CS},
Channel, Reason)
when ?IS_RUNNING(State) orelse CS =:= closing ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), CS, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
State1 = close_connection(terminate_channels(State)),
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index da75932d0e..47c77cd0f7 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -70,7 +70,8 @@ wait_for_replicated() ->
not lists:member({local_content, true}, TabDef)]).
wait(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
+ {ok, Timeout} = application:get_env(rabbit, mnesia_table_loading_timeout),
+ case mnesia:wait_for_tables(TableNames, Timeout) of
ok ->
ok;
{timeout, BadTabs} ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b6d378525e..1104f3731a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -48,6 +48,7 @@
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
%% -------------------------------------------------------------------
@@ -77,6 +78,8 @@
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
-spec(internal_system_x/0 :: () -> 'ok').
+-spec(cluster_name/0 :: () -> 'ok').
+-spec(down_slave_nodes/0 :: () -> 'ok').
-endif.
@@ -382,6 +385,21 @@ cluster_name_tx() ->
[mnesia:delete(T, K, write) || K <- Ks],
ok.
+down_slave_nodes() ->
+ ok = down_slave_nodes(rabbit_queue),
+ ok = down_slave_nodes(rabbit_durable_queue).
+
+down_slave_nodes(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->