diff options
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 10 | ||||
| -rw-r--r-- | include/rabbit_cli.hrl | 2 | ||||
| -rw-r--r-- | scripts/rabbitmq-defaults | 2 | ||||
| -rw-r--r-- | scripts/rabbitmq-defaults.bat | 37 | ||||
| -rw-r--r-- | scripts/rabbitmq-echopid.bat | 14 | ||||
| -rwxr-xr-x | scripts/rabbitmq-env | 2 | ||||
| -rw-r--r-- | scripts/rabbitmq-env.bat | 286 | ||||
| -rwxr-xr-x | scripts/rabbitmq-plugins.bat | 23 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 2 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server.bat | 79 | ||||
| -rwxr-xr-x | scripts/rabbitmq-service.bat | 85 | ||||
| -rwxr-xr-x | scripts/rabbitmqctl.bat | 38 | ||||
| -rw-r--r-- | src/file_handle_cache.erl | 46 | ||||
| -rw-r--r-- | src/gen_server2.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_cli.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 271 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 16 | ||||
| -rw-r--r-- | src/supervisor2.erl | 2 |
22 files changed, 659 insertions, 333 deletions
@@ -29,6 +29,9 @@ USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python else +ifeq ($(shell python2.7 -c 'import json' 2>/dev/null && echo yes),yes) +PYTHON=python2.7 +else ifeq ($(shell python2.6 -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python2.6 else @@ -40,6 +43,7 @@ PYTHON=python endif endif endif +endif BASIC_PLT=basic.plt RABBIT_PLT=rabbit.plt diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 40d8978e9b..854dd277b4 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -41,6 +41,7 @@ <cmdsynopsis> <command>rabbitmqctl</command> <arg choice="opt">-n <replaceable>node</replaceable></arg> + <arg choice="opt">-t <replaceable>timeout</replaceable></arg> <arg choice="opt">-q</arg> <arg choice="req"><replaceable>command</replaceable></arg> <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> @@ -92,6 +93,15 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><arg choice="opt">-t <replaceable>timeout</replaceable></arg></cmdsynopsis></term> + <listitem> + <para role="usage"> + Operation timeout in seconds. Only applicable to "list" commands. + Default is "infinity". + </para> + </listitem> + </varlistentry> </variablelist> </refsect1> diff --git a/include/rabbit_cli.hrl b/include/rabbit_cli.hrl index 58c5a3a99e..1bffc9a604 100644 --- a/include/rabbit_cli.hrl +++ b/include/rabbit_cli.hrl @@ -17,6 +17,7 @@ -define(NODE_OPT, "-n"). -define(QUIET_OPT, "-q"). -define(VHOST_OPT, "-p"). +-define(TIMEOUT_OPT, "-t"). -define(VERBOSE_OPT, "-v"). -define(MINIMAL_OPT, "-m"). @@ -33,6 +34,7 @@ -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(QUIET_DEF, {?QUIET_OPT, flag}). -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). +-define(TIMEOUT_DEF, {?TIMEOUT_OPT, {option, "infinity"}}). -define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). -define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults index 1cf9152afb..9ee5127f5a 100644 --- a/scripts/rabbitmq-defaults +++ b/scripts/rabbitmq-defaults @@ -26,6 +26,8 @@ SASL_BOOT_FILE=start_sasl ## Set default values +BOOT_MODULE="rabbit" + CONFIG_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq LOG_BASE=${SYS_PREFIX}/var/log/rabbitmq MNESIA_BASE=${SYS_PREFIX}/var/lib/rabbitmq/mnesia diff --git a/scripts/rabbitmq-defaults.bat b/scripts/rabbitmq-defaults.bat new file mode 100644 index 0000000000..d3983f22cf --- /dev/null +++ b/scripts/rabbitmq-defaults.bat @@ -0,0 +1,37 @@ +@echo off + +REM ### next line potentially updated in package install steps +REM set SYS_PREFIX= + +REM ### next line will be updated when generating a standalone release +REM ERL_DIR= +set ERL_DIR= + +REM These boot files don't appear to be referenced in the batch scripts +REM set CLEAN_BOOT_FILE=start_clean +REM set SASL_BOOT_FILE=start_sasl + +REM ## Set default values + +if "!RABBITMQ_BASE!"=="" ( + set RABBITMQ_BASE=!APPDATA!\RabbitMQ +) + +REM BOOT_MODULE="rabbit" +REM CONFIG_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq +REM LOG_BASE=${SYS_PREFIX}/var/log/rabbitmq +REM MNESIA_BASE=${SYS_PREFIX}/var/lib/rabbitmq/mnesia +REM ENABLED_PLUGINS_FILE=${SYS_PREFIX}/etc/rabbitmq/enabled_plugins +set BOOT_MODULE=rabbit +set CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq +set LOG_BASE=!RABBITMQ_BASE!\log +set MNESIA_BASE=!RABBITMQ_BASE!\db +set ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins + +REM PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +set PLUGINS_DIR=!TDP0!..\plugins + +REM CONF_ENV_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq-env.conf +if "!RABBITMQ_CONF_ENV_FILE!"=="" ( + set CONF_ENV_FILE=!APPDATA!\RabbitMQ\rabbitmq-env-conf.bat +) diff --git a/scripts/rabbitmq-echopid.bat b/scripts/rabbitmq-echopid.bat index 2d3a6fa4e2..7c3d8a1f6b 100644 --- a/scripts/rabbitmq-echopid.bat +++ b/scripts/rabbitmq-echopid.bat @@ -6,6 +6,10 @@ REM <rabbitmq_nodename> (s)name of the erlang node to connect to (required) setlocal +REM Get default settings with user overrides for (RABBITMQ_)<var_name> +REM Non-empty defaults should be set in rabbitmq-env +call "%cd%\rabbitmq-env.bat" + if "%1"=="" goto fail :: set timeout vars :: @@ -18,16 +22,6 @@ if not exist "%WMIC_PATH%" ( goto fail ) -:: sets sname/name :: -if "!RABBITMQ_USE_LONGNAME!"=="" ( - set RABBITMQ_NAME_TYPE="-sname" -) - -if "!RABBITMQ_USE_LONGNAME!"=="true" ( - set RABBITMQ_NAME_TYPE="-name" -) - - :getpid for /f "usebackq tokens=* skip=1" %%P IN (`%%WMIC_PATH%% process where "name='erl.exe' and commandline like '%%%RABBITMQ_NAME_TYPE% %1%%'" get processid 2^>nul`) do ( set PID=%%P diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index c37f3e2b48..30de121f9a 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -155,6 +155,8 @@ DEFAULT_NODE_PORT=5672 [ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} [ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid +[ "x" = "x$RABBITMQ_BOOT_MODULE" ] && RABBITMQ_BOOT_MODULE=${BOOT_MODULE} + [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat new file mode 100644 index 0000000000..77bcd65b73 --- /dev/null +++ b/scripts/rabbitmq-env.bat @@ -0,0 +1,286 @@ +@echo off + +REM Scopes the variables to the current batch file +REM setlocal + +rem Preserve values that might contain exclamation marks before +rem enabling delayed expansion +set TDP0=%~dp0 +set STAR=%* +REM setlocal enabledelayedexpansion + +REM # Determine where this script is really located (if this script is +REM # invoked from another script, this is the location of the caller) +REM SCRIPT_PATH="$0" +REM while [ -h "$SCRIPT_PATH" ] ; do +REM # Determine if readlink -f is supported at all. TODO clean this up. +REM FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null` +REM if [ "$?" != "0" ]; then +REM REL_PATH=`readlink $SCRIPT_PATH` +REM if expr "$REL_PATH" : '/.*' > /dev/null; then +REM SCRIPT_PATH="$REL_PATH" +REM else +REM SCRIPT_PATH="`dirname "$SCRIPT_PATH"`/$REL_PATH" +REM fi +REM else +REM SCRIPT_PATH=$FULL_PATH +REM fi +REM done +REM set -e + +REM SCRIPT_DIR=`dirname $SCRIPT_PATH` +REM RABBITMQ_HOME="${SCRIPT_DIR}/.." +set SCRIPT_DIR=%TDP0% +set RABBITMQ_HOME=%SCRIPT_DIR%.. + +REM ## Set defaults +REM . ${SCRIPT_DIR}/rabbitmq-defaults +call "%SCRIPT_DIR%\rabbitmq-defaults.bat" + +REM These common defaults aren't referenced in the batch scripts +REM ## Common defaults +REM SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ +REM -kernel inet_default_connect_options [{nodelay,true}]" +REM +REM # warn about old rabbitmq.conf file, if no new one +REM if [ -f /etc/rabbitmq/rabbitmq.conf ] && \ +REM [ ! -f ${CONF_ENV_FILE} ] ; then +REM echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- " +REM echo "location has moved to ${CONF_ENV_FILE}" +REM fi + +REM ERL_ARGS aren't referenced in the batch scripts +REM Common defaults +REM set SERVER_ERL_ARGS=+A30 ^ +REM +P 1048576 ^ +REM -kernel inet_default_connect_options "[{nodelay, true}]" ^ + +REM ## Get configuration variables from the configure environment file +REM [ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true +if exist "!RABBITMQ_CONF_ENV_FILE!" ( + call !RABBITMQ_CONF_ENV_FILE! +) + +REM [ "x" = "x$RABBITMQ_USE_LONGNAME" ] && RABBITMQ_USE_LONGNAME=${USE_LONGNAME} +REM if [ "xtrue" = "x$RABBITMQ_USE_LONGNAME" ] ; then +REM RABBITMQ_NAME_TYPE=-name +REM [ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -f` +REM [ "x" = "x$NODENAME" ] && NODENAME=rabbit@${HOSTNAME} +REM else +REM RABBITMQ_NAME_TYPE=-sname +REM [ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname` +REM [ "x" = "x$NODENAME" ] && NODENAME=rabbit@${HOSTNAME%%.*} +REM fi + +REM Check for the short names here too +if "!RABBITMQ_USE_LONGNAME!"=="" ( + if "!USE_LONGNAME!"=="" ( + set RABBITMQ_NAME_TYPE="-sname" + ) +) + +if "!RABBITMQ_USE_LONGNAME!"=="true" ( + if "!USE_LONGNAME!"=="true" ( + set RABBITMQ_NAME_TYPE="-name" + ) +) + +if "!COMPUTERNAME!"=="" ( + set COMPUTERNAME=localhost +) + +REM [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} +if "!RABBITMQ_NODENAME!"=="" ( + if "!NODENAME!"=="" ( + set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME! + ) else ( + set RABBITMQ_NODENAME=!NODENAME! + ) +) + +REM +REM ##--- Set environment vars RABBITMQ_<var_name> to defaults if not set +REM +REM DEFAULT_NODE_IP_ADDRESS=auto +REM DEFAULT_NODE_PORT=5672 +REM [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +REM [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} +REM [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} +REM [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} + +REM if "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( +REM if not "!RABBITMQ_NODE_PORT!"=="" ( +REM set RABBITMQ_NODE_IP_ADDRESS=auto +REM ) +REM ) else ( +REM if "!RABBITMQ_NODE_PORT!"=="" ( +REM set RABBITMQ_NODE_PORT=5672 +REM ) +REM ) + +REM DOUBLE CHECK THIS LOGIC +if "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( + if "!NODE_IP_ADDRESS!"=="" ( + set RABBITMQ_NODE_IP_ADDRESS=auto + ) else ( + set RABBITMQ_NODE_IP_ADDRESS=!NODE_IP_ADDRESS! + ) +) + +if "!RABBITMQ_NODE_PORT!"=="" ( + if "!NODE_PORT!"=="" ( + set RABBITMQ_NODE_PORT=5672 + ) else ( + set RABBITMQ_NODE_PORT=!NODE_PORT! + ) +) + +REM [ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT} +REM [ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000)) +REM [ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000)) + +if "!RABBITMQ_DIST_PORT!"=="" ( + if "!DIST_PORT!"=="" ( + if "!RABBITMQ_NODE_PORT!"=="" ( + set RABBITMQ_DIST_PORT=25672 + ) else ( + set /a RABBITMQ_DIST_PORT=20000+!RABBITMQ_NODE_PORT! + ) + ) else ( + set RABBITMQ_DIST_PORT=!DIST_PORT! + ) +) + +REM [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} +REM No Windows equivalent + +REM [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} +if "!RABBITMQ_CONFIG_FILE!"=="" ( + if "!CONFIG_FILE!"=="" ( + set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq + ) else ( + set RABBITMQ_CONFIG_FILE=!CONFIG_FILE! + ) +) + +REM [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} +if "!RABBITMQ_LOG_BASE!"=="" ( + if "!LOG_BASE!"=="" ( + set RABBITMQ_LOG_BASE=!RABBITMQ_BASE!\log + ) else ( + set RABBITMQ_LOG_BASE=!LOG_BASE! + ) +) + +REM [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} +if "!RABBITMQ_MNESIA_BASE!"=="" ( + if "!MNESIA_BASE!"=="" ( + set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!\db + ) else ( + set RABBITMQ_MNESIA_BASE=!MNESIA_BASE! + ) +) + +REM [ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} +REM No Windows equivalent + +REM [ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS} +REM No Windows equivalent + +REM [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} +REM [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} +if "!RABBITMQ_MNESIA_DIR!"=="" ( + if "!MNESIA_DIR!"=="" ( + set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia + ) else ( + set RABBITMQ_MNESIA_DIR=!MNESIA_DIR! + ) +) + +REM [ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} +REM [ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid +REM No Windows equivalent + +REM [ "x" = "x$RABBITMQ_BOOT_MODULE" ] && RABBITMQ_BOOT_MODULE=${BOOT_MODULE} +if "!RABBITMQ_BOOT_MODULE!"=="" ( + if "!BOOT_MODULE!"=="" ( + set RABBITMQ_BOOT_MODULE=rabbit + ) else ( + set RABBITMQ_BOOT_MODULE=!BOOT_MODULE! + ) +) + +REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} +REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand +if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" ( + if "!PLUGINS_EXPAND_DIR!"=="" ( + set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand + ) else ( + set RABBITMQ_PLUGINS_EXPAND_DIR=!PLUGINS_EXPAND_DIR! + ) +) + +REM [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} +if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( + if "!ENABLED_PLUGINS_FILE!"=="" ( + set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins + ) else ( + set RABBITMQ_ENABLED_PLUGINS_FILE=!ENABLED_PLUGINS_FILE! + ) +) + +REM [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} +if "!RABBITMQ_PLUGINS_DIR!"=="" ( + if "!PLUGINS_DIR!"=="" ( + set RABBITMQ_PLUGINS_DIR=!RABBITMQ_BASE!\plugins + ) else ( + set RABBITMQ_PLUGINS_DIR=!PLUGINS_DIR! + ) +) + +REM ## Log rotation +REM [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} +REM [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" +if "!RABBITMQ_LOGS!"=="" ( + if "!LOGS!"=="" ( + set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log + ) else ( + set LOGS=!LOGS! + ) +) + +REM [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} +REM [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" +if "!RABBITMQ_SASL_LOGS!"=="" ( + if "!SASL_LOGS!"=="" ( + set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log + ) else ( + set SASL_LOGS=!SASL_LOGS! + ) +) + +REM [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} +if "!$RABBITMQ_CTL_ERL_ARGS!"=="" ( + if not "!CTL_ERL_ARGS!"=="" ( + set RABBITMQ_CTL_ERL_ARGS=!CTL_ERL_ARGS! + ) +) + +REM ADDITIONAL WINDOWS ONLY CONFIG ITEMS +REM rabbitmq-plugins.bat +REM if "!RABBITMQ_SERVICENAME!"=="" ( +REM set RABBITMQ_SERVICENAME=RabbitMQ +REM ) + +if "!RABBITMQ_SERVICENAME!"=="" ( + if "!SERVICENAME!"=="" ( + set RABBITMQ_SERVICENAME=RabbitMQ + ) else ( + set RABBITMQ_SERVICENAME=!SERVICENAME! + ) +) + +REM ##--- End of overridden <var_name> variables +REM +REM # Since we source this elsewhere, don't accidentally stop execution +REM true diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index c8e15ad24c..fe7d5c64a9 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -1,4 +1,5 @@ @echo off
+
REM The contents of this file are subject to the Mozilla Public License
REM Version 1.1 (the "License"); you may not use this file except in
REM compliance with the License. You may obtain a copy of the License
@@ -23,17 +24,9 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
-if "!RABBITMQ_SERVICENAME!"=="" (
- set RABBITMQ_SERVICENAME=RabbitMQ
-)
-
-if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
-)
-
-if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
-)
+REM Get default settings with user overrides for (RABBITMQ_)<var_name>
+REM Non-empty defaults should be set in rabbitmq-env
+call "%cd%\rabbitmq-env.bat"
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
@@ -47,14 +40,6 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B 1
)
-if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
- set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
-)
-
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!TDP0!..\ebin" ^
-noinput ^
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index a6f4be9902..bd72ba6caa 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -21,7 +21,7 @@ RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput" -[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot " +[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s $RABBITMQ_BOOT_MODULE boot " case "$(uname -s)" in CYGWIN*) # we make no attempt to record the cygwin pid; rabbitmqctl wait diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5e9e11d0f8..fdc38c726d 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -23,43 +23,9 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
-if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
-)
-
-if "!COMPUTERNAME!"=="" (
- set COMPUTERNAME=localhost
-)
-
-if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
-)
-
-if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
- if not "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=auto
- )
-) else (
- if "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_PORT=5672
- )
-)
-
-if "!RABBITMQ_DIST_PORT!"=="" (
- if "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_DIST_PORT=25672
- ) else (
- set /a RABBITMQ_DIST_PORT=20000+!RABBITMQ_NODE_PORT!
- )
-)
+REM Get default settings with user overrides for (RABBITMQ_)<var_name>
+REM Non-empty defaults should be set in rabbitmq-env
+call "%TDP0%\rabbitmq-env.bat"
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
@@ -73,39 +39,6 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B 1
)
-if "!RABBITMQ_MNESIA_BASE!"=="" (
- set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
-)
-if "!RABBITMQ_LOG_BASE!"=="" (
- set RABBITMQ_LOG_BASE=!RABBITMQ_BASE!/log
-)
-
-
-rem We save the previous logs in their respective backup
-rem Log management (rotation, filtering based of size...) is left as an exercice for the user.
-
-set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
-set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
-
-rem End of log management
-
-
-if "!RABBITMQ_MNESIA_DIR!"=="" (
- set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
-)
-
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
-)
-
-if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
- set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
-)
-
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-)
-
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
@@ -125,10 +58,6 @@ if ERRORLEVEL 2 ( set RABBITMQ_EBIN_PATH="-pa !RABBITMQ_EBIN_ROOT!"
-if "!RABBITMQ_CONFIG_FILE!"=="" (
- set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
-)
-
if exist "!RABBITMQ_CONFIG_FILE!.config" (
set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
@@ -144,7 +73,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( set RABBITMQ_START_RABBIT=
if "!RABBITMQ_NODE_ONLY!"=="" (
- set RABBITMQ_START_RABBIT=-s rabbit boot
+ set RABBITMQ_START_RABBIT=-s "!RABBITMQ_BOOT_MODULE!" boot
)
if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" {
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 57dda9dd35..da87ca94e1 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -24,6 +24,10 @@ set TDP0=%~dp0 set P1=%1
setlocal enabledelayedexpansion
+REM Get default settings with user overrides for (RABBITMQ_)<var_name>
+REM Non-empty defaults should be set in rabbitmq-env
+call "%cd%\rabbitmq-env.bat"
+
set STARVAR=
shift
:loop1
@@ -33,48 +37,6 @@ if "%1"=="" goto after_loop goto loop1
:after_loop
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
-if "!RABBITMQ_SERVICENAME!"=="" (
- set RABBITMQ_SERVICENAME=RabbitMQ
-)
-
-if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
-)
-
-if "!COMPUTERNAME!"=="" (
- set COMPUTERNAME=localhost
-)
-
-if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
-)
-
-if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
- if not "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=auto
- )
-) else (
- if "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_PORT=5672
- )
-)
-
-if "!RABBITMQ_DIST_PORT!"=="" (
- if "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_DIST_PORT=25672
- ) else (
- set /a RABBITMQ_DIST_PORT=20000+!RABBITMQ_NODE_PORT!
- )
-)
-
if "!ERLANG_SERVICE_MANAGER_PATH!"=="" (
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
@@ -113,31 +75,6 @@ if not exist "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" ( exit /B 1
)
-if "!RABBITMQ_MNESIA_BASE!"=="" (
- set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
-)
-if "!RABBITMQ_LOG_BASE!"=="" (
- set RABBITMQ_LOG_BASE=!RABBITMQ_BASE!/log
-)
-
-
-rem We save the previous logs in their respective backup
-rem Log management (rotation, filtering based on size...) is left as an exercise for the user.
-
-set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
-set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
-
-rem End of log management
-
-
-if "!RABBITMQ_MNESIA_DIR!"=="" (
- set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
-)
-
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
-)
-
if "!P1!" == "install" goto INSTALL_SERVICE
for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
@@ -174,20 +111,8 @@ if errorlevel 1 ( echo !RABBITMQ_SERVICENAME! service is already present - only updating service parameters
)
-if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
- set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
-)
-
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-)
-
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_CONFIG_FILE!"=="" (
- set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
@@ -230,8 +155,8 @@ if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" { set ERLANG_SERVICE_ARGUMENTS= ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
+-boot start_sasl ^
!RABBITMQ_START_RABBIT! ^
--s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
+W w ^
+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 38a00c7c5c..ef9b13c7ae 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -15,6 +15,7 @@ REM The Initial Developer of the Original Code is GoPivotal, Inc. REM Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
REM
+REM Scopes the variables to the current batch file
setlocal
rem Preserve values that might contain exclamation marks before
@@ -23,38 +24,11 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
-if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
-)
-
-if "!COMPUTERNAME!"=="" (
- set COMPUTERNAME=localhost
-)
-
-if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
-)
-
-if "!RABBITMQ_MNESIA_BASE!"=="" (
- set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
-)
-
-if "!RABBITMQ_MNESIA_DIR!"=="" (
- set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
-)
-
-if not exist "!ERLANG_HOME!\bin\erl.exe" (
- echo.
- echo ******************************
- echo ERLANG_HOME not set correctly.
- echo ******************************
- echo.
- echo Please either set ERLANG_HOME to point to your Erlang installation or place the
- echo RabbitMQ server distribution in the Erlang lib folder.
- echo.
- exit /B 1
-)
+REM Get default settings with user overrides for (RABBITMQ_)<var_name>
+REM Non-empty defaults should be set in rabbitmq-env
+call "%cd%\rabbitmq-env.bat"
+REM Uncomment this later, just for testing now
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!TDP0!..\ebin" ^
-noinput ^
@@ -67,4 +41,4 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( -extra !STAR!
endlocal
-endlocal
+endlocal
\ No newline at end of file diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index b323842ba0..8be19e5be3 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -148,7 +148,7 @@ copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2, - info/0, info/1]). + info/0, info/1, clear_read_cache/0]). -export([ulimit/0]). -export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, @@ -164,6 +164,8 @@ -define(CLIENT_ETS_TABLE, file_handle_cache_client). -define(ELDERS_ETS_TABLE, file_handle_cache_elders). +-include("rabbit.hrl"). % For #amqqueue record definition. + %%---------------------------------------------------------------------------- -record(file, @@ -581,6 +583,42 @@ info_keys() -> ?INFO_KEYS. info() -> info(?INFO_KEYS). info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). +clear_read_cache() -> + gen_server2:cast(?SERVER, clear_read_cache), + clear_vhost_read_cache(rabbit_vhost:list()). + +clear_vhost_read_cache([]) -> + ok; +clear_vhost_read_cache([VHost | Rest]) -> + clear_queue_read_cache(rabbit_amqqueue:list(VHost)), + clear_vhost_read_cache(Rest). + +clear_queue_read_cache([]) -> + ok; +clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) -> + %% Limit the action to the current node. + Pids = [P || P <- [MPid | SPids], node(P) =:= node()], + %% This function is executed in the context of the backing queue + %% process because the read buffer is stored in the process + %% dictionary. + Fun = fun(_, State) -> + clear_process_read_cache(), + State + end, + [rabbit_amqqueue:run_backing_queue(Pid, rabbit_variable_queue, Fun) + || Pid <- Pids], + clear_queue_read_cache(Rest). + +clear_process_read_cache() -> + [ + begin + Handle1 = reset_read_buffer(Handle), + put({Ref, fhc_handle}, Handle1) + end || + {{Ref, fhc_handle}, Handle} <- get(), + size(Handle#handle.read_buffer) > 0 + ]. + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -1147,7 +1185,11 @@ handle_cast({transfer, N, FromPid, ToPid}, State) -> {noreply, process_pending( update_counts({obtain, socket}, ToPid, +N, update_counts({obtain, socket}, FromPid, -N, - State)))}. + State)))}; + +handle_cast(clear_read_cache, State) -> + clear_process_read_cache(), + {noreply, State}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; diff --git a/src/gen_server2.erl b/src/gen_server2.erl index d2f96b5250..fd0e6553b5 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -576,10 +576,11 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} -> Backoff1 = extend_backoff(Backoff), proc_lib:init_ack(Starter, {ok, self()}), - loop(GS2State #gs2_state { mod = Mod1, - state = State, - time = Timeout, - timeout_state = Backoff1 }); + loop(find_prioritisers( + GS2State #gs2_state { mod = Mod1, + state = State, + time = Timeout, + timeout_state = Backoff1 })); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ce800023f..5bfa006e09 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -782,15 +782,41 @@ on_node_up(Node) -> fun () -> Qs = mnesia:match_object(rabbit_queue, #amqqueue{_ = '_'}, write), - [case lists:member(Node, RSs) of - true -> RSs1 = RSs -- [Node], - store_queue( - Q#amqqueue{recoverable_slaves = RSs1}); - false -> ok - end || #amqqueue{recoverable_slaves = RSs} = Q <- Qs], + [maybe_clear_recoverable_node(Node, Q) || Q <- Qs], ok end). +maybe_clear_recoverable_node(Node, + #amqqueue{sync_slave_pids = SPids, + recoverable_slaves = RSs} = Q) -> + case lists:member(Node, RSs) of + true -> + %% There is a race with + %% rabbit_mirror_queue_slave:record_synchronised/1 called + %% by the incoming slave node and this function, called + %% by the master node. If this function is executed after + %% record_synchronised/1, the node is erroneously removed + %% from the recoverable slaves list. + %% + %% We check if the slave node's queue PID is alive. If it is + %% the case, then this function is executed after. In this + %% situation, we don't touch the queue record, it is already + %% correct. + DoClearNode = + case [SP || SP <- SPids, node(SP) =:= Node] of + [SPid] -> not rabbit_misc:is_process_alive(SPid); + _ -> true + end, + if + DoClearNode -> RSs1 = RSs -- [Node], + store_queue( + Q#amqqueue{recoverable_slaves = RSs1}); + true -> ok + end; + false -> + ok + end. + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index f549ee2d59..d6cd3ca43d 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -252,10 +252,11 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 6}, + {delete_and_terminate, 2}, {delete_crashed, 1}, {purge, 1}, + {purge_acks, 1}, {publish, 6}, {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1}, - {dropwhile, 2}, {fetchwhile, 4}, - {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, + {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, + {drop, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl index 58724af850..33098ce16b 100644 --- a/src/rabbit_cli.erl +++ b/src/rabbit_cli.erl @@ -18,7 +18,7 @@ -include("rabbit_cli.hrl"). -export([main/3, start_distribution/0, start_distribution/1, - parse_arguments/4, rpc_call/4]). + parse_arguments/4, rpc_call/4, rpc_call/5]). %%---------------------------------------------------------------------------- @@ -94,8 +94,13 @@ main(ParseFun, DoFun, UsageMod) -> print_error("~p", [Reason]), rabbit_misc:quit(2); {badrpc, Reason} -> - print_error("unable to connect to node ~w: ~w", [Node, Reason]), - print_badrpc_diagnostics([Node]), + case Reason of + timeout -> + print_error("operation ~w on node ~w timed out", [Command, Node]); + _ -> + print_error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics([Node]) + end, rabbit_misc:quit(2); {badrpc_multi, Reason, Nodes} -> print_error("unable to connect to nodes ~p: ~w", [Nodes, Reason]), @@ -210,8 +215,11 @@ print_badrpc_diagnostics(Nodes) -> %% a timeout unless we set our ticktime to be the same. So let's do %% that. rpc_call(Node, Mod, Fun, Args) -> - case rpc:call(Node, net_kernel, get_net_ticktime, [], ?RPC_TIMEOUT) of + rpc_call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). + +rpc_call(Node, Mod, Fun, Args, Timeout) -> + case rpc:call(Node, net_kernel, get_net_ticktime, [], Timeout) of {badrpc, _} = E -> E; Time -> net_kernel:set_net_ticktime(Time, 0), - rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT) + rpc:call(Node, Mod, Fun, Args, Timeout) end. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 25a0fbf9a6..71d0e03130 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -21,11 +21,11 @@ -export([start/0, stop/0, parse_arguments/2, action/5, sync_queue/1, cancel_sync_queue/1, become/1]). --import(rabbit_cli, [rpc_call/4]). +-import(rabbit_cli, [rpc_call/4, rpc_call/5]). -define(EXTERNAL_CHECK_INTERVAL, 1000). --define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]). +-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node), ?TIMEOUT_DEF]). -define(COMMANDS, [stop, @@ -108,6 +108,11 @@ forget_cluster_node, rename_cluster_node, cluster_status, status, environment, eval, force_boot]). +-define(COMMANDS_WITH_TIMEOUT, + [list_user_permissions, list_policies, list_queues, list_exchanges, + list_bindings, list_connections, list_channels, list_consumers, + list_vhosts, list_parameters]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -119,6 +124,11 @@ fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(action/6 :: + (atom(), node(), [string()], [{string(), any()}], + fun ((string(), [any()]) -> 'ok'), timeout()) + -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -136,7 +146,19 @@ start() -> io:format(Format ++ " ...~n", Args1) end end, - do_action(Command, Node, Args, Opts, Inform) + try + T = case get_timeout(Opts) of + {ok, Timeout} -> + Timeout; + {error, _} -> + %% since this is an error with user input, ignore the quiet + %% setting + io:format("Failed to parse provided timeout value, using ~s~n", [?RPC_TIMEOUT]), + ?RPC_TIMEOUT + end, + do_action(Command, Node, Args, Opts, Inform, T) + catch _:E -> E + end end, rabbit_ctl_usage). parse_arguments(CmdLine, NodeStr) -> @@ -160,18 +182,64 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) -> end, io:nl(). +get_timeout(Opts) -> + parse_timeout(proplists:get_value(?TIMEOUT_OPT, Opts, ?RPC_TIMEOUT)). + +parse_number(N) when is_list(N) -> + try list_to_integer(N) of + Val -> Val + catch error:badarg -> + %% could have been a float, give it + %% another shot + list_to_float(N) + end. + +parse_timeout("infinity") -> + {ok, infinity}; +parse_timeout(infinity) -> + {ok, infinity}; +parse_timeout(N) when is_list(N) -> + try parse_number(N) of + M -> + Y = case M >= 0 of + true -> round(M) * 1000; + false -> ?RPC_TIMEOUT + end, + {ok, Y} + catch error:badarg -> + {error, infinity} + end; +parse_timeout(N) -> + {ok, N}. + +announce_timeout(infinity, _Inform) -> + %% no-op + ok; +announce_timeout(Timeout, Inform) when is_number(Timeout) -> + Inform("Timeout: ~w seconds", [Timeout/1000]), + ok. + stop() -> ok. %%---------------------------------------------------------------------------- -do_action(Command, Node, Args, Opts, Inform) -> +do_action(Command, Node, Args, Opts, Inform, Timeout) -> case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of - false -> case ensure_app_running(Node) of - ok -> action(Command, Node, Args, Opts, Inform); - E -> E - end; - true -> action(Command, Node, Args, Opts, Inform) + false -> + case ensure_app_running(Node) of + ok -> + case lists:member(Command, ?COMMANDS_WITH_TIMEOUT) of + true -> + announce_timeout(Timeout, Inform), + action(Command, Node, Args, Opts, Inform, Timeout); + false -> + action(Command, Node, Args, Opts, Inform) + end; + E -> E + end; + true -> + action(Command, Node, Args, Opts, Inform) end. action(stop, Node, Args, _Opts, Inform) -> @@ -313,12 +381,6 @@ action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> rpc_call(Node, rabbit_auth_backend_internal, set_tags, [list_to_binary(Username), Tags]); -action(list_users, Node, [], _Opts, Inform) -> - Inform("Listing users", []), - display_info_list( - call(Node, {rabbit_auth_backend_internal, list_users, []}), - rabbit_auth_backend_internal:user_info_keys()); - action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost \"~s\"", Args), call(Node, {rabbit_vhost, add, Args}); @@ -327,63 +389,6 @@ action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost \"~s\"", Args), call(Node, {rabbit_vhost, delete, Args}); -action(list_vhosts, Node, Args, _Opts, Inform) -> - Inform("Listing vhosts", []), - ArgAtoms = default_if_empty(Args, [name]), - display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms); - -action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> - Inform("Listing permissions for user ~p", Args), - display_info_list(call(Node, {rabbit_auth_backend_internal, - list_user_permissions, Args}), - rabbit_auth_backend_internal:user_perms_info_keys()); - -action(list_queues, Node, Args, Opts, Inform) -> - Inform("Listing queues", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, messages]), - display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, - [VHostArg, ArgAtoms]), - ArgAtoms); - -action(list_exchanges, Node, Args, Opts, Inform) -> - Inform("Listing exchanges", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, type]), - display_info_list(rpc_call(Node, rabbit_exchange, info_all, - [VHostArg, ArgAtoms]), - ArgAtoms); - -action(list_bindings, Node, Args, Opts, Inform) -> - Inform("Listing bindings", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [source_name, source_kind, - destination_name, destination_kind, - routing_key, arguments]), - display_info_list(rpc_call(Node, rabbit_binding, info_all, - [VHostArg, ArgAtoms]), - ArgAtoms); - -action(list_connections, Node, Args, _Opts, Inform) -> - Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, - [ArgAtoms]), - ArgAtoms); - -action(list_channels, Node, Args, _Opts, Inform) -> - Inform("Listing channels", []), - ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, - messages_unacknowledged]), - display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), - ArgAtoms); - -action(list_consumers, Node, _Args, Opts, Inform) -> - Inform("Listing consumers", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]), - rabbit_amqqueue:consumer_info_keys()); - action(trace_on, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Starting tracing for vhost \"~s\"", [VHost]), @@ -416,13 +421,6 @@ action(clear_permissions, Node, [Username], Opts, Inform) -> call(Node, {rabbit_auth_backend_internal, clear_permissions, [Username, VHost]}); -action(list_permissions, Node, [], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Listing permissions in vhost \"~s\"", [VHost]), - display_info_list(call(Node, {rabbit_auth_backend_internal, - list_vhost_permissions, [VHost]}), - rabbit_auth_backend_internal:vhost_perms_info_keys()); - action(set_parameter, Node, [Component, Key, Value], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Setting runtime parameter ~p for component ~p to ~p", @@ -438,13 +436,6 @@ action(clear_parameter, Node, [Component, Key], Opts, Inform) -> list_to_binary(Component), list_to_binary(Key)]); -action(list_parameters, Node, [], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing runtime parameters", []), - display_info_list( - rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), - rabbit_runtime_parameters:info_keys()); - action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p", VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), @@ -460,12 +451,6 @@ action(clear_policy, Node, [Key], Opts, Inform) -> Inform("Clearing policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); -action(list_policies, Node, [], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing policies", []), - display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]), - rabbit_policy:info_keys()); - action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || @@ -493,7 +478,104 @@ action(eval, Node, [Expr], _Opts, _Inform) -> end; {error, E, _} -> {error_string, format_parse_error(E)} - end. + end; + +action(Command, Node, Args, Opts, Inform) -> + %% For backward compatibility, run commands accepting a timeout with + %% the default timeout. + action(Command, Node, Args, Opts, Inform, ?RPC_TIMEOUT). + +action(list_users, Node, [], _Opts, Inform, Timeout) -> + Inform("Listing users", []), + display_info_list( + call(Node, {rabbit_auth_backend_internal, list_users, []}, Timeout), + rabbit_auth_backend_internal:user_info_keys()); + +action(list_permissions, Node, [], Opts, Inform, Timeout) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Listing permissions in vhost \"~s\"", [VHost]), + display_info_list(call(Node, {rabbit_auth_backend_internal, + list_vhost_permissions, [VHost]}, Timeout), + rabbit_auth_backend_internal:vhost_perms_info_keys()); + +action(list_parameters, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing runtime parameters", []), + display_info_list( + rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg], + Timeout), + rabbit_runtime_parameters:info_keys()); + +action(list_policies, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg], + Timeout), + rabbit_policy:info_keys()); + +action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing vhosts", []), + ArgAtoms = default_if_empty(Args, [name]), + display_info_list(call(Node, {rabbit_vhost, info_all, []}, Timeout), + ArgAtoms); + +action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> + {error_string, + "list_user_permissions expects a username argument, but none provided."}; +action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> + Inform("Listing permissions for user ~p", Args), + display_info_list(call(Node, {rabbit_auth_backend_internal, + list_user_permissions, Args}, Timeout), + rabbit_auth_backend_internal:user_perms_info_keys()); + +action(list_queues, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing queues", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, messages]), + display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, + [VHostArg, ArgAtoms], Timeout), + ArgAtoms); + +action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing exchanges", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, type]), + display_info_list(rpc_call(Node, rabbit_exchange, info_all, + [VHostArg, ArgAtoms], Timeout), + ArgAtoms); + +action(list_bindings, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing bindings", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms], Timeout), + ArgAtoms); + +action(list_connections, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing connections", []), + ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), + display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, + [ArgAtoms], Timeout), + ArgAtoms); + +action(list_channels, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing channels", []), + ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, + messages_unacknowledged]), + display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms], + Timeout), + ArgAtoms); + +action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> + Inform("Listing consumers", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg], + Timeout), + rabbit_amqqueue:consumer_info_keys()). + format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). @@ -650,6 +732,9 @@ ensure_app_running(Node) -> call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). +call(Node, {Mod, Fun, Args}, Timeout) -> + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args), Timeout). + list_to_binary_utf8(L) -> B = list_to_binary(L), case rabbit_binary_parser:validate_utf8(B) of diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 56e90d30ae..083204df05 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -84,6 +84,7 @@ catlevel(Category) -> level(debug) -> 4; level(info) -> 3; level(warning) -> 2; +level(warn) -> 2; level(error) -> 1; level(none) -> 0. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index fec8f3077b..6a9eafd9fc 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -196,7 +196,17 @@ socket_error(Reason) when is_atom(Reason) -> log(error, "Error on AMQP connection ~p: ~s~n", [self(), rabbit_misc:format_inet_error(Reason)]); socket_error(Reason) -> - log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). + Level = + case Reason of + {ssl_upgrade_error, closed} -> + %% The socket was closed while upgrading to SSL. + %% This is presumably a TCP healthcheck, so don't log + %% it unless specified otherwise. + debug; + _ -> + error + end, + log(Level, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -345,6 +355,8 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, State#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; + closed when CS =:= pre_init andalso Buf =:= [] -> + stop(tcp_healthcheck, State); closed -> stop(closed, State); {error, Reason} -> @@ -359,7 +371,7 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, end end. -stop(closed, #v1{connection_state = pre_init} = State) -> +stop(tcp_healthcheck, State) -> %% The connection was closed before any packet was received. It's %% probably a load-balancer healthcheck: don't consider this a %% failure. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 57c3bfc113..7b9421eb3e 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -137,7 +137,7 @@ -record(state, {name, strategy :: strategy(), children = [] :: [child_rec()], - dynamics :: ?DICT:?DICT() | ?SET:?SET(), + dynamics :: ?DICT:?DICT() | ?SETS:?SET(), intensity :: non_neg_integer(), period :: pos_integer(), restarts = [], |
