diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-06 15:15:50 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-06 15:15:50 +0000 |
| commit | 4eb2a5282a77f7b73e1f9b94e368b6ca3b81129d (patch) | |
| tree | 2d02de39f0877a41df3c7b16025c26a169651e0a | |
| parent | a5143916e5069d3d97438008187784f3761fd4a4 (diff) | |
| parent | c16d3a27f9eabefab3a6d7e248997ee837819127 (diff) | |
| download | rabbitmq-server-git-4eb2a5282a77f7b73e1f9b94e368b6ca3b81129d.tar.gz | |
merge default into bug20337
131 files changed, 1040 insertions, 657 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ index 14bcc21d47..d50e32efc8 100644 --- a/LICENSE-MPL-RabbitMQ +++ b/LICENSE-MPL-RabbitMQ @@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License. The Original Code is RabbitMQ. The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2011 VMware, Inc. All rights reserved.'' + Copyright (c) 2007-2012 VMware, Inc. All rights reserved.'' [NOTE: The text of this Exhibit A may differ slightly from the text of the notices in the Source Code files of the Original Code. You should diff --git a/codegen.py b/codegen.py index 494be73d68..9483e85438 100644 --- a/codegen.py +++ b/codegen.py @@ -11,7 +11,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## from __future__ import nested_scopes @@ -118,7 +118,7 @@ def printFileHeader(): %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %%""" def genErl(spec): diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 88aa2e78f7..d83d507369 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -8,8 +8,6 @@ <xsl:output method="xml" /> -<xsl:template match="*"/> - <!-- Copy every element through --> <xsl:template match="*"> <xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml"> @@ -28,36 +26,30 @@ <head> <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title> </head> - <body> - <doc:div> - <xsl:choose> + <body show-in-this-page="true"> + <xsl:choose> <xsl:when test="document($original)/refentry/refmeta/manvolnum"> - <p> - This is the manual page for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. - </p> - <p> - <a href="../manpages.html">See a list of all manual pages</a>. - </p> + <p> + This is the manual page for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. + </p> + <p> + <a href="../manpages.html">See a list of all manual pages</a>. + </p> </xsl:when> <xsl:otherwise> - <p> - This is the documentation for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. - </p> + <p> + This is the documentation for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. + </p> </xsl:otherwise> - </xsl:choose> - <p> + </xsl:choose> + <p> For more general documentation, please see the - <a href="../admin-guide.html">administrator's guide</a>. - </p> - - <doc:toc class="compact"> - <doc:heading>Table of Contents</doc:heading> - </doc:toc> + <a href="../admin-guide.html">administrator's guide</a>. + </p> - <xsl:apply-templates select="body/div[@class='refentry']"/> - </doc:div> + <xsl:apply-templates select="body/div[@class='refentry']"/> </body> </html> </xsl:template> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 7268f09037..c1c51f9f8b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -266,10 +266,9 @@ </para> <para> When the target files do not exist they are created. - target files do not already exist. When - no <option>suffix</option> is specified, the empty log - files are simply created at the original location; no - rotation takes place. + When no <option>suffix</option> is specified, the empty + log files are simply created at the original location; + no rotation takes place. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl rotate_logs .1</screen> @@ -1067,10 +1066,26 @@ <listitem><para>The period for which the peer's SSL certificate is valid.</para></listitem> </varlistentry> + + <varlistentry> + <term>last_blocked_by</term> + <listitem><para>The reason for which this connection + was last blocked. One of 'mem' - due to a memory + alarm, 'flow' - due to internal flow control, or + 'none' if the connection was never + blocked.</para></listitem> + </varlistentry> + <varlistentry> + <term>last_blocked_age</term> + <listitem><para>Time, in seconds, since this + connection was last blocked, or + 'infinity'.</para></listitem> + </varlistentry> + <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> @@ -1127,8 +1142,9 @@ </varlistentry> </variablelist> <para> - If no <command>connectioninfoitem</command>s are specified then user, peer - address, peer port and connection state are displayed. + If no <command>connectioninfoitem</command>s are + specified then user, peer address, peer port, time since + flow control and memory block state are displayed. </para> <para role="example-prefix"> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9301af6bdc..2fee1114aa 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -37,6 +37,7 @@ {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, {trace_vhosts, []}, + {log_levels, [{connection, info}]}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl index ee29706e98..a317e63b68 100644 --- a/include/gm_specs.hrl +++ b/include/gm_specs.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d81b82dbbc..735b47204c 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -record(user, {username, @@ -86,7 +86,7 @@ %%---------------------------------------------------------------------------- --define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc."). +-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). -define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8"). -define(ERTS_MINIMUM, "5.6.3"). @@ -95,16 +95,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(CREDIT_DISC_BOUND, {2000, 1500}). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). - --ifdef(debug). --define(LOGDEBUG0(F), rabbit_log:debug(F)). --define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). --define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)). --else. --define(LOGDEBUG0(F), ok). --define(LOGDEBUG(F,A), ok). --define(LOGMESSAGE(D,C,M,Co), ok). --endif. diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl index 803bb75ce7..61a2e22a0e 100644 --- a/include/rabbit_auth_backend_spec.hrl +++ b/include/rabbit_auth_backend_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl index 614a3eed07..9a2f5e0505 100644 --- a/include/rabbit_auth_mechanism_spec.hrl +++ b/include/rabbit_auth_mechanism_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 1e870bb74b..918d587a5e 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -type(fetch_result(Ack) :: diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index f6283ef7c1..8f7e22d37e 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index e9150a97b3..f7c10bd8d9 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -include("rabbit.hrl"). diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl index 2ae5b0009d..75d7eb7162 100644 --- a/include/rabbit_msg_store_index.hrl +++ b/include/rabbit_msg_store_index.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -include("rabbit_msg_store.hrl"). diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail index 5d842cc1ae..b9c2629b02 100644 --- a/packaging/common/LICENSE.tail +++ b/packaging/common/LICENSE.tail @@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1 Authors and Copyright are as described below: The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2011 VMware, Inc. All rights reserved. + Copyright (c) 2007-2012 VMware, Inc. All rights reserved. MOZILLA PUBLIC LICENSE @@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License. The Original Code is RabbitMQ. The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2011 VMware, Inc. All rights reserved.'' + Copyright (c) 2007-2012 VMware, Inc. All rights reserved.'' [NOTE: The text of this Exhibit A may differ slightly from the text of the notices in the Source Code files of the Original Code. You should diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 0436f54609..0e59c21804 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## # Escape spaces and quotes, because shell is revolting. diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index e6776effcd..1455728630 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## ## diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 8696427ea5..79e9c1dd72 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -34,7 +34,7 @@ package: clean chmod a+x $(UNPACKED_DIR)/debian/rules echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright - echo "\n\nThe Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright + echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) rm -rf $(UNPACKED_DIR) diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 27e4e1dc03..91510991cc 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? -VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved." +VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved." VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%" diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index a2ef8d3ceb..1fd1339da2 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## # Determine where this script is really located @@ -36,7 +36,16 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.." [ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname` NODENAME=rabbit@${HOSTNAME%%.*} -# Load configuration from the rabbitmq.conf file +## Set (non-empty) default values for rabbitmq-env.conf variables to override +SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ +-kernel inet_default_connect_options [{nodelay,true}]" +CONFIG_FILE=/etc/rabbitmq/rabbitmq +LOG_BASE=/var/log/rabbitmq +MNESIA_BASE=/var/lib/rabbitmq/mnesia +PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins + +## Load configuration from the rabbitmq.conf file if [ -f /etc/rabbitmq/rabbitmq.conf ] && \ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- " diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 4c6cb1fa1d..14a18d5794 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -12,16 +12,19 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## +# Get default settings with user overrides for (RABBITMQ_)<var_name> +# Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +##--- End of overridden <var_name> variables exec erl \ -pa "${RABBITMQ_HOME}/ebin" \ diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index ca874a7f4b..66a900a195 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 39a68c8e0f..0a5a46400e 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -12,33 +12,23 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## -SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ --kernel inet_default_connect_options [{nodelay,true}]" -CONFIG_FILE=/etc/rabbitmq/rabbitmq -LOG_BASE=/var/log/rabbitmq -MNESIA_BASE=/var/lib/rabbitmq/mnesia -SERVER_START_ARGS= -ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins - +# Get default settings with user overrides for (RABBITMQ_)<var_name> +# Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set + DEFAULT_NODE_IP_ADDRESS=auto DEFAULT_NODE_PORT=5672 -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] -then - if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} - fi -else - if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} - fi -fi +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} + +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} +[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} + [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} @@ -48,13 +38,16 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} + +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} [ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" + +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} @@ -62,6 +55,8 @@ fi [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" +##--- End of overridden <var_name> variables + RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 44ce1ce148..ca49a5d8dd 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 1582bfb142..9e274840f2 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 9a11c3b35e..4aad6b8f33 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -12,14 +12,20 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## +# Get default settings with user overrides for (RABBITMQ_)<var_name> +# Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set + [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} +##--- End of overridden <var_name> variables + exec erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index a74a91fd91..f37fae48a6 100644..100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/credit_flow.erl b/src/credit_flow.erl new file mode 100644 index 0000000000..edbf4fb5af --- /dev/null +++ b/src/credit_flow.erl @@ -0,0 +1,128 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(credit_flow). + +%% Credit starts at MaxCredit and goes down. Both sides keep +%% track. When the receiver goes below MoreCreditAt it issues more +%% credit by sending a message to the sender. The sender should pass +%% this message in to handle_bump_msg/1. The sender should block when +%% it goes below 0 (check by invoking blocked/0). If a process is both +%% a sender and a receiver it will not grant any more credit to its +%% senders when it is itself blocked - thus the only processes that +%% need to check blocked/0 are ones that read from network sockets. + +-define(DEFAULT_CREDIT, {200, 150}). + +-export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]). +-export([peer_down/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-opaque(bump_msg() :: {pid(), non_neg_integer()}). +-opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}). + +-spec(ack/1 :: (pid()) -> 'ok'). +-spec(ack/2 :: (pid(), credit_spec()) -> 'ok'). +-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). +-spec(blocked/0 :: () -> boolean()). +-spec(send/1 :: (pid()) -> 'ok'). +-spec(send/2 :: (pid(), credit_spec()) -> 'ok'). +-spec(peer_down/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +%% There are two "flows" here; of messages and of credit, going in +%% opposite directions. The variable names "From" and "To" refer to +%% the flow of credit, but the function names refer to the flow of +%% messages. This is the clearest I can make it (since the function +%% names form the API and want to make sense externally, while the +%% variable names are used in credit bookkeeping and want to make +%% sense internally). + +%% For any given pair of processes, ack/2 and send/2 must always be +%% called with the same credit_spec(). + +ack(To) -> ack(To, ?DEFAULT_CREDIT). + +ack(To, {MaxCredit, MoreCreditAt}) -> + MoreCreditAt1 = MoreCreditAt + 1, + Credit = case get({credit_to, To}, MaxCredit) of + MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt), + MaxCredit; + C -> C - 1 + end, + put({credit_to, To}, Credit). + +handle_bump_msg({From, MoreCredit}) -> + Credit = get({credit_from, From}, 0) + MoreCredit, + put({credit_from, From}, Credit), + case Credit > 0 of + true -> unblock(From), + ok; + false -> ok + end. + +blocked() -> get(credit_blocked, []) =/= []. + +send(From) -> send(From, ?DEFAULT_CREDIT). + +send(From, {MaxCredit, _MoreCreditAt}) -> + Credit = get({credit_from, From}, MaxCredit) - 1, + case Credit of + 0 -> block(From); + _ -> ok + end, + put({credit_from, From}, Credit). + +peer_down(Peer) -> + %% In theory we could also remove it from credit_deferred here, but it + %% doesn't really matter; at some point later we will drain + %% credit_deferred and thus send messages into the void... + unblock(Peer), + erase({credit_from, Peer}), + erase({credit_to, Peer}). + +%% -------------------------------------------------------------------------- + +grant(To, Quantity) -> + Msg = {bump_credit, {self(), Quantity}}, + case blocked() of + false -> To ! Msg; + true -> Deferred = get(credit_deferred, []), + put(credit_deferred, [{To, Msg} | Deferred]) + end. + +block(From) -> put(credit_blocked, [From | get(credit_blocked, [])]). + +unblock(From) -> + NewBlocks = get(credit_blocked, []) -- [From], + put(credit_blocked, NewBlocks), + case NewBlocks of + [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], + erase(credit_deferred); + _ -> ok + end. + +get(Key, Default) -> + case get(Key) of + undefined -> Default; + Value -> Value + end. diff --git a/src/delegate.erl b/src/delegate.erl index edb4eba4ae..d595e4819e 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(delegate). diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 4c131a6c59..2a8b915b89 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(delegate_sup). diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c11fb54bcb..59a0ab1cc0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(file_handle_cache). diff --git a/src/gatherer.erl b/src/gatherer.erl index fe976b50a2..98b360389a 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gatherer). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 49913d2694..f8537487cd 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -73,7 +73,7 @@ %% but where the second argument is specifically the priority_queue %% which contains the prioritised message_queue. -%% All modifications are (C) 2009-2011 VMware, Inc. +%% All modifications are (C) 2009-2012 VMware, Inc. %% ``The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in diff --git a/src/gm.erl b/src/gm.erl index 6c89912213..6f9ff5642b 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm). diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 5e5a3a5a6f..572175410d 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_soak_test). diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index defb0f29b8..dad75bd447 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_speed_test). diff --git a/src/gm_tests.erl b/src/gm_tests.erl index ca0ffd6483..0a2d420469 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_tests). diff --git a/src/lqueue.erl b/src/lqueue.erl index 04b4070621..c4e046b521 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(lqueue). diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 3ba8f50d2e..a599effa91 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor). diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index d48a9ca51a..e8baabe8ff 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor_tests). diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl new file mode 100644 index 0000000000..a3773d90b2 --- /dev/null +++ b/src/mnesia_sync.erl @@ -0,0 +1,77 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(mnesia_sync). + +%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk +%% at commit. This module is an attempt to minimise the risk of data loss by +%% performing a coalesced log fsync. Unfortunately this is performed regardless +%% of whether or not the log was appended to. + +-behaviour(gen_server). + +-export([sync/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {waiting, disc_node}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(sync/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +sync() -> + gen_server:call(?SERVER, sync, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}. + +handle_call(sync, _From, #state{disc_node = false} = State) -> + {reply, ok, State}; +handle_call(sync, From, #state{waiting = Waiting} = State) -> + {noreply, State#state{waiting = [From | Waiting]}, 0}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(timeout, #state{waiting = Waiting} = State) -> + ok = disk_log:sync(latest_log), + [gen_server:reply(From, ok) || From <- Waiting], + {noreply, State#state{waiting = []}}; +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/pg_local.erl b/src/pg_local.erl index c9c3a3a715..e2e82f1f4b 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -13,7 +13,7 @@ %% versions of Erlang/OTP. The remaining type specs have been %% removed. -%% All modifications are (C) 2010-2011 VMware, Inc. +%% All modifications are (C) 2010-2012 VMware, Inc. %% %CopyrightBegin% %% diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 4fc8b46972..780fa2e92f 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% Priority queues have essentially the same interface as ordinary diff --git a/src/rabbit.erl b/src/rabbit.erl index 9609eb0400..3eb75217fd 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit). @@ -45,6 +45,12 @@ {requires, file_handle_cache}, {enables, external_infrastructure}]}). +-rabbit_boot_step({database_sync, + [{description, "database sync"}, + {mfa, {rabbit_sup, start_child, [mnesia_sync]}}, + {requires, database}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({file_handle_cache, [{description, "file handle cache server"}, {mfa, {rabbit_sup, start_restartable_child, @@ -191,7 +197,7 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -442,8 +448,7 @@ run_boot_step({StepName, Attributes}) -> [try apply(M,F,A) catch - _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n", - [Reason, erlang:get_stacktrace()]) + _:Reason -> boot_step_error(Reason, erlang:get_stacktrace()) end || {M,F,A} <- MFAs], io:format("done~n"), ok @@ -502,8 +507,14 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +boot_step_error(Reason, Stacktrace) -> + boot_error("Error description:~n ~p~n~n" + "Log files (may contain more information):~n ~s~n ~s~n~n" + "Stack trace:~n ~p~n~n", + [Reason, log_location(kernel), log_location(sasl), Stacktrace]). + boot_error(Format, Args) -> - io:format("BOOT ERROR: " ++ Format, Args), + io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), error_logger:error_msg(Format, Args), timer:sleep(1000), exit({?MODULE, failure_during_boot}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index ca28d68637..75c5351182 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_access_control). @@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) -> check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> - ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 517dd4ec60..187ec1ab6c 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_alarm). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ac64958e18..d16cd0a8fd 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue). @@ -20,7 +20,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -120,6 +120,8 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -451,39 +453,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver([], #delivery{mandatory = false, immediate = false}) -> - %% /dev/null optimisation - {routed, []}; - -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. - QPids = qpids(Qs), - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end), - {routed, QPids}; +deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} - end. +deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -575,6 +547,46 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + case Flow of + flow -> [credit_flow:send(QPid) || QPid <- QPids]; + noflow -> ok + end, + delegate:invoke_no_result( + QPids, fun (QPid) -> + gen_server2:cast(QPid, {deliver, Delivery, Flow}) + end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, + _Flow) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. + qpids(Qs) -> lists:append([[QPid | SPids] || #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 16d134017f..16d89f74e6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_process). @@ -122,7 +122,6 @@ info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- init(Q) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), State = #q{q = Q#amqqueue{pid = self()}, @@ -149,7 +148,6 @@ init(Q) -> init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, AckTags, Deliveries, MTC) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -627,6 +625,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> + case get({ch_publisher, DownPid}) of + undefined -> ok; + MRef -> erlang:demonitor(MRef), + erase({ch_publisher, DownPid}), + credit_flow:peer_down(DownPid) + end, case lookup_ch(DownPid) of not_found -> {ok, State}; @@ -872,7 +876,7 @@ cleanup_after_confirm(State = #q{blocked_op = Op, noreply(State1#q{blocked_op = Op}) end. -already_been_here(Delivery = #delivery{message = #basic_message{content = Content}}, +already_been_here(#delivery{message = #basic_message{content = Content}}, State) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), @@ -1275,24 +1279,43 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}}, - State = #q{dlx = DLX}) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender, + msg_seq_no = MsgSeqNo}, + Flow}, State = #q{dlx = DLX}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case DLX of - undefined -> - noreply(deliver_or_enqueue(Delivery, State)); - _ -> - case already_been_here(Delivery, State) of - false -> - noreply(deliver_or_enqueue(Delivery, State)); - Qs -> - rabbit_log:warning( - "Message dropped. Dead-letter queues " ++ - "cycle detected: ~p~n", [Qs]), - rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State) - end + ShouldDeliver = + case DLX of + undefined -> + true; + _ -> + case already_been_here(Delivery, State) of + false -> + true; + Qs -> + rabbit_log:warning( + "Message dropped. Dead-letter queues " ++ + "cycle detected: ~p~n", [Qs]), + rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), + false + end + end, + case ShouldDeliver of + false -> + noreply(State); + true -> + case Flow of + flow -> + Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, + erlang:monitor(process, Sender)); + _ -> ok + end, + credit_flow:ack(Sender); + noflow -> + ok + end, + noreply(deliver_or_enqueue(Delivery, State)) end; handle_cast({ack, AckTags, ChPid}, State) -> @@ -1381,8 +1404,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), - dead_letter_deleted_queue(undefined, State); + true -> dead_letter_deleted_queue(undefined, State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1429,8 +1451,11 @@ handle_info(timeout, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(Info, State) -> - ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 7b3ebcf266..a4305e5f5b 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_sup). diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index ade158bb8c..e0e252b898 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend). diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 086a90b49f..3ef81d3208 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend_internal). diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index 897199ee78..0c8251b803 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism). diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index b8682a465e..3de6e7a679 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_amqplain). diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index acbb6e48f6..64b01d8e5d 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_cr_demo). diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 2448acb660..19fb587567 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_plain). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 72c00e3d01..50e4746230 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue). diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index c61184a601..7b00fa5f13 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue_qc). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 4196db7dd4..58b3c131d3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_basic). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 494f3203c8..d69376fb75 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binary_generator). diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index f3ca4e9897..5f0016b60b 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binary_parser). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 655bbb7369..bb44797e4d 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binding). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4c4ba230bc..d74f699d45 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel). @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/10, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -78,6 +78,8 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -110,7 +112,11 @@ do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - gen_server2:cast(Pid, {method, Method, Content}). + gen_server2:cast(Pid, {method, Method, Content, noflow}). + +do_flow(Pid, Method, Content) -> + credit_flow:send(Pid), + gen_server2:cast(Pid, {method, Method, Content, flow}). flush(Pid) -> gen_server2:call(Pid, flush, infinity). @@ -184,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = dict:new(), + queue_monitors = sets:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -240,7 +246,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, State) -> +handle_cast({method, Method, Content, Flow}, + State = #ch{reader_pid = Reader}) -> + case Flow of + flow -> credit_flow:ack(Reader); + noflow -> ok + end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), @@ -295,13 +306,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), + maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), + maybe_incr_redeliver_stats(Redelivered, QPid, State1), rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State3#ch{next_tag = DeliveryTag + 1}); + noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(force_event_refresh, State) -> @@ -311,6 +322,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(timeout, State) -> noreply(State); @@ -323,9 +338,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = - dict:erase(QPid, State3#ch.queue_monitors)}); + sets:del_element(QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -523,7 +539,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> #'channel.flow_ok'{active = false}); _ -> ok end, - demonitor_queue(QPid, State#ch{blocking = Blocking1}) + State#ch{blocking = Blocking1} end. record_confirm(undefined, _, State) -> @@ -561,8 +577,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), case gb_sets:is_empty(MsgSeqNos1) of true -> UQM1 = gb_trees:delete(QPid, UQM), - demonitor_queue( - QPid, State#ch{unconfirmed_qm = UQM1}); + State#ch{unconfirmed_qm = UQM1}; false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), State#ch{unconfirmed_qm = UQM1} end; @@ -668,7 +683,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of - none -> ack(Acked, State1); + none -> ack(Acked, State1), + State1; in_progress -> State1#ch{uncommitted_acks = Acked ++ State1#ch.uncommitted_acks} end}; @@ -692,11 +708,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - State2 = maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), + maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State1), + maybe_incr_redeliver_stats(Redelivered, QPid, State1), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -706,7 +722,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State3#ch{next_tag = DeliveryTag + 1}}; + {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -783,9 +799,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, false -> dict:store(QPid, CTags1, QCons) end end, - NewState = demonitor_queue( - Q, State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = QCons1}), + NewState = State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}, %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -1066,9 +1081,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, uncommitted_acks = TAL}) -> - State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, - State, TMQ))), - {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), + ack(TAL, State1), + {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( @@ -1107,10 +1122,7 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> State2 = lists:foldl(fun monitor_queue/2, - State1#ch{blocking = - sets:from_list(QPids)}, - QPids), + QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)}, ok = rabbit_amqqueue:flush_all(QPids, self()), {noreply, State2} end; @@ -1141,31 +1153,12 @@ consumer_monitor(ConsumerTag, end. monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (not dict:is_key(QPid, QMons) andalso - queue_monitor_needed(QPid, State)) of - true -> MRef = erlang:monitor(process, QPid), - State#ch{queue_monitors = dict:store(QPid, MRef, QMons)}; - false -> State - end. - -demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (dict:is_key(QPid, QMons) andalso - not queue_monitor_needed(QPid, State)) of - true -> true = erlang:demonitor(dict:fetch(QPid, QMons)), - State#ch{queue_monitors = dict:erase(QPid, QMons)}; + case not sets:is_element(QPid, QMons) of + true -> erlang:monitor(process, QPid), + State#ch{queue_monitors = sets:add_element(QPid, QMons)}; false -> State end. -queue_monitor_needed(QPid, #ch{queue_consumers = QCons, - blocking = Blocking, - unconfirmed_qm = UQM} = State) -> - StatsEnabled = rabbit_event:stats_level( - State, #ch.stats_timer) =:= fine, - ConsumerMonitored = dict:is_key(QPid, QCons), - QueueBlocked = sets:is_element(QPid, Blocking), - ConfirmMonitored = gb_trees:is_defined(QPid, UQM), - StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1359,22 +1352,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ msg_seq_no = MsgSeqNo}, QNames}, State) -> {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State), + rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), + State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State1). + QPid <- DeliveredQPids]], publish, State2), + State2. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State)); + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{XName, 1}], return_not_delivered, State)); + maybe_incr_stats([{XName, 1}], return_not_delivered, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> @@ -1392,7 +1387,7 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State0#ch{unconfirmed_qm = UQM1}; none -> UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1}) + State0#ch{unconfirmed_qm = UQM1} end end, State#ch{unconfirmed_mq = UMQ1}, QPids). @@ -1416,13 +1411,12 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx_status = none, confirmed = []}) -> State; send_confirms(State = #ch{tx_status = none, confirmed = C}) -> - {MsgSeqNos, State1} = - lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> - {[MsgSeqNo | MSNs], - maybe_incr_stats([{ExchangeName, 1}], confirm, - State0)} - end, {[], State}, lists:append(C)), - send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); + MsgSeqNos = + lists:foldl(fun ({MsgSeqNo, XName}, MSNs) -> + maybe_incr_stats([{XName, 1}], confirm, State), + [MsgSeqNo | MSNs] + end, [], lists:append(C)), + send_confirms(MsgSeqNos, State#ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1502,26 +1496,21 @@ i(Item, _) -> maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_stats([{QPid, 1}], redeliver, State); -maybe_incr_redeliver_stats(_, _, State) -> - State. +maybe_incr_redeliver_stats(_, _, _State) -> + ok. maybe_incr_stats(QXIncs, Measure, State) -> case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> lists:foldl(fun ({QX, Inc}, State0) -> - incr_stats(QX, Inc, Measure, State0) - end, State, QXIncs); - _ -> State + fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + _ -> ok end. -incr_stats({QPid, _} = QX, Inc, Measure, State) -> - update_measures(queue_exchange_stats, QX, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> - update_measures(queue_stats, QPid, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(X, Inc, Measure, State) -> - update_measures(exchange_stats, X, Inc, Measure), - State. +incr_stats({_, _} = QX, Inc, Measure) -> + update_measures(queue_exchange_stats, QX, Inc, Measure); +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index a19b6bfdad..dc262b4948 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup). diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index e2561c802e..995c41fbc2 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup_sup). diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index 4ba01b4f4d..c508f1b996 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_client_sup). diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl index a0953eab95..adf6e41702 100644 --- a/src/rabbit_command_assembler.erl +++ b/src/rabbit_command_assembler.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_command_assembler). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b2aba2eeb0..12a532b6fc 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_connection_sup). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 22b57b1ae9..d187a0b29d 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_control). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 6f9a46504f..e2928cae7e 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_direct). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 6e29ace71a..f1672f4e7b 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger). diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 7b6e07c19f..042ab23cbc 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger_file_h). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 5ae40c784a..4ec141cfd8 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_event). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a15b9be490..83e28c44a8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange). @@ -355,11 +355,21 @@ peek_serial(XName) -> _ -> undefined end. +invalid_module(T) -> + rabbit_log:warning( + "Could not find exchange type ~s.~n", [T]), + put({xtype_to_module, T}, rabbit_exchange_type_invalid), + rabbit_exchange_type_invalid. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> case get({xtype_to_module, T}) of - undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), - put({xtype_to_module, T}, Module), - Module; - Module -> Module + undefined -> + case rabbit_registry:lookup_module(exchange, T) of + {ok, Module} -> put({xtype_to_module, T}, Module), + Module; + {error, not_found} -> invalid_module(T) + end; + Module -> + Module end. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index ab3d00dc28..44a08e2437 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index b485e31f33..4bce42d4da 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_direct). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 3c02972278..cc3fb87c21 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_fanout). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index f09e4aae73..de9979b422 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_headers). diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl new file mode 100644 index 0000000000..8f60f7d898 --- /dev/null +++ b/src/rabbit_exchange_type_invalid.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_exchange_type_invalid). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, serialise_events/0, route/2]). +-export([validate/1, create/2, delete/3, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-include("rabbit_exchange_type_spec.hrl"). + +description() -> + [{name, <<"invalid">>}, + {description, + <<"Dummy exchange type, to be used when the intended one is not found.">> + }]. + +serialise_events() -> false. + +route(#exchange{name = Name, type = Type}, _) -> + rabbit_misc:protocol_error( + precondition_failed, + "Cannot route message through ~s: exchange type ~s not found", + [rabbit_misc:rs(Name), Type]). + +validate(_X) -> ok. +create(_Tx, _X) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 91c7b5d3ef..3ac6ae7438 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_topic). diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 5cb8e7b69d..59df14f318 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_file). diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl index da1a6a49ec..a79188abaf 100644 --- a/src/rabbit_framing.erl +++ b/src/rabbit_framing.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% TODO auto-generate diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2d0f5014f2..70772ccd2c 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_guid). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 177ae86859..80b4e768a3 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_heartbeat). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8a08d4b673..9fa6213b94 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_limiter). diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 558e095751..a6b4eeb05f 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_log). @@ -23,8 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([debug/1, debug/2, message/4, info/1, info/2, - warning/1, warning/2, error/1, error/2]). +-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). -define(SERVER, ?MODULE). @@ -32,9 +31,15 @@ -ifdef(use_specs). +-export_type([level/0]). + +-type(category() :: atom()). +-type(level() :: 'info' | 'warning' | 'error'). + -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(debug/1 :: (string()) -> 'ok'). --spec(debug/2 :: (string(), [any()]) -> 'ok'). + +-spec(log/3 :: (category(), level(), string()) -> 'ok'). +-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -42,84 +47,47 @@ -spec(error/1 :: (string()) -> 'ok'). -spec(error/2 :: (string(), [any()]) -> 'ok'). --spec(message/4 :: (_,_,_,_) -> 'ok'). - -endif. %%---------------------------------------------------------------------------- - start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). -debug(Fmt) -> - gen_server:cast(?SERVER, {debug, Fmt}). - -debug(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {debug, Fmt, Args}). - -message(Direction, Channel, MethodRecord, Content) -> - gen_server:cast(?SERVER, - {message, Direction, Channel, MethodRecord, Content}). +log(Category, Level, Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}). -info(Fmt) -> - gen_server:cast(?SERVER, {info, Fmt}). - -info(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {info, Fmt, Args}). - -warning(Fmt) -> - gen_server:cast(?SERVER, {warning, Fmt}). - -warning(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {warning, Fmt, Args}). - -error(Fmt) -> - gen_server:cast(?SERVER, {error, Fmt}). - -error(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {error, Fmt, Args}). +info(Fmt) -> log(default, info, Fmt). +info(Fmt, Args) -> log(default, info, Fmt, Args). +warning(Fmt) -> log(default, warning, Fmt). +warning(Fmt, Args) -> log(default, warning, Fmt, Args). +error(Fmt) -> log(default, error, Fmt). +error(Fmt, Args) -> log(default, error, Fmt, Args). %%-------------------------------------------------------------------- -init([]) -> {ok, none}. +init([]) -> + {ok, CatLevelList} = application:get_env(log_levels), + CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList], + {ok, orddict:from_list(CatLevels)}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({debug, Fmt}, State) -> - io:format("debug:: "), io:format(Fmt), - error_logger:info_msg("debug:: " ++ Fmt), - {noreply, State}; -handle_cast({debug, Fmt, Args}, State) -> - io:format("debug:: "), io:format(Fmt, Args), - error_logger:info_msg("debug:: " ++ Fmt, Args), - {noreply, State}; -handle_cast({message, Direction, Channel, MethodRecord, Content}, State) -> - io:format("~s ch~p ~p~n", - [case Direction of - in -> "-->"; - out -> "<--" end, - Channel, - {MethodRecord, Content}]), - {noreply, State}; -handle_cast({info, Fmt}, State) -> - error_logger:info_msg(Fmt), - {noreply, State}; -handle_cast({info, Fmt, Args}, State) -> - error_logger:info_msg(Fmt, Args), - {noreply, State}; -handle_cast({warning, Fmt}, State) -> - error_logger:warning_msg(Fmt), - {noreply, State}; -handle_cast({warning, Fmt, Args}, State) -> - error_logger:warning_msg(Fmt, Args), - {noreply, State}; -handle_cast({error, Fmt}, State) -> - error_logger:error_msg(Fmt), - {noreply, State}; -handle_cast({error, Fmt, Args}, State) -> - error_logger:error_msg(Fmt, Args), - {noreply, State}; +handle_cast({log, Category, Level, Fmt, Args}, CatLevels) -> + CatLevel = case orddict:find(Category, CatLevels) of + {ok, L} -> L; + error -> level(info) + end, + case level(Level) =< CatLevel of + false -> ok; + true -> (case Level of + info -> fun error_logger:info_msg/2; + warning -> fun error_logger:warning_msg/2; + error -> fun error_logger:error_msg/2 + end)(Fmt, Args) + end, + {noreply, CatLevels}; handle_cast(_Msg, State) -> {noreply, State}. @@ -132,3 +100,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%-------------------------------------------------------------------- + +level(info) -> 3; +level(warning) -> 2; +level(error) -> 1; +level(none) -> 0. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index c25a177b06..f22ad874e3 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 8ed2bede32..d0b5bab779 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_coordinator). @@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> true = link(GM), GM end, - {ok, _TRef} = - timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), + ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, monitors = dict:new(), @@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids}, end, Monitors, Pids), noreply(State #state { monitors = Monitors1 }). +handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> + gm:broadcast(GM, heartbeat), + ensure_gm_heartbeat(), + noreply(State); + handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Monitors, death_fun = DeathFun }) -> @@ -419,3 +423,6 @@ noreply(State) -> reply(Reply, State) -> {reply, Reply, State, hibernate}. + +ensure_gm_heartbeat() -> + erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4b54d82171..8d7b9ded8f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_master). @@ -281,8 +281,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. -status(#state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:status(BQS). +status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:status(BQS) ++ + [ {mirror_seen, dict:size(State #state.seen_status)}, + {mirror_senders, sets:size(State #state.known_senders)} ]. invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index baebc52b27..db7d8eccec 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_misc). @@ -136,12 +136,16 @@ add_mirror(Queue, MirrorNode) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), - rabbit_log:info( - "Adding mirror of queue ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, Result]), case Result of - {ok, _Pid} -> ok; - _ -> Result + {ok, undefined} -> %% Already running + ok; + {ok, _Pid} -> + rabbit_log:info( + "Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, Result]), + ok; + _ -> + Result end; [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} end diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index db1f056f4c..29a2e8bd71 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave). @@ -90,7 +90,7 @@ }). start_link(Q) -> - gen_server2:start_link(?MODULE, [Q], []). + gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). @@ -98,55 +98,61 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init([#amqqueue { name = QueueName } = Q]) -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, +init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), - {ok, MPid} = - rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - %% ASSERTION - [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], - MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {ok, QPid} - end), - erlang:monitor(process, MPid), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [Self]), - ok = rabbit_memory_monitor:register( - Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), - {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new(), - - synchronised = false + case rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), + {new, QPid}; + [SPid] -> true = rabbit_misc:is_process_alive(SPid), + existing + end + end) of + {new, MPid} -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [Self]), + ok = rabbit_memory_monitor:register( + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), - ok = gm:broadcast(GM, request_length), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + {ok, State, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}; + existing -> + ignore + end. handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> @@ -207,8 +213,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery {}}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> @@ -249,6 +259,10 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -446,7 +460,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - MonitoringPids = [begin true = erlang:demonitor(MRef), + MonitoringPids = [begin put({ch_publisher, Pid}, MRef), Pid end || {Pid, MRef} <- dict:to_list(KS)], ok = rabbit_mirror_queue_coordinator:ensure_monitoring( @@ -600,7 +614,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case dict:is_key(ChPid, KS) of false -> ok; - true -> confirm_sender_death(ChPid) + true -> credit_flow:peer_down(ChPid), + confirm_sender_death(ChPid) end, State. diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index fc04ec79ca..8eacb1f3c4 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave_sup). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2676ddc63b..cccc09ac94 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_misc). @@ -39,7 +39,7 @@ -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([format_stderr/2, with_local_io/1, local_info_msg/2]). +-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). @@ -158,6 +158,7 @@ -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()). +-spec(format/2 :: (string(), [any()]) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(with_local_io/1 :: (fun (() -> A)) -> A). -spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). @@ -225,7 +226,7 @@ frame_error(MethodName, BinaryFields) -> protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName). amqp_error(Name, ExplanationFormat, Params, Method) -> - Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)), + Explanation = format(ExplanationFormat, Params), #amqp_error{name = Name, explanation = Explanation, method = Method}. protocol_error(Name, ExplanationFormat, Params) -> @@ -279,8 +280,7 @@ val({Type, Value}) -> true -> "~s"; false -> "~w" end, - lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'", - [Value, Type])). + format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]). %% Normally we'd call mnesia:dirty_read/1 here, but that is quite %% expensive due to general mnesia overheads (figuring out table types @@ -329,8 +329,7 @@ r_arg(VHostPath, Kind, Table, Key) -> end. rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> - lists:flatten(io_lib:format("~s '~s' in vhost '~s'", - [Kind, Name, VHostPath])). + format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]). enable_cover() -> enable_cover(["."]). @@ -431,7 +430,11 @@ execute_mnesia_transaction(TxFun) -> %% elsewhere and get a consistent result even when that read %% executes on a different node. case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of - {atomic, Result} -> Result; + {atomic, Result} -> case mnesia:is_transaction() of + true -> ok; + false -> mnesia_sync:sync() + end, + Result; {aborted, Reason} -> throw({error, Reason}) end. @@ -482,9 +485,7 @@ cookie_hash() -> tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> list_to_atom( - lists:flatten( - io_lib:format("~w_~s:~w", - [Prefix, inet_parse:ntoa(IPAddress), Port]))). + format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])). %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about @@ -553,6 +554,8 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]), dirty_dump_log1(LH, disk_log:chunk(LH, K)). +format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). + format_stderr(Fmt, Args) -> case os:type() of {unix, _} -> @@ -648,7 +651,7 @@ pid_to_string(Pid) when is_pid(Pid) -> <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])). + format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]). %% inverse of above string_to_pid(Str) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index bf997a6f8b..30b5478e30 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% @@ -98,8 +98,8 @@ status() -> init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config(), true, - fun maybe_upgrade_local_or_record_desired/0), + Nodes = read_cluster_nodes_config(), + ok = init_db(Nodes, should_be_disc_node(Nodes)), %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - let's %% make it so. @@ -174,8 +174,7 @@ cluster(ClusterNodes, Force) -> %% Join the cluster start_mnesia(), try - ok = init_db(ClusterNodes, Force, - fun maybe_upgrade_local_or_record_desired/0), + ok = init_db(ClusterNodes, Force), ok = create_cluster_nodes_config(ClusterNodes) after stop_mnesia() @@ -501,6 +500,18 @@ delete_previously_running_nodes() -> FileName, Reason}}) end. +init_db(ClusterNodes, Force) -> + init_db( + ClusterNodes, Force, + fun () -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + %% If we're just starting up a new node we won't have a + %% version + version_not_available -> ok = rabbit_version:record_desired() + end + end). + %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow @@ -509,20 +520,12 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of + {ok, []} when not Force andalso ProperClusterNodes =/= [] -> + throw({error, {failed_to_cluster_with, ProperClusterNodes, + "Mnesia could not connect to any disc nodes."}}); {ok, Nodes} -> - case Force of - false -> FailedClusterNodes = ProperClusterNodes -- Nodes, - case FailedClusterNodes of - [] -> ok; - _ -> throw({error, {failed_to_cluster_with, - FailedClusterNodes, - "Mnesia could not connect " - "to some nodes."}}) - end; - true -> ok - end, - WantDiscNode = should_be_disc_node(ClusterNodes), WasDiscNode = is_disc_node(), + WantDiscNode = should_be_disc_node(ClusterNodes), %% We create a new db (on disk, or in ram) in the first %% two cases and attempt to upgrade the in the other two case {Nodes, WasDiscNode, WantDiscNode} of @@ -572,14 +575,6 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. -maybe_upgrade_local_or_record_desired() -> - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ok; - %% If we're just starting up a new node we won't have a - %% version - version_not_available -> ok = rabbit_version:record_desired() - end. - schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -627,10 +622,9 @@ move_db() -> stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), - BackupDir = lists:flatten( - io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", - [MnesiaDir, - Year, Month, Day, Hour, Minute, Second])), + BackupDir = rabbit_misc:format( + "~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", + [MnesiaDir, Year, Month, Day, Hour, Minute, Second]), case file:rename(MnesiaDir, BackupDir) of ok -> %% NB: we cannot use rabbit_log here since it may not have @@ -745,7 +739,9 @@ reset(Force) -> start_mnesia(), {Nodes, RunningNodes} = try - ok = init(), + %% Force=true here so that reset still works when clustered + %% with a node which is down + ok = init_db(read_cluster_nodes_config(), true), {all_clustered_nodes() -- [Node], running_clustered_nodes() -- [Node]} after diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index b7de27d4b5..f685b109a9 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_file). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e6a32b9023..56265136dd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store). @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2]). + write/3, write_flow/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -152,6 +152,7 @@ -spec(close_all_indicated/1 :: (client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). +-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). @@ -436,7 +437,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( - Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), + Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun}, + infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -460,12 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write(MsgId, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - client_ref = CRef }) -> - ok = client_update_flying(+1, MsgId, CState), - ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), - ok = server_cast(CState, {write, CRef, MsgId}). +write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) -> + credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND), + client_write(MsgId, Msg, flow, CState). + +write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> @@ -500,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) -> server_cast(#client_msstate { server = Server }, Msg) -> gen_server2:cast(Server, Msg). +client_write(MsgId, Msg, Flow, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), + ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), + ok = server_cast(CState, {write, CRef, MsgId, Flow}). + client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of @@ -666,7 +674,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Server), Clients = dict:from_list( - [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]), + [{CRef, {undefined, undefined, undefined}} || + CRef <- ClientRefs1]), %% CleanShutdown => msg location index and file_summary both %% recovered correctly. true = case {FileSummaryRecovered, CleanShutdown} of @@ -731,10 +740,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; - {read, _MsgId} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7; + {read, _MsgId} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -755,7 +764,7 @@ prioritise_info(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, +handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From, State = #msstate { dir = Dir, index_state = IndexState, index_module = IndexModule, @@ -765,7 +774,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> - Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), + Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts}, State #msstate { clients = Clients1 }); @@ -789,11 +798,19 @@ handle_cast({client_dying, CRef}, handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> + {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:peer_down(CPid), State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, MsgId}, - State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> +handle_cast({write, CRef, MsgId, Flow}, + State = #msstate { cur_file_cache_ets = CurFileCacheEts, + clients = Clients }) -> + case Flow of + flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:ack(CPid, ?CREDIT_DISC_BOUND); + noflow -> ok + end, true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), case update_flying(-1, MsgId, CRef, State) of process -> @@ -1204,10 +1221,10 @@ update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, cref_to_msg_ids = CTM }) -> case dict:fetch(CRef, Clients) of - {undefined, _CloseFDsFun} -> State; - {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), - State #msstate { - cref_to_msg_ids = CTM1 } + {_CPid, undefined, _CloseFDsFun} -> State; + {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), + State #msstate { + cref_to_msg_ids = CTM1 } end. record_pending_confirm(CRef, MsgId, State) -> @@ -1294,8 +1311,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> case (ets:update_element(FileHandlesEts, Key, {2, close}) andalso Invoke) of true -> case dict:fetch(Ref, ClientRefs) of - {_MsgOnDiskFun, undefined} -> ok; - {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun() + {_CPid, _MsgOnDiskFun, undefined} -> + ok; + {_CPid, _MsgOnDiskFun, CloseFDsFun} -> + ok = CloseFDsFun() end; false -> ok end diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index d6dc556878..9c31439f51 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_ets_index). diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 77f1f04ee2..3b61ed0bd7 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_gc). diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl index ef8b7cdfed..2f36256c61 100644 --- a/src/rabbit_msg_store_index.erl +++ b/src/rabbit_msg_store_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_index). diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index b944ec81a1..02889b93a3 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_net). @@ -19,7 +19,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - sockname/1, peername/1, peercert/1]). + sockname/1, peername/1, peercert/1, connection_string/2]). %%--------------------------------------------------------------------------- @@ -62,6 +62,8 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). +-spec(connection_string/2 :: + (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). -endif. @@ -141,3 +143,19 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. + +connection_string(Sock, Direction) -> + {From, To} = case Direction of + inbound -> {fun peername/1, fun sockname/1}; + outbound -> {fun sockname/1, fun peername/1} + end, + case {From(Sock), To(Sock)} of + {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> + {ok, rabbit_misc:format("~s:~p -> ~s:~p", + [rabbit_misc:ntoab(FromAddress), FromPort, + rabbit_misc:ntoab(ToAddress), ToPort])}; + {{error, _Reason} = Error, _} -> + Error; + {_, {error, _Reason} = Error} -> + Error + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index e81f8134f8..825d1bb1e0 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_networking). @@ -164,8 +164,6 @@ ssl_transform_fun(SslOpts) -> fun (Sock) -> case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection ~p to SSL~n", - [self()]), {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; {error, Reason} -> {error, {ssl_upgrade_error, Reason}}; @@ -220,7 +218,12 @@ start_listener(Listener, Protocol, Label, OnConnect) -> start_listener0(Address, Protocol, Label, OnConnect) -> Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(), Protocol, Label, OnConnect), - {ok,_} = supervisor:start_child(rabbit_sup, Spec). + case supervisor:start_child(rabbit_sup, Spec) of + {ok, _} -> ok; + {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address, + exit({could_not_start_tcp_listener, + {rabbit_misc:ntoa(IPAddress), Port}}) + end. stop_tcp_listener(Listener) -> [stop_tcp_listener0(Address) || @@ -266,6 +269,16 @@ start_client(Sock, SockTransform) -> {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Reader), Reader ! {go, Sock, SockTransform}, + + %% In the event that somebody floods us with connections, the + %% reader processes can spew log events at error_logger faster + %% than it can keep up, causing its mailbox to grow unbounded + %% until we eat all the memory available and crash. So here is a + %% meaningless synchronous call to the underlying gen_event + %% mechanism. When it returns the mailbox is drained, and we + %% return to our caller to accept more connetions. + gen_event:which_handlers(error_logger), + Reader. start_client(Sock) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8aa24ab53e..4fc91860cd 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_node_monitor). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index de2ba8adbb..7b85ab15fc 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_plugins). diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 50444dc49d..11b4d776a3 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_prelaunch). diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 9b45e79869..df957d883c 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_queue_collector). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f03c1d1c76..4c8793f1ad 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_queue_index). @@ -491,7 +491,7 @@ recover_message(false, _, no_del, RelSeq, Segment) -> queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), - lists:flatten(io_lib:format("~.36B", [Num])). + rabbit_misc:format("~.36B", [Num]). queues_dir() -> filename:join(rabbit_mnesia:dir(), "queues"). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index fce61129fa..1602cc2b83 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_reader). @@ -27,8 +27,6 @@ -export([conserve_memory/2, server_properties/1]). --export([process_channel_frame/5]). %% used by erlang-client - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). @@ -40,10 +38,12 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, conserve_memory, + last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, channels]). + send_pend, state, last_blocked_by, last_blocked_age, + channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, @@ -90,10 +90,6 @@ -spec(system_continue/3 :: (_,_,#v1{}) -> any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). --spec(process_channel_frame/5 :: - (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(), - tuple()) -> tuple()). - -endif. %%-------------------------------------------------------------------------- @@ -177,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> server_capabilities(_) -> []. +log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; - {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Reason]), - rabbit_log:info("closing TCP connection ~p~n", - [self()]), + {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", + [self(), Reason]), exit(normal) end. start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), - PeerAddressS = rabbit_misc:ntoab(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + ConnStr = socket_op(Sock, fun (Sock0) -> + rabbit_net:connection_string( + Sock0, inbound) + end), + log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), @@ -220,21 +217,22 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, auth_mechanism = none, - auth_state = none}, + auth_state = none, + conserve_memory = false, + last_blocked_by = none, + last_blocked_at = never}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), - handshake, 8)) + handshake, 8)), + log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]) catch - Ex -> (if Ex == connection_closed_abruptly -> - fun rabbit_log:warning/2; - true -> - fun rabbit_log:error/2 - end)("exception on TCP connection ~p from ~s:~p~n~p~n", - [self(), PeerAddressS, PeerPort, Ex]) + Ex -> log(case Ex of + connection_closed_abruptly -> warning; + _ -> error + end, "closing AMQP connection ~p (~s):~n~p~n", + [self(), ConnStr, Ex]) after - rabbit_log:info("closing TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), %% We don't close the socket explicitly. The reader is the %% controlling process and hence its termination will close %% the socket. Furthermore, gen_tcp:close/1 waits for pending @@ -267,21 +265,20 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], buf_len = BufLen + size(Data), pending_recv = false}); - closed -> if State#v1.connection_state =:= closed -> - State; - true -> - throw(connection_closed_abruptly) + closed -> case State#v1.connection_state of + closed -> State; + _ -> throw(connection_closed_abruptly) end; {error, Reason} -> throw({inet_error, Reason}); {other, Other} -> handle_other(Other, Deb, State) end. handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, internal_conserve_memory(Conserve, State)); + recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(State)); + mainloop(Deb, maybe_close(control_throttle(State))); handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -341,6 +338,9 @@ handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other({bump_credit, Msg}, Deb, State) -> + credit_flow:handle_bump_msg(Msg), + recvloop(Deb, control_throttle(State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -355,17 +355,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(true, State = #v1{connection_state = running}) -> - State#v1{connection_state = blocking}; -internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> - State#v1{connection_state = running}; -internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater}) -> - ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State#v1{connection_state = running}; -internal_conserve_memory(_Conserve, State) -> +control_throttle(State = #v1{connection_state = CS, + conserve_memory = Mem}) -> + case {CS, Mem orelse credit_flow:blocked()} of + {running, true} -> State#v1{connection_state = blocking}; + {blocking, false} -> State#v1{connection_state = running}; + {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( + State#v1.heartbeater), + State#v1{connection_state = running}; + {blocked, true} -> update_last_blocked_by(State); + {_, _} -> State + end. + +maybe_block(State = #v1{connection_state = blocking}) -> + ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + update_last_blocked_by(State#v1{connection_state = blocked, + last_blocked_at = erlang:now()}); +maybe_block(State) -> State. +update_last_blocked_by(State = #v1{conserve_memory = true}) -> + State#v1{last_blocked_by = mem}; +update_last_blocked_by(State = #v1{conserve_memory = false}) -> + State#v1{last_blocked_by = flow}. + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -387,17 +400,19 @@ handle_dependent_exit(ChPid, Reason, State) -> {undefined, uncontrolled} -> exit({abnormal_dependent_exit, ChPid, Reason}); {_Channel, controlled} -> - maybe_close(State); + maybe_close(control_throttle(State)); {Channel, uncontrolled} -> - rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), - maybe_close(handle_exception(State, Channel, Reason)) + log(error, "AMQP connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close(handle_exception(control_throttle(State), + Channel, Reason)) end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of undefined -> undefined; - {Channel, MRef} -> erase({channel, Channel}), + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), Channel @@ -432,9 +447,10 @@ wait_for_channel_termination(N, TimerRef) -> {_Channel, controlled} -> wait_for_channel_termination(N-1, TimerRef); {Channel, uncontrolled} -> - rabbit_log:error("connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), + log(error, + "AMQP connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), wait_for_channel_termination(N-1, TimerRef) end; cancel_wait -> @@ -485,12 +501,12 @@ handle_frame(Type, Channel, Payload, heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ChPid, FramingState} -> + {ChPid, AState} -> NewAState = process_channel_frame( - AnalyzedFrame, self(), - Channel, ChPid, FramingState), + AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(AnalyzedFrame, ChPid, State); + post_process_frame(AnalyzedFrame, ChPid, + control_throttle(State)); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -504,18 +520,13 @@ handle_frame(Type, Channel, Payload, post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - State; + control_throttle(State); post_process_frame({method, MethodName, _}, _ChPid, State = #v1{connection = #connection{ protocol = Protocol}}) -> case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), - case State#v1.connection_state of - blocking -> ok = rabbit_heartbeat:pause_monitor( - State#v1.heartbeater), - State#v1{connection_state = blocked}; - _ -> State - end; + maybe_block(State); false -> State end; post_process_frame(_Frame, _ChPid, State) -> @@ -687,10 +698,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - State1 = internal_conserve_memory( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State1 = control_throttle( State#v1{connection_state = running, - connection = NewConnection}), + connection = NewConnection, + conserve_memory = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -822,6 +834,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; fun ([{_, I}]) -> I end); i(state, #v1{connection_state = S}) -> S; +i(last_blocked_by, #v1{last_blocked_by = By}) -> + By; +i(last_blocked_age, #v1{last_blocked_at = never}) -> + infinity; +i(last_blocked_age, #v1{last_blocked_at = T}) -> + timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); i(protocol, #v1{connection = #connection{protocol = none}}) -> @@ -890,21 +908,20 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - NewAState = process_channel_frame(AnalyzedFrame, self(), - Channel, ChPid, AState), + NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), put({ch_pid, ChPid}, {Channel, MRef}), State. -process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> +process_channel_frame(Frame, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> NewAState; {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), NewAState; - {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, - Method, Content), + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), NewAState; - {error, Reason} -> ErrPid ! {channel_exit, Channel, + {error, Reason} -> self() ! {channel_exit, Channel, Reason}, AState end. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 9821ae7b86..8c0ebcbe94 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_registry). diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index 1a08efed41..237ab78cac 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_restartable_sup). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 219833b726..f4bbda0f7a 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_router). diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 963294d924..e8beecfe0a 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_sasl_report_file_h). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index e524446ea8..3025d981d4 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_ssl). @@ -72,9 +72,8 @@ peer_cert_validity(Cert) -> cert_info(fun(#'OTPCertificate' { tbsCertificate = #'OTPTBSCertificate' { validity = {'Validity', Start, End} }}) -> - lists:flatten( - io_lib:format("~s - ~s", [format_asn1_value(Start), - format_asn1_value(End)])) + rabbit_misc:format("~s - ~s", [format_asn1_value(Start), + format_asn1_value(End)]) end, Cert). %%-------------------------------------------------------------------------- @@ -155,8 +154,7 @@ escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 -> %% purposes it's handy to escape all non-printable chars. All non-ASCII %% characters get converted to UTF-8 sequences and then escaped. We've %% already got a UTF-8 sequence here, so just escape it. - lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++ - escape_rdn_value(S, middle); + rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle); escape_rdn_value([C | S], middle) -> [C | escape_rdn_value(S, middle)]. diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 802ea5e2e7..0965e3b36e 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_sup). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ff55194707..ae690e8cdd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_tests). @@ -889,6 +889,14 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), ok = assert_ram_node(), + %% ram node will not start by itself + ok = control_action(stop_app, []), + ok = control_action(stop_app, SecondaryNode, [], []), + {error, _} = control_action(start_app, []), + ok = control_action(start_app, SecondaryNode, [], []), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + %% change cluster config while remaining in same cluster ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), ok = control_action(start_app, []), @@ -897,8 +905,7 @@ test_cluster_management2(SecondaryNode) -> %% join non-existing cluster as a ram node ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), + {error, _} = control_action(start_app, []), ok = assert_ram_node(), %% join empty cluster as a ram node (converts to disc) @@ -2222,14 +2229,26 @@ test_amqqueue(Durable) -> #amqqueue { durable = Durable }. with_fresh_variable_queue(Fun) -> - ok = empty_test_queue(), - VQ = variable_queue_init(test_amqqueue(true), false), - S0 = rabbit_variable_queue:status(VQ), - assert_props(S0, [{q1, 0}, {q2, 0}, - {delta, {delta, undefined, 0, undefined}}, - {q3, 0}, {q4, 0}, - {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), + Ref = make_ref(), + Me = self(), + %% Run in a separate process since rabbit_msg_store will send + %% bump_credit messages and we want to ignore them + spawn_link(fun() -> + ok = empty_test_queue(), + VQ = variable_queue_init(test_amqqueue(true), false), + S0 = rabbit_variable_queue:status(VQ), + assert_props(S0, [{q1, 0}, {q2, 0}, + {delta, + {delta, undefined, 0, undefined}}, + {q3, 0}, {q4, 0}, + {len, 0}]), + _ = rabbit_variable_queue:delete_and_terminate( + shutdown, Fun(VQ)), + Me ! Ref + end), + receive + Ref -> ok + end, passed. publish_and_confirm(Q, Payload, Count) -> diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl index abcbe0b6d5..72c07b51ec 100644 --- a/src/rabbit_tests_event_receiver.erl +++ b/src/rabbit_tests_event_receiver.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_tests_event_receiver). diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 58079ccf47..3a5b96de4f 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_trace). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index ae2b5d3f46..732c29b6b6 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_types). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 717d94a802..80f50b38b3 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index f164035ee5..9f2535bd04 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade_functions). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 83af5ddd66..9131a59e55 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_variable_queue). @@ -902,17 +902,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) + end). msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:read(MsgId, MSCState1) + end). msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). + fun (MCSState1) -> + rabbit_msg_store:remove(MsgIds, MCSState1) + end). msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index f6bcbb7fd5..7545d81362 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_version). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 38bb76b03b..5548ef6d05 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_vhost). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 091b50e4c6..269128df17 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_writer). @@ -169,12 +169,10 @@ call(Pid, Msg) -> %%--------------------------------------------------------------------------- assemble_frame(Channel, MethodRecord, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, none), rabbit_binary_generator:build_simple_method_frame( Channel, MethodRecord, Protocol). assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 26ea502ced..a2f4fae980 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -41,7 +41,7 @@ %% 5) normal, and {shutdown, _} exit reasons are all treated the same %% (i.e. are regarded as normal exits) %% -%% All modifications are (C) 2010-2011 VMware, Inc. +%% All modifications are (C) 2010-2012 VMware, Inc. %% %% %CopyrightBegin% %% diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 8678c2c9e0..43a6bc994b 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_acceptor). @@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - try - %% report - {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), - {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [rabbit_misc:ntoab(Address), Port, - rabbit_misc:ntoab(PeerAddress), PeerPort]), - %% In the event that somebody floods us with connections we can spew - %% the above message at error_logger faster than it can keep up. - %% So error_logger's mailbox grows unbounded until we eat all the - %% memory available and crash. So here's a meaningless synchronous call - %% to the underlying gen_event mechanism - when it returns the mailbox - %% is drained. - gen_event:which_handlers(error_logger), - %% handle - file_handle_cache:transfer(apply(M, F, A ++ [Sock])), - ok = file_handle_cache:obtain() - catch {inet_error, Reason} -> - gen_tcp:close(Sock), - error_logger:error_msg("unable to accept TCP connection: ~p~n", - [Reason]) - end, + %% handle + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain(), %% accept more accept(State); @@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, handle_info({inet_async, LSock, Ref, {error, Reason}}, State=#state{sock=LSock, ref=Ref}) -> - {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + {AddressS, Port} = case inet:sockname(LSock) of + {ok, {A, P}} -> {rabbit_misc:ntoab(A), P}; + {error, _} -> {"unknown", unknown} + end, error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", - [rabbit_misc:ntoab(Address), Port, Reason]), + [AddressS, Port, Reason]), accept(State); handle_info(_Info, State) -> @@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). - accept(State = #state{sock=LSock}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index cb3dd02c42..d8844441d2 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_acceptor_sup). diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 9a82ac88c1..fb01c792f4 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_listener). @@ -72,8 +72,9 @@ init({IPAddress, Port, SocketOpts, label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start ~s on ~s:~p - ~p~n", - [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p (~s)~n", + [Label, rabbit_misc:ntoab(IPAddress), Port, + Reason, inet:format_error(Reason)]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 74297b6dce..9ee921b423 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_listener_sup). diff --git a/src/test_sup.erl b/src/test_sup.erl index 5feb146f64..7f4b5049fe 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(test_sup). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 8973a4f750..fca55f02f1 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% In practice Erlang shouldn't be allowed to grow to more than a half diff --git a/src/worker_pool.erl b/src/worker_pool.erl index fcb07a16f4..c9ecccd6b6 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool). diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index d37c3a0fd6..ff356366ac 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool_sup). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index b42530e269..1ddcebb23d 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool_worker). |
