diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-06-24 16:46:58 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-06-24 16:46:58 +0100 |
| commit | c23c9583976ef7d1408f776e284ea083e3c75d1d (patch) | |
| tree | 507a1867651b361b79addd808c5947e6e84aedbc | |
| parent | 3125b4f25cb1f64ec9d27e3a910abcba31957100 (diff) | |
| parent | 9099f9d241e3dfbe3ea5e0257881434e80553aa2 (diff) | |
| download | rabbitmq-server-git-c23c9583976ef7d1408f776e284ea083e3c75d1d.tar.gz | |
Merge bug26169
| -rw-r--r-- | docs/rabbitmq-plugins.1.xml | 64 | ||||
| -rw-r--r-- | docs/rabbitmq.config.example | 16 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
| -rw-r--r-- | include/rabbit.hrl | 1 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/changelog | 12 | ||||
| -rwxr-xr-x | scripts/rabbitmq-env | 5 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 14 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server.bat | 1 | ||||
| -rwxr-xr-x | scripts/rabbitmq-service.bat | 1 | ||||
| -rw-r--r-- | src/rabbit.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_plugins.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_plugins_main.erl | 76 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 18 |
24 files changed, 353 insertions, 134 deletions
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml index e891969fa4..f7be2d2995 100644 --- a/docs/rabbitmq-plugins.1.xml +++ b/docs/rabbitmq-plugins.1.xml @@ -63,6 +63,16 @@ enabled. Implicitly enabled plugins are automatically disabled again when they are no longer required. </para> + + <para> + The <command>enable</command>, <command>disable</command> and + <command>set</command> commands will update the plugins file and + then attempt to connect to the broker and ensure it is running + all enabled plugins. By default if it is not possible to connect + to the running broker (for example if it is stopped) then a + warning is displayed. Specify <command>--online</command> or + <command>--offline</command> to change this behaviour. + </para> </refsect1> <refsect1> @@ -150,14 +160,7 @@ </varlistentry> </variablelist> <para> - Enables the specified plugins and all their - dependencies. This will update the enabled plugins file - and then attempt to connect to the broker and ensure it is - running all enabled plugins. By default if it is not - possible to connect to the running broker (for example if - it is stopped) then a warning is displayed. Specify - <command>--online</command> or - <command>--offline</command> to change this. + Enables the specified plugins and all their dependencies. </para> <para role="example-prefix">For example:</para> @@ -188,14 +191,7 @@ </varlistentry> </variablelist> <para> - Disables the specified plugins and all their - dependencies. This will update the enabled plugins file - and then attempt to connect to the broker and ensure it is - running all enabled plugins. By default if it is not - possible to connect to the running broker (for example if - it is stopped) then a warning is displayed. Specify - <command>--online</command> or - <command>--offline</command> to change this. + Disables the specified plugins and all their dependencies. </para> <para role="example-prefix">For example:</para> @@ -206,6 +202,42 @@ </para> </listitem> </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>set</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> + <term>plugin</term> + <listitem><para>Zero or more plugins to enable.</para></listitem> + </varlistentry> + </variablelist> + <para> + Enables the specified plugins and all their + dependencies. Unlike <command>rabbitmq-plugins + enable</command> this command ignores and overwrites any + existing enabled plugins. <command>rabbitmq-plugins + set</command> with no plugin arguments is a legal command + meaning "disable all plugins". + </para> + + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-plugins set rabbitmq_management</screen> + <para role="example"> + This command enables the <command>management</command> + plugin and its dependencies and disables everything else. + </para> + </listitem> + </varlistentry> + </variablelist> </refsect1> diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 26de71b70d..e8b5666098 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -27,6 +27,11 @@ %% %% {ssl_listeners, [5671]}, + %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection + %% and SSL handshake), in milliseconds. + %% + %% {handshake_timeout, 10000}, + %% Log levels (currently just used for connection logging). %% One of 'info', 'warning', 'error' or 'none', in decreasing order %% of verbosity. Defaults to 'info'. @@ -103,6 +108,10 @@ %% %% {ssl_cert_login_from, common_name}, + %% SSL handshake timeout, in milliseconds. + %% + %% {ssl_handshake_timeout, 5000}, + %% %% Default User / VHost %% ==================== @@ -213,7 +222,12 @@ %% Explicitly enable/disable hipe compilation. %% - %% {hipe_compile, true} + %% {hipe_compile, true}, + + %% Timeout used when waiting for Mnesia tables in a cluster to + %% become available. + %% + %% {mnesia_table_loading_timeout, 30000} ]}, diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 7360208aad..f26e0f7709 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -39,12 +39,15 @@ {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, + {mnesia_table_loading_timeout, 30000}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, {trace_vhosts, []}, {log_levels, [{connection, info}]}, {ssl_cert_login_from, distinguished_name}, + {ssl_handshake_timeout, 5000}, + {handshake_timeout, 10000}, {reverse_dns_lookups, false}, {cluster_partition_handling, ignore}, {tcp_listen_options, [binary, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c13868030a..7a40f9ebf0 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -52,6 +52,7 @@ arguments, %% immutable pid, %% durable (just so we know home node) slave_pids, sync_slave_pids, %% transient + down_slave_nodes, %% durable policy, %% durable, implicit update as above gm_pids, %% transient decorators}). %% transient, recalculated as above diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index f9ecd457a5..324040579d 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -130,6 +130,12 @@ done rm -rf %{buildroot} %changelog +* Tue Jun 24 2014 simon@rabbitmq.com 3.3.4-1 +- New Upstream Release + +* Mon Jun 16 2014 simon@rabbitmq.com 3.3.3-1 +- New Upstream Release + * Mon Jun 9 2014 simon@rabbitmq.com 3.3.2-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 7b28cd209c..d26991e437 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (3.3.4-1) unstable; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Tue, 24 Jun 2014 12:50:29 +0100 + +rabbitmq-server (3.3.3-1) unstable; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Mon, 16 Jun 2014 13:00:00 +0100 + rabbitmq-server (3.3.2-1) unstable; urgency=low * New Upstream Release diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index b77416703d..69d5a9c9d0 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -15,10 +15,14 @@ ## Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. ## +# We set +e here since since our test for "readlink -f" below needs to +# be able to fail. +set +e # Determine where this script is really located (if this script is # invoked from another script, this is the location of the caller) SCRIPT_PATH="$0" while [ -h "$SCRIPT_PATH" ] ; do + # Determine if readlink -f is supported at all. TODO clean this up. FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null` if [ "$?" != "0" ]; then REL_PATH=`readlink $SCRIPT_PATH` @@ -31,6 +35,7 @@ while [ -h "$SCRIPT_PATH" ] ; do SCRIPT_PATH=$FULL_PATH fi done +set -e SCRIPT_DIR=`dirname $SCRIPT_PATH` RABBITMQ_HOME="${SCRIPT_DIR}/.." diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 18d24542e3..2dbda42734 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -39,7 +39,7 @@ DEFAULT_NODE_PORT=5672 [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} [ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} - +[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} @@ -87,6 +87,8 @@ esac RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" +set +e + RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \ RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \ ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \ @@ -98,13 +100,18 @@ RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \ -extra "${RABBITMQ_NODENAME}" PRELAUNCH_RESULT=$? -if [ ${PRELAUNCH_RESULT} = 1 ] ; then - exit 1 +if [ ${PRELAUNCH_RESULT} = 2 ] ; then + # dist port is mentioned in config, so do not set it + true elif [ ${PRELAUNCH_RESULT} = 0 ] ; then # dist port is not mentioned in the config file, we can set it RABBITMQ_DIST_ARG="-kernel inet_dist_listen_min ${RABBITMQ_DIST_PORT} -kernel inet_dist_listen_max ${RABBITMQ_DIST_PORT}" +else + exit ${PRELAUNCH_RESULT} fi +set -e + RABBITMQ_CONFIG_ARG= [ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" @@ -124,6 +131,7 @@ exec ${ERL_DIR}erl \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ + ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \ ${RABBITMQ_LISTEN_ARG} \ -sasl errlog_type error \ -sasl sasl_error_logger false \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 043204faaa..e23124068e 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -147,6 +147,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 895561d4be..fb2703f25a 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -235,6 +235,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
diff --git a/src/rabbit.erl b/src/rabbit.erl index 4901ea17ed..4b7a9a1f3c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -201,6 +201,7 @@ %% practice 2 processes seems just as fast as any other number > 1, %% and keeps the progress bar realistic-ish. -define(HIPE_PROCESSES, 2). +-define(ASYNC_THREADS_WARNING_THRESHOLD, 8). %%---------------------------------------------------------------------------- @@ -491,6 +492,7 @@ start(normal, []) -> true = register(rabbit, self()), print_banner(), log_banner(), + warn_if_kernel_config_dubious(), run_boot_steps(), {ok, SupPid}; Error -> @@ -816,6 +818,31 @@ log_banner() -> end || S <- Settings]), error_logger:info_msg("~s", [Banner]). +warn_if_kernel_config_dubious() -> + case erlang:system_info(kernel_poll) of + true -> ok; + false -> error_logger:warning_msg( + "Kernel poll (epoll, kqueue, etc) is disabled. Throughput " + "and CPU utilization may worsen.~n") + end, + AsyncThreads = erlang:system_info(thread_pool_size), + case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of + true -> error_logger:warning_msg( + "Erlang VM is running with ~b I/O threads, " + "file I/O performance may worsen~n", [AsyncThreads]); + false -> ok + end, + IDCOpts = case application:get_env(kernel, inet_default_connect_options) of + undefined -> []; + {ok, Val} -> Val + end, + case proplists:get_value(nodelay, IDCOpts, false) of + false -> error_logger:warning_msg( + "Nagle's algorithm is enabled for sockets, " + "network I/O latency will be higher~n"); + true -> ok + end. + home_dir() -> case init:get_argument(home) of {ok, [[Home]]} -> Home; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8a1d162a7a..4e23dbd242 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). --export([on_node_down/1]). +-export([on_node_up/1, on_node_down/1]). -export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). @@ -174,6 +174,7 @@ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). @@ -257,15 +258,16 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), Q = rabbit_queue_decorator:set( - rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []})), + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + down_slave_nodes = [], + gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). @@ -665,15 +667,23 @@ forget_all_durable(Node) -> fun () -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), - [rabbit_binding:process_deletions( - internal_delete1(Name)) || - #amqqueue{name = Name, pid = Pid} = Q <- Qs, - node(Pid) =:= Node, - rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined], + [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs, + node(Pid) =:= Node], ok end), ok. +forget_node_for_queue(#amqqueue{name = Name, + down_slave_nodes = []}) -> + %% No slaves to recover from, queue is gone + rabbit_binding:process_deletions(internal_delete1(Name)); + +forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) -> + %% Promote a slave while down - it'll happily recover as a master + Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H), + down_slave_nodes = T}, + ok = mnesia:write(rabbit_durable_queue, Q1, write). + run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). @@ -689,6 +699,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +on_node_up(Node) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_queue, + #amqqueue{_ = '_'}, write), + [case lists:member(Node, DSNs) of + true -> DSNs1 = DSNs -- [Node], + store_queue( + Q#amqqueue{down_slave_nodes = DSNs1}); + false -> ok + end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs], + ok + end). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -724,12 +748,13 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. -immutable(Q) -> Q#amqqueue{pid = none, - slave_pids = none, - sync_slave_pids = none, - gm_pids = none, - policy = none, - decorators = none}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + down_slave_nodes = none, + gm_pids = none, + policy = none, + decorators = none}. deliver([], _Delivery, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 97206df350..4082c53d33 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -84,6 +84,7 @@ memory, slave_pids, synchronised_slave_pids, + down_slave_nodes, backing_queue_status, state ]). @@ -810,6 +811,14 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(down_slave_nodes, #q{q = #amqqueue{name = Name, + durable = Durable}}) -> + {ok, Q = #amqqueue{down_slave_nodes = Nodes}} = + rabbit_amqqueue:lookup(Name), + case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> Nodes + end; i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4efee84a9a..738c4570ac 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -433,17 +433,22 @@ send(_Command, #ch{state = closing}) -> send(Command, #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Command). -handle_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid}) -> +handle_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of {Channel, CloseMethod} -> - rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n", - [ConnPid, Channel, Reason]), + rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s'," + " user: '~s'), channel ~p:~n~p~n", + [ConnPid, ConnName, VHost, User#user.username, + Channel, Reason]), ok = rabbit_writer:send_command(WriterPid, CloseMethod), {noreply, State1}; {0, _} -> @@ -996,7 +1001,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName, fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( Q, Durable, AutoDelete, Args, Owner), - rabbit_amqqueue:stat(Q) + maybe_stat(NoWait, Q) end) of {ok, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, @@ -1052,7 +1057,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -1208,6 +1213,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, E end. +maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q); +maybe_stat(true, _Q) -> {ok, 0, 0}. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 7aec1ac81f..9e8c4a1891 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -78,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - gm_pids = GMPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids, + gm_pids = GMPids, + down_slave_nodes = DSNs}] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> lists:member(GM, DeadGMPids) @@ -89,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> AlivePids = [Pid || {_GM, Pid} <- AliveGM], Alive = [Pid || Pid <- [QPid | SPids], lists:member(Pid, AlivePids)], + DSNs1 = [node(Pid) || + Pid <- SPids, + not lists:member(Pid, AlivePids)] ++ DSNs, {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> @@ -97,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1), %% If we add and remove nodes at the same time we %% might tell the old master we need to sync and @@ -109,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> _ -> %% Master has changed, and we're not it. %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1) end, {ok, QPid1, DeadPids} @@ -239,12 +245,16 @@ log(Level, QName, Fmt, Args) -> rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). -store_updated_slaves(Q = #amqqueue{slave_pids = SPids, - sync_slave_pids = SSPids}) -> +store_updated_slaves(Q = #amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids, + down_slave_nodes = DSNs}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, %% do we still need this filtering? SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], - Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, + DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], + Q1 = Q#amqqueue{sync_slave_pids = SSPids1, + down_slave_nodes = DSNs1}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 6f353da574..fd4b7b1137 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -45,7 +45,7 @@ -export([with_local_io/1, local_info_msg/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). --export([pid_to_string/1, string_to_pid/1]). +-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]). -export([version_compare/2, version_compare/3]). -export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). @@ -193,6 +193,7 @@ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(node_to_fake_pid/1 :: (atom()) -> pid()). -spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). -spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) @@ -709,6 +710,10 @@ string_to_pid(Str) -> throw(Err) end. +%% node(node_to_fake_pid(Node)) =:= Node. +node_to_fake_pid(Node) -> + string_to_pid(format("<~s.0.0.0>", [Node])). + version_compare(A, B, lte) -> case version_compare(A, B) of eq -> true; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9082dbd353..96448f3227 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -37,8 +37,6 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). --define(SSL_TIMEOUT, 5). %% seconds - -define(FIRST_TEST_BIND_PORT, 10000). %%---------------------------------------------------------------------------- @@ -168,9 +166,14 @@ ensure_ssl() -> end end. +ssl_timeout() -> + {ok, Val} = application:get_env(rabbit, ssl_handshake_timeout), + Val. + ssl_transform_fun(SslOpts) -> fun (Sock) -> - case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of + Timeout = ssl_timeout(), + case catch ssl:ssl_accept(Sock, SslOpts, Timeout) of {ok, SslSock} -> {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; {error, timeout} -> @@ -185,7 +188,7 @@ ssl_transform_fun(SslOpts) -> %% form, according to the TLS spec). So we give %% the ssl_connection a little bit of time to send %% such alerts. - timer:sleep(?SSL_TIMEOUT * 1000), + timer:sleep(Timeout), {error, {ssl_upgrade_error, Reason}}; {'EXIT', Reason} -> {error, {ssl_upgrade_failure, Reason}} diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 1496147848..1c971c1da8 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -415,6 +415,7 @@ ensure_ping_timer(State) -> State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes). handle_live_rabbit(Node) -> + ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), ok = rabbit_mnesia:on_node_up(Node). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 7817626c8b..9acaa1d418 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -130,19 +130,25 @@ dependencies(Reverse, Sources, AllPlugins) -> true = digraph:delete(G), Dests. -%% Make sure we don't list OTP apps in here, and also that we create -%% fake plugins for missing dependencies. +%% Make sure we don't list OTP apps in here, and also that we detect +%% missing dependencies. ensure_dependencies(Plugins) -> Names = plugin_names(Plugins), NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins, Dep <- Deps, not lists:member(Dep, Names)], - {OTP, Missing} = lists:partition(fun is_loadable/1, NotThere), - Plugins1 = [P#plugin{dependencies = Deps -- OTP} - || P = #plugin{dependencies = Deps} <- Plugins], - Fake = [#plugin{name = Name, - dependencies = []}|| Name <- Missing], - Plugins1 ++ Fake. + {OTP, Missing} = lists:partition(fun is_loadable/1, lists:usort(NotThere)), + case Missing of + [] -> ok; + _ -> Blame = [Name || #plugin{name = Name, + dependencies = Deps} <- Plugins, + lists:any(fun (Dep) -> + lists:member(Dep, Missing) + end, Deps)], + throw({error, {missing_dependencies, Missing, Blame}}) + end, + [P#plugin{dependencies = Deps -- OTP} + || P = #plugin{dependencies = Deps} <- Plugins]. is_loadable(App) -> case application:load(App) of @@ -162,13 +168,6 @@ prepare_plugins(Enabled) -> ToUnpack = dependencies(false, Enabled, AllPlugins), ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins), - case Enabled -- plugin_names(ToUnpackPlugins) of - [] -> ok; - Missing -> error_logger:warning_msg( - "The following enabled plugins were not found: ~p~n", - [Missing]) - end, - case filelib:ensure_dir(ExpandDir ++ "/") of ok -> ok; {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 98418d8cf5..278fcf986d 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -44,6 +44,7 @@ [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, {enable, [?OFFLINE_DEF, ?ONLINE_DEF]}, {disable, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {set, [?OFFLINE_DEF, ?ONLINE_DEF]}, {sync, []}]). %%---------------------------------------------------------------------------- @@ -86,6 +87,10 @@ start() -> {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> PrintInvalidCommandError(), usage(); + {error, {missing_dependencies, Missing, Blame}} -> + print_error("dependent plugins ~p not found; used by ~p.", + [Missing, Blame]), + rabbit_misc:quit(2); {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); @@ -137,18 +142,13 @@ action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) -> ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), ToEnable = [list_to_atom(Name) || Name <- ToEnable0], Missing = ToEnable -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw({error_string, fmt_missing(Missing)}) + end, NewEnabled = lists:usort(Enabled ++ ToEnable), NewImplicitlyEnabled = rabbit_plugins:dependencies(false, NewEnabled, AllPlugins), - MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing, - case {Missing, MissingDeps} of - {[], []} -> ok; - {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)}); - {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)}); - {_, _} -> throw({error_string, - fmt_missing("plugins", Missing) ++ - fmt_missing("dependencies", MissingDeps)}) - end, write_enabled_plugins(PluginsFile, NewEnabled), case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); @@ -158,6 +158,27 @@ action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) -> action_change( Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); +action(set, Node, ToSet0, Opts, PluginsFile, PluginsDir) -> + ToSet = [list_to_atom(Name) || Name <- ToSet0], + AllPlugins = rabbit_plugins:list(PluginsDir), + Enabled = rabbit_plugins:read_enabled(PluginsFile), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), + Missing = ToSet -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw({error_string, fmt_missing(Missing)}) + end, + NewImplicitlyEnabled = rabbit_plugins:dependencies(false, + ToSet, AllPlugins), + write_enabled_plugins(PluginsFile, ToSet), + case NewImplicitlyEnabled of + [] -> io:format("All plugins are now disabled.~n"); + _ -> print_list("The following plugins are now enabled:", + NewImplicitlyEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); + action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) -> case ToDisable0 of [] -> throw({error_string, "Not enough arguments for 'disable'"}); @@ -226,12 +247,9 @@ format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) -> {badrpc, _} -> {"[failed to contact ~s - status not shown]", []}; Active -> {"* = running on ~s", Active} end, - Missing = [#plugin{name = Name, dependencies = []} || - Name <- ((EnabledExplicitly ++ EnabledImplicitly) -- - plugin_names(AvailablePlugins))], {ok, RE} = re:compile(Pattern), Plugins = [ Plugin || - Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing, + Plugin = #plugin{name = Name} <- AvailablePlugins, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, if OnlyEnabled -> lists:member(Name, EnabledExplicitly); OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or @@ -244,25 +262,23 @@ format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) -> case Format of minimal -> ok; _ -> io:format(" Configured: E = explicitly enabled; " - "e = implicitly enabled; ! = missing~n" + "e = implicitly enabled~n" " | Status: ~s~n" " |/~n", [rabbit_misc:format(StatusMsg, [Node])]) end, [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running, - plugin_names(Missing), Format, MaxWidth) || P <- Plugins1], + Format, MaxWidth) || P <- Plugins1], ok. format_plugin(#plugin{name = Name, version = Version, description = Description, dependencies = Deps}, - EnabledExplicitly, EnabledImplicitly, Running, - Missing, Format, MaxWidth) -> + EnabledExplicitly, EnabledImplicitly, Running, Format, + MaxWidth) -> EnabledGlyph = case {lists:member(Name, EnabledExplicitly), - lists:member(Name, EnabledImplicitly), - lists:member(Name, Missing)} of - {true, false, false} -> "E"; - {false, true, false} -> "e"; - {_, _, true} -> "!"; - _ -> " " + lists:member(Name, EnabledImplicitly)} of + {true, false} -> "E"; + {false, true} -> "e"; + _ -> " " end, RunningGlyph = case lists:member(Name, Running) of true -> "*"; @@ -292,8 +308,8 @@ fmt_list(Header, Plugins) -> lists:flatten( [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]). -fmt_missing(Desc, Missing) -> - fmt_list("The following " ++ Desc ++ " could not be found:", Missing). +fmt_missing(Missing) -> + fmt_list("The following plugins could not be found:", Missing). usort_plugins(Plugins) -> lists:usort(fun plugins_cmp/2, Plugins). @@ -331,7 +347,7 @@ sync(Node, ForceOnline, PluginsFile) -> rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]). rpc_call(Node, Online, Mod, Fun, Args) -> - io:format("Applying plugin configuration to ~s...", [Node]), + io:format("~nApplying plugin configuration to ~s...", [Node]), case rpc:call(Node, Mod, Fun, Args) of {ok, [], []} -> io:format(" nothing to do.~n", []); @@ -348,10 +364,10 @@ rpc_call(Node, Online, Mod, Fun, Args) -> true -> Error; false -> io:format( " * Could not contact node ~s.~n" - " * Changes will take effect at broker restart.~n" - " * Specify --online for diagnostics and to treat " - "this as a failure.~n" - " * Specify --offline to disable changes to running " + " Changes will take effect at broker restart.~n" + " * Options: --online - fail if broker cannot be " + "contacted.~n" + " --offline - do not try to contact " "broker.~n", [Node]) end; diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 4cc9cd12f1..6a6a4ee680 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -90,9 +90,9 @@ dist_port_set_check() -> {none, none} -> ok; _ -> rabbit_misc:quit(?DIST_PORT_CONFIGURED) end; + {ok, _} -> + ok; {error, _} -> - %% TODO can we present errors more nicely here - %% than after -config has failed? ok end end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 906c4b6e2f..db6d1eb02a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -27,7 +27,6 @@ -export([conserve_resources/3, server_properties/1]). --define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). @@ -189,10 +188,10 @@ server_capabilities(_) -> log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). socket_error(Reason) when is_atom(Reason) -> - log(error, "error on AMQP connection ~p: ~s~n", + log(error, "Error on AMQP connection ~p: ~s~n", [self(), rabbit_misc:format_inet_error(Reason)]); socket_error(Reason) -> - log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]). + log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> exit(normal) end, log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), + {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), ClientSock = socket_op(Sock, SockTransform), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + erlang:send_after(HandshakeTimeout, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), ?store_proc_name(list_to_binary(Name)), @@ -231,7 +231,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> peer_port = PeerPort, protocol = none, user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, + timeout_sec = (HandshakeTimeout / 1000), frame_max = ?FRAME_MIN_SIZE, vhost = none, client_properties = none, @@ -548,21 +548,27 @@ wait_for_channel_termination(0, TimerRef, State) -> end; _ -> State end; -wait_for_channel_termination(N, TimerRef, State) -> +wait_for_channel_termination(N, TimerRef, + State = #v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> {Channel, State1} = channel_cleanup(ChPid, State), case {Channel, termination_kind(Reason)} of - {undefined, _} -> exit({abnormal_dependent_exit, - ChPid, Reason}); - {_, controlled} -> wait_for_channel_termination( - N-1, TimerRef, State1); - {_, uncontrolled} -> log(error, - "AMQP connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), - wait_for_channel_termination( - N-1, TimerRef, State1) + {undefined, _} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_, controlled} -> + wait_for_channel_termination(N-1, TimerRef, State1); + {_, uncontrolled} -> + log(error, "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:" + "error while terminating:~n~p~n", + [self(), ConnName, VHost, User#user.username, + CS, Channel, Reason]), + wait_for_channel_termination(N-1, TimerRef, State1) end; cancel_wait -> exit(channel_termination_timeout) @@ -581,16 +587,24 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +log_hard_error(#v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}, Channel, Reason) -> + log(error, + "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:~n~p~n", + [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]). + handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), closed, Channel, Reason]), + log_hard_error(State, Channel, Reason), State; handle_exception(State = #v1{connection = #connection{protocol = Protocol}, connection_state = CS}, Channel, Reason) when ?IS_RUNNING(State) orelse CS =:= closing -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), CS, Channel, Reason]), + log_hard_error(State, Channel, Reason), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), State1 = close_connection(terminate_channels(State)), diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index da75932d0e..47c77cd0f7 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -70,7 +70,8 @@ wait_for_replicated() -> not lists:member({local_content, true}, TabDef)]). wait(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of + {ok, Timeout} = application:get_env(rabbit, mnesia_table_loading_timeout), + case mnesia:wait_for_tables(TableNames, Timeout) of ok -> ok; {timeout, BadTabs} -> diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b6d378525e..1104f3731a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -48,6 +48,7 @@ -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). +-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). %% ------------------------------------------------------------------- @@ -77,6 +78,8 @@ -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). -spec(internal_system_x/0 :: () -> 'ok'). +-spec(cluster_name/0 :: () -> 'ok'). +-spec(down_slave_nodes/0 :: () -> 'ok'). -endif. @@ -382,6 +385,21 @@ cluster_name_tx() -> [mnesia:delete(T, K, write) || K <- Ks], ok. +down_slave_nodes() -> + ok = down_slave_nodes(rabbit_queue), + ok = down_slave_nodes(rabbit_durable_queue). + +down_slave_nodes(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |
