summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-06 15:15:50 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-06 15:15:50 +0000
commit4eb2a5282a77f7b73e1f9b94e368b6ca3b81129d (patch)
tree2d02de39f0877a41df3c7b16025c26a169651e0a
parenta5143916e5069d3d97438008187784f3761fd4a4 (diff)
parentc16d3a27f9eabefab3a6d7e248997ee837819127 (diff)
downloadrabbitmq-server-git-4eb2a5282a77f7b73e1f9b94e368b6ca3b81129d.tar.gz
merge default into bug20337
-rw-r--r--LICENSE-MPL-RabbitMQ2
-rw-r--r--codegen.py4
-rw-r--r--docs/html-to-website-xml.xsl44
-rw-r--r--docs/rabbitmqctl.1.xml30
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/gm_specs.hrl2
-rw-r--r--include/rabbit.hrl15
-rw-r--r--include/rabbit_auth_backend_spec.hrl2
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl2
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--include/rabbit_exchange_type_spec.hrl2
-rw-r--r--include/rabbit_msg_store.hrl2
-rw-r--r--include/rabbit_msg_store_index.hrl2
-rw-r--r--packaging/common/LICENSE.tail4
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf2
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in2
-rwxr-xr-xscripts/rabbitmq-env13
-rwxr-xr-xscripts/rabbitmq-plugins9
-rwxr-xr-xscripts/rabbitmq-plugins.bat2
-rwxr-xr-xscripts/rabbitmq-server39
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmq-service.bat2
-rwxr-xr-xscripts/rabbitmqctl8
-rwxr-xr-x[-rw-r--r--]scripts/rabbitmqctl.bat2
-rw-r--r--src/credit_flow.erl128
-rw-r--r--src/delegate.erl2
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl2
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/gm.erl2
-rw-r--r--src/gm_soak_test.erl2
-rw-r--r--src/gm_speed_test.erl2
-rw-r--r--src/gm_tests.erl2
-rw-r--r--src/lqueue.erl2
-rw-r--r--src/mirrored_supervisor.erl2
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/mnesia_sync.erl77
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl21
-rw-r--r--src/rabbit_access_control.erl3
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_amqqueue.erl80
-rw-r--r--src/rabbit_amqqueue_process.erl73
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl2
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl2
-rw-r--r--src/rabbit_auth_mechanism_plain.erl2
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl161
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl2
-rw-r--r--src/rabbit_connection_sup.erl2
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_event.erl2
-rw-r--r--src/rabbit_exchange.erl20
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl2
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_exchange_type_headers.erl2
-rw-r--r--src/rabbit_exchange_type_invalid.erl47
-rw-r--r--src/rabbit_exchange_type_topic.erl2
-rw-r--r--src/rabbit_file.erl2
-rw-r--r--src/rabbit_framing.erl2
-rw-r--r--src/rabbit_guid.erl2
-rw-r--r--src/rabbit_heartbeat.erl2
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_log.erl112
-rw-r--r--src/rabbit_memory_monitor.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl13
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_misc.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl117
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_misc.erl27
-rw-r--r--src/rabbit_mnesia.erl56
-rw-r--r--src/rabbit_msg_file.erl2
-rw-r--r--src/rabbit_msg_store.erl67
-rw-r--r--src/rabbit_msg_store_ets_index.erl2
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_msg_store_index.erl2
-rw-r--r--src/rabbit_net.erl22
-rw-r--r--src/rabbit_networking.erl21
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_prelaunch.erl2
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl4
-rw-r--r--src/rabbit_reader.erl157
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_restartable_sup.erl2
-rw-r--r--src/rabbit_router.erl2
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_ssl.erl10
-rw-r--r--src/rabbit_sup.erl2
-rw-r--r--src/rabbit_tests.erl41
-rw-r--r--src/rabbit_tests_event_receiver.erl2
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_upgrade.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/rabbit_variable_queue.erl14
-rw-r--r--src/rabbit_version.erl2
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_writer.erl4
-rw-r--r--src/supervisor2.erl2
-rw-r--r--src/tcp_acceptor.erl36
-rw-r--r--src/tcp_acceptor_sup.erl2
-rw-r--r--src/tcp_listener.erl7
-rw-r--r--src/tcp_listener_sup.erl2
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl2
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl2
131 files changed, 1040 insertions, 657 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ
index 14bcc21d47..d50e32efc8 100644
--- a/LICENSE-MPL-RabbitMQ
+++ b/LICENSE-MPL-RabbitMQ
@@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/codegen.py b/codegen.py
index 494be73d68..9483e85438 100644
--- a/codegen.py
+++ b/codegen.py
@@ -11,7 +11,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
from __future__ import nested_scopes
@@ -118,7 +118,7 @@ def printFileHeader():
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%"""
def genErl(spec):
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 88aa2e78f7..d83d507369 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -8,8 +8,6 @@
<xsl:output method="xml" />
-<xsl:template match="*"/>
-
<!-- Copy every element through -->
<xsl:template match="*">
<xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml">
@@ -28,36 +26,30 @@
<head>
<title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
</head>
- <body>
- <doc:div>
- <xsl:choose>
+ <body show-in-this-page="true">
+ <xsl:choose>
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
- <p>
- This is the manual page for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
- </p>
- <p>
- <a href="../manpages.html">See a list of all manual pages</a>.
- </p>
+ <p>
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ </p>
+ <p>
+ <a href="../manpages.html">See a list of all manual pages</a>.
+ </p>
</xsl:when>
<xsl:otherwise>
- <p>
- This is the documentation for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
- </p>
+ <p>
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ </p>
</xsl:otherwise>
- </xsl:choose>
- <p>
+ </xsl:choose>
+ <p>
For more general documentation, please see the
- <a href="../admin-guide.html">administrator's guide</a>.
- </p>
-
- <doc:toc class="compact">
- <doc:heading>Table of Contents</doc:heading>
- </doc:toc>
+ <a href="../admin-guide.html">administrator's guide</a>.
+ </p>
- <xsl:apply-templates select="body/div[@class='refentry']"/>
- </doc:div>
+ <xsl:apply-templates select="body/div[@class='refentry']"/>
</body>
</html>
</xsl:template>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 7268f09037..c1c51f9f8b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -266,10 +266,9 @@
</para>
<para>
When the target files do not exist they are created.
- target files do not already exist. When
- no <option>suffix</option> is specified, the empty log
- files are simply created at the original location; no
- rotation takes place.
+ When no <option>suffix</option> is specified, the empty
+ log files are simply created at the original location;
+ no rotation takes place.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl rotate_logs .1</screen>
@@ -1067,10 +1066,26 @@
<listitem><para>The period for which the peer's SSL
certificate is valid.</para></listitem>
</varlistentry>
+
+ <varlistentry>
+ <term>last_blocked_by</term>
+ <listitem><para>The reason for which this connection
+ was last blocked. One of 'mem' - due to a memory
+ alarm, 'flow' - due to internal flow control, or
+ 'none' if the connection was never
+ blocked.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>last_blocked_age</term>
+ <listitem><para>Time, in seconds, since this
+ connection was last blocked, or
+ 'infinity'.</para></listitem>
+ </varlistentry>
+
<varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
@@ -1127,8 +1142,9 @@
</varlistentry>
</variablelist>
<para>
- If no <command>connectioninfoitem</command>s are specified then user, peer
- address, peer port and connection state are displayed.
+ If no <command>connectioninfoitem</command>s are
+ specified then user, peer address, peer port, time since
+ flow control and memory block state are displayed.
</para>
<para role="example-prefix">
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9301af6bdc..2fee1114aa 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -37,6 +37,7 @@
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
+ {log_levels, [{connection, info}]},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index ee29706e98..a317e63b68 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d81b82dbbc..735b47204c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-record(user, {username,
@@ -86,7 +86,7 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
@@ -95,16 +95,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(CREDIT_DISC_BOUND, {2000, 1500}).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
-
--ifdef(debug).
--define(LOGDEBUG0(F), rabbit_log:debug(F)).
--define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
--define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
--else.
--define(LOGDEBUG0(F), ok).
--define(LOGDEBUG(F,A), ok).
--define(LOGMESSAGE(D,C,M,Co), ok).
--endif.
diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl
index 803bb75ce7..61a2e22a0e 100644
--- a/include/rabbit_auth_backend_spec.hrl
+++ b/include/rabbit_auth_backend_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
index 614a3eed07..9a2f5e0505 100644
--- a/include/rabbit_auth_mechanism_spec.hrl
+++ b/include/rabbit_auth_mechanism_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1e870bb74b..918d587a5e 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-type(fetch_result(Ack) ::
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index f6283ef7c1..8f7e22d37e 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index e9150a97b3..f7c10bd8d9 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit.hrl").
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl
index 2ae5b0009d..75d7eb7162 100644
--- a/include/rabbit_msg_store_index.hrl
+++ b/include/rabbit_msg_store_index.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit_msg_store.hrl").
diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail
index 5d842cc1ae..b9c2629b02 100644
--- a/packaging/common/LICENSE.tail
+++ b/packaging/common/LICENSE.tail
@@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
MOZILLA PUBLIC LICENSE
@@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 0436f54609..0e59c21804 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Escape spaces and quotes, because shell is revolting.
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index e6776effcd..1455728630 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
##
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 8696427ea5..79e9c1dd72 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -34,7 +34,7 @@ package: clean
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright
- echo "\n\nThe Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
+ echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
rm -rf $(UNPACKED_DIR)
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 27e4e1dc03..91510991cc 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
-VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved."
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index a2ef8d3ceb..1fd1339da2 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Determine where this script is really located
@@ -36,7 +36,16 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname`
NODENAME=rabbit@${HOSTNAME%%.*}
-# Load configuration from the rabbitmq.conf file
+## Set (non-empty) default values for rabbitmq-env.conf variables to override
+SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
+-kernel inet_default_connect_options [{nodelay,true}]"
+CONFIG_FILE=/etc/rabbitmq/rabbitmq
+LOG_BASE=/var/log/rabbitmq
+MNESIA_BASE=/var/lib/rabbitmq/mnesia
+PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
+
+## Load configuration from the rabbitmq.conf file
if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
[ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then
echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 4c6cb1fa1d..14a18d5794 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -12,16 +12,19 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
+# Get default settings with user overrides for (RABBITMQ_)<var_name>
+# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+##--- End of overridden <var_name> variables
exec erl \
-pa "${RABBITMQ_HOME}/ebin" \
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index ca874a7f4b..66a900a195 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 39a68c8e0f..0a5a46400e 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -12,33 +12,23 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
-SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
--kernel inet_default_connect_options [{nodelay,true}]"
-CONFIG_FILE=/etc/rabbitmq/rabbitmq
-LOG_BASE=/var/log/rabbitmq
-MNESIA_BASE=/var/lib/rabbitmq/mnesia
-SERVER_START_ARGS=
-ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
-
+# Get default settings with user overrides for (RABBITMQ_)<var_name>
+# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+
DEFAULT_NODE_IP_ADDRESS=auto
DEFAULT_NODE_PORT=5672
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
-then
- if [ "x" != "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
- fi
-else
- if [ "x" = "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
- fi
-fi
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
+
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
+[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
+
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
@@ -48,13 +38,16 @@ fi
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
+
+[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE}
[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
## Log rotation
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
@@ -62,6 +55,8 @@ fi
[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
+##--- End of overridden <var_name> variables
+
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 44ce1ce148..ca49a5d8dd 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 1582bfb142..9e274840f2 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 9a11c3b35e..4aad6b8f33 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -12,14 +12,20 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
+# Get default settings with user overrides for (RABBITMQ_)<var_name>
+# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
+##--- End of overridden <var_name> variables
+
exec erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index a74a91fd91..f37fae48a6 100644..100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
new file mode 100644
index 0000000000..edbf4fb5af
--- /dev/null
+++ b/src/credit_flow.erl
@@ -0,0 +1,128 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(credit_flow).
+
+%% Credit starts at MaxCredit and goes down. Both sides keep
+%% track. When the receiver goes below MoreCreditAt it issues more
+%% credit by sending a message to the sender. The sender should pass
+%% this message in to handle_bump_msg/1. The sender should block when
+%% it goes below 0 (check by invoking blocked/0). If a process is both
+%% a sender and a receiver it will not grant any more credit to its
+%% senders when it is itself blocked - thus the only processes that
+%% need to check blocked/0 are ones that read from network sockets.
+
+-define(DEFAULT_CREDIT, {200, 150}).
+
+-export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]).
+-export([peer_down/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-opaque(bump_msg() :: {pid(), non_neg_integer()}).
+-opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+
+-spec(ack/1 :: (pid()) -> 'ok').
+-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
+-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
+-spec(blocked/0 :: () -> boolean()).
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
+-spec(peer_down/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% There are two "flows" here; of messages and of credit, going in
+%% opposite directions. The variable names "From" and "To" refer to
+%% the flow of credit, but the function names refer to the flow of
+%% messages. This is the clearest I can make it (since the function
+%% names form the API and want to make sense externally, while the
+%% variable names are used in credit bookkeeping and want to make
+%% sense internally).
+
+%% For any given pair of processes, ack/2 and send/2 must always be
+%% called with the same credit_spec().
+
+ack(To) -> ack(To, ?DEFAULT_CREDIT).
+
+ack(To, {MaxCredit, MoreCreditAt}) ->
+ MoreCreditAt1 = MoreCreditAt + 1,
+ Credit = case get({credit_to, To}, MaxCredit) of
+ MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt),
+ MaxCredit;
+ C -> C - 1
+ end,
+ put({credit_to, To}, Credit).
+
+handle_bump_msg({From, MoreCredit}) ->
+ Credit = get({credit_from, From}, 0) + MoreCredit,
+ put({credit_from, From}, Credit),
+ case Credit > 0 of
+ true -> unblock(From),
+ ok;
+ false -> ok
+ end.
+
+blocked() -> get(credit_blocked, []) =/= [].
+
+send(From) -> send(From, ?DEFAULT_CREDIT).
+
+send(From, {MaxCredit, _MoreCreditAt}) ->
+ Credit = get({credit_from, From}, MaxCredit) - 1,
+ case Credit of
+ 0 -> block(From);
+ _ -> ok
+ end,
+ put({credit_from, From}, Credit).
+
+peer_down(Peer) ->
+ %% In theory we could also remove it from credit_deferred here, but it
+ %% doesn't really matter; at some point later we will drain
+ %% credit_deferred and thus send messages into the void...
+ unblock(Peer),
+ erase({credit_from, Peer}),
+ erase({credit_to, Peer}).
+
+%% --------------------------------------------------------------------------
+
+grant(To, Quantity) ->
+ Msg = {bump_credit, {self(), Quantity}},
+ case blocked() of
+ false -> To ! Msg;
+ true -> Deferred = get(credit_deferred, []),
+ put(credit_deferred, [{To, Msg} | Deferred])
+ end.
+
+block(From) -> put(credit_blocked, [From | get(credit_blocked, [])]).
+
+unblock(From) ->
+ NewBlocks = get(credit_blocked, []) -- [From],
+ put(credit_blocked, NewBlocks),
+ case NewBlocks of
+ [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ _ -> ok
+ end.
+
+get(Key, Default) ->
+ case get(Key) of
+ undefined -> Default;
+ Value -> Value
+ end.
diff --git a/src/delegate.erl b/src/delegate.erl
index edb4eba4ae..d595e4819e 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 4c131a6c59..2a8b915b89 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate_sup).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index c11fb54bcb..59a0ab1cc0 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(file_handle_cache).
diff --git a/src/gatherer.erl b/src/gatherer.erl
index fe976b50a2..98b360389a 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gatherer).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 49913d2694..f8537487cd 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -73,7 +73,7 @@
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
-%% All modifications are (C) 2009-2011 VMware, Inc.
+%% All modifications are (C) 2009-2012 VMware, Inc.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
diff --git a/src/gm.erl b/src/gm.erl
index 6c89912213..6f9ff5642b 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm).
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 5e5a3a5a6f..572175410d 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_soak_test).
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index defb0f29b8..dad75bd447 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_speed_test).
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index ca0ffd6483..0a2d420469 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_tests).
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 04b4070621..c4e046b521 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(lqueue).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 3ba8f50d2e..a599effa91 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor).
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index d48a9ca51a..e8baabe8ff 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor_tests).
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl
new file mode 100644
index 0000000000..a3773d90b2
--- /dev/null
+++ b/src/mnesia_sync.erl
@@ -0,0 +1,77 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(mnesia_sync).
+
+%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk
+%% at commit. This module is an attempt to minimise the risk of data loss by
+%% performing a coalesced log fsync. Unfortunately this is performed regardless
+%% of whether or not the log was appended to.
+
+-behaviour(gen_server).
+
+-export([sync/0]).
+
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {waiting, disc_node}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(sync/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+sync() ->
+ gen_server:call(?SERVER, sync, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}.
+
+handle_call(sync, _From, #state{disc_node = false} = State) ->
+ {reply, ok, State};
+handle_call(sync, From, #state{waiting = Waiting} = State) ->
+ {noreply, State#state{waiting = [From | Waiting]}, 0};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(timeout, #state{waiting = Waiting} = State) ->
+ ok = disk_log:sync(latest_log),
+ [gen_server:reply(From, ok) || From <- Waiting],
+ {noreply, State#state{waiting = []}};
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/pg_local.erl b/src/pg_local.erl
index c9c3a3a715..e2e82f1f4b 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -13,7 +13,7 @@
%% versions of Erlang/OTP. The remaining type specs have been
%% removed.
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%% %CopyrightBegin%
%%
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 4fc8b46972..780fa2e92f 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 9609eb0400..3eb75217fd 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit).
@@ -45,6 +45,12 @@
{requires, file_handle_cache},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({database_sync,
+ [{description, "database sync"},
+ {mfa, {rabbit_sup, start_child, [mnesia_sync]}},
+ {requires, database},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -191,7 +197,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -442,8 +448,7 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
- [Reason, erlang:get_stacktrace()])
+ _:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -502,8 +507,14 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+boot_step_error(Reason, Stacktrace) ->
+ boot_error("Error description:~n ~p~n~n"
+ "Log files (may contain more information):~n ~s~n ~s~n~n"
+ "Stack trace:~n ~p~n~n",
+ [Reason, log_location(kernel), log_location(sasl), Stacktrace]).
+
boot_error(Format, Args) ->
- io:format("BOOT ERROR: " ++ Format, Args),
+ io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
error_logger:error_msg(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index ca28d68637..75c5351182 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_access_control).
@@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) ->
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
- ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
check_access(
fun() ->
rabbit_vhost:exists(VHostPath) andalso
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 517dd4ec60..187ec1ab6c 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_alarm).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ac64958e18..d16cd0a8fd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue).
@@ -20,7 +20,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -120,6 +120,8 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
+-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -451,39 +453,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver([], #delivery{mandatory = false, immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = qpids(Qs),
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
- {routed, QPids};
+deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
- end.
+deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -575,6 +547,46 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ case Flow of
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ noflow -> ok
+ end,
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {deliver, Delivery, Flow})
+ end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
+ _Flow) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
+
qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 16d134017f..16d89f74e6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -122,7 +122,6 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
init(Q) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
State = #q{q = Q#amqqueue{pid = self()},
@@ -149,7 +148,6 @@ init(Q) ->
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, AckTags, Deliveries, MTC) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -627,6 +625,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
+ case get({ch_publisher, DownPid}) of
+ undefined -> ok;
+ MRef -> erlang:demonitor(MRef),
+ erase({ch_publisher, DownPid}),
+ credit_flow:peer_down(DownPid)
+ end,
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -872,7 +876,7 @@ cleanup_after_confirm(State = #q{blocked_op = Op,
noreply(State1#q{blocked_op = Op})
end.
-already_been_here(Delivery = #delivery{message = #basic_message{content = Content}},
+already_been_here(#delivery{message = #basic_message{content = Content}},
State) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
@@ -1275,24 +1279,43 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo}},
- State = #q{dlx = DLX}) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ msg_seq_no = MsgSeqNo},
+ Flow}, State = #q{dlx = DLX}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- case DLX of
- undefined ->
- noreply(deliver_or_enqueue(Delivery, State));
- _ ->
- case already_been_here(Delivery, State) of
- false ->
- noreply(deliver_or_enqueue(Delivery, State));
- Qs ->
- rabbit_log:warning(
- "Message dropped. Dead-letter queues " ++
- "cycle detected: ~p~n", [Qs]),
- rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State)
- end
+ ShouldDeliver =
+ case DLX of
+ undefined ->
+ true;
+ _ ->
+ case already_been_here(Delivery, State) of
+ false ->
+ true;
+ Qs ->
+ rabbit_log:warning(
+ "Message dropped. Dead-letter queues " ++
+ "cycle detected: ~p~n", [Qs]),
+ rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
+ false
+ end
+ end,
+ case ShouldDeliver of
+ false ->
+ noreply(State);
+ true ->
+ case Flow of
+ flow ->
+ Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key,
+ erlang:monitor(process, Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow ->
+ ok
+ end,
+ noreply(deliver_or_enqueue(Delivery, State))
end;
handle_cast({ack, AckTags, ChPid}, State) ->
@@ -1381,8 +1404,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- dead_letter_deleted_queue(undefined, State);
+ true -> dead_letter_deleted_queue(undefined, State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1429,8 +1451,11 @@ handle_info(timeout, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Info, State) ->
- ?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 7b3ebcf266..a4305e5f5b 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_sup).
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index ade158bb8c..e0e252b898 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 086a90b49f..3ef81d3208 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend_internal).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index 897199ee78..0c8251b803 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism).
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index b8682a465e..3de6e7a679 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_amqplain).
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index acbb6e48f6..64b01d8e5d 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_cr_demo).
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 2448acb660..19fb587567 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_plain).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 72c00e3d01..50e4746230 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index c61184a601..7b00fa5f13 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue_qc).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4196db7dd4..58b3c131d3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_basic).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 494f3203c8..d69376fb75 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_generator).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index f3ca4e9897..5f0016b60b 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_parser).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 655bbb7369..bb44797e4d 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binding).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4c4ba230bc..d74f699d45 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel).
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
@@ -78,6 +78,8 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -110,7 +112,11 @@ do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content}).
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
@@ -184,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = dict:new(),
+ queue_monitors = sets:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -240,7 +246,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content, Flow},
+ State = #ch{reader_pid = Reader}) ->
+ case Flow of
+ flow -> credit_flow:ack(Reader);
+ noflow -> ok
+ end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -295,13 +306,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State3#ch{next_tag = DeliveryTag + 1});
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -311,6 +322,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(timeout, State) ->
noreply(State);
@@ -323,9 +338,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
- dict:erase(QPid, State3#ch.queue_monitors)});
+ sets:del_element(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -523,7 +539,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
_ -> ok
end,
- demonitor_queue(QPid, State#ch{blocking = Blocking1})
+ State#ch{blocking = Blocking1}
end.
record_confirm(undefined, _, State) ->
@@ -561,8 +577,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
true -> UQM1 = gb_trees:delete(QPid, UQM),
- demonitor_queue(
- QPid, State#ch{unconfirmed_qm = UQM1});
+ State#ch{unconfirmed_qm = UQM1};
false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
State#ch{unconfirmed_qm = UQM1}
end;
@@ -668,7 +683,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
- none -> ack(Acked, State1);
+ none -> ack(Acked, State1),
+ State1;
in_progress -> State1#ch{uncommitted_acks =
Acked ++ State1#ch.uncommitted_acks}
end};
@@ -692,11 +708,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -706,7 +722,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State3#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -783,9 +799,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
false -> dict:store(QPid, CTags1, QCons)
end
end,
- NewState = demonitor_queue(
- Q, State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1}),
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1066,9 +1081,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
- {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ ack(TAL, State1),
+ {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
@@ -1107,10 +1122,7 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = lists:foldl(fun monitor_queue/2,
- State1#ch{blocking =
- sets:from_list(QPids)},
- QPids),
+ QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
ok = rabbit_amqqueue:flush_all(QPids, self()),
{noreply, State2}
end;
@@ -1141,31 +1153,12 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (not dict:is_key(QPid, QMons) andalso
- queue_monitor_needed(QPid, State)) of
- true -> MRef = erlang:monitor(process, QPid),
- State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
- false -> State
- end.
-
-demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (dict:is_key(QPid, QMons) andalso
- not queue_monitor_needed(QPid, State)) of
- true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
- State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ case not sets:is_element(QPid, QMons) of
+ true -> erlang:monitor(process, QPid),
+ State#ch{queue_monitors = sets:add_element(QPid, QMons)};
false -> State
end.
-queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
- blocking = Blocking,
- unconfirmed_qm = UQM} = State) ->
- StatsEnabled = rabbit_event:stats_level(
- State, #ch.stats_timer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QCons),
- QueueBlocked = sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1359,22 +1352,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
msg_seq_no = MsgSeqNo},
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State),
+ rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
+ State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1).
+ QPid <- DeliveredQPids]], publish, State2),
+ State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State));
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{XName, 1}], return_not_delivered, State));
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1392,7 +1387,7 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State0#ch{unconfirmed_qm = UQM1};
none ->
UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ State0#ch{unconfirmed_qm = UQM1}
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
@@ -1416,13 +1411,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- {MsgSeqNos, State1} =
- lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
- {[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm,
- State0)}
- end, {[], State}, lists:append(C)),
- send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
+ MsgSeqNos =
+ lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1502,26 +1496,21 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, State) ->
- State.
+maybe_incr_redeliver_stats(_, _, _State) ->
+ ok.
maybe_incr_stats(QXIncs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> lists:foldl(fun ({QX, Inc}, State0) ->
- incr_stats(QX, Inc, Measure, State0)
- end, State, QXIncs);
- _ -> State
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
end.
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure),
- State.
+incr_stats({_, _} = QX, Inc, Measure) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a19b6bfdad..dc262b4948 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup).
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index e2561c802e..995c41fbc2 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup_sup).
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index 4ba01b4f4d..c508f1b996 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_client_sup).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index a0953eab95..adf6e41702 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_command_assembler).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index b2aba2eeb0..12a532b6fc 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_connection_sup).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 22b57b1ae9..d187a0b29d 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_control).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 6f9a46504f..e2928cae7e 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_direct).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 6e29ace71a..f1672f4e7b 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 7b6e07c19f..042ab23cbc 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger_file_h).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 5ae40c784a..4ec141cfd8 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_event).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a15b9be490..83e28c44a8 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange).
@@ -355,11 +355,21 @@ peek_serial(XName) ->
_ -> undefined
end.
+invalid_module(T) ->
+ rabbit_log:warning(
+ "Could not find exchange type ~s.~n", [T]),
+ put({xtype_to_module, T}, rabbit_exchange_type_invalid),
+ rabbit_exchange_type_invalid.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
case get({xtype_to_module, T}) of
- undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- put({xtype_to_module, T}, Module),
- Module;
- Module -> Module
+ undefined ->
+ case rabbit_registry:lookup_module(exchange, T) of
+ {ok, Module} -> put({xtype_to_module, T}, Module),
+ Module;
+ {error, not_found} -> invalid_module(T)
+ end;
+ Module ->
+ Module
end.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index ab3d00dc28..44a08e2437 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index b485e31f33..4bce42d4da 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_direct).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 3c02972278..cc3fb87c21 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_fanout).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index f09e4aae73..de9979b422 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_headers).
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
new file mode 100644
index 0000000000..8f60f7d898
--- /dev/null
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -0,0 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_type_invalid).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, create/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+description() ->
+ [{name, <<"invalid">>},
+ {description,
+ <<"Dummy exchange type, to be used when the intended one is not found.">>
+ }].
+
+serialise_events() -> false.
+
+route(#exchange{name = Name, type = Type}, _) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "Cannot route message through ~s: exchange type ~s not found",
+ [rabbit_misc:rs(Name), Type]).
+
+validate(_X) -> ok.
+create(_Tx, _X) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 91c7b5d3ef..3ac6ae7438 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_topic).
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 5cb8e7b69d..59df14f318 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_file).
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
index da1a6a49ec..a79188abaf 100644
--- a/src/rabbit_framing.erl
+++ b/src/rabbit_framing.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% TODO auto-generate
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2d0f5014f2..70772ccd2c 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_guid).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 177ae86859..80b4e768a3 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_heartbeat).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a08d4b673..9fa6213b94 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_limiter).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 558e095751..a6b4eeb05f 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_log).
@@ -23,8 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([debug/1, debug/2, message/4, info/1, info/2,
- warning/1, warning/2, error/1, error/2]).
+-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
@@ -32,9 +31,15 @@
-ifdef(use_specs).
+-export_type([level/0]).
+
+-type(category() :: atom()).
+-type(level() :: 'info' | 'warning' | 'error').
+
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(debug/1 :: (string()) -> 'ok').
--spec(debug/2 :: (string(), [any()]) -> 'ok').
+
+-spec(log/3 :: (category(), level(), string()) -> 'ok').
+-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -42,84 +47,47 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
--spec(message/4 :: (_,_,_,_) -> 'ok').
-
-endif.
%%----------------------------------------------------------------------------
-
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
-debug(Fmt) ->
- gen_server:cast(?SERVER, {debug, Fmt}).
-
-debug(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {debug, Fmt, Args}).
-
-message(Direction, Channel, MethodRecord, Content) ->
- gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
+log(Category, Level, Fmt, Args) when is_list(Args) ->
+ gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
-info(Fmt) ->
- gen_server:cast(?SERVER, {info, Fmt}).
-
-info(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {info, Fmt, Args}).
-
-warning(Fmt) ->
- gen_server:cast(?SERVER, {warning, Fmt}).
-
-warning(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {warning, Fmt, Args}).
-
-error(Fmt) ->
- gen_server:cast(?SERVER, {error, Fmt}).
-
-error(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {error, Fmt, Args}).
+info(Fmt) -> log(default, info, Fmt).
+info(Fmt, Args) -> log(default, info, Fmt, Args).
+warning(Fmt) -> log(default, warning, Fmt).
+warning(Fmt, Args) -> log(default, warning, Fmt, Args).
+error(Fmt) -> log(default, error, Fmt).
+error(Fmt, Args) -> log(default, error, Fmt, Args).
%%--------------------------------------------------------------------
-init([]) -> {ok, none}.
+init([]) ->
+ {ok, CatLevelList} = application:get_env(log_levels),
+ CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList],
+ {ok, orddict:from_list(CatLevels)}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({debug, Fmt}, State) ->
- io:format("debug:: "), io:format(Fmt),
- error_logger:info_msg("debug:: " ++ Fmt),
- {noreply, State};
-handle_cast({debug, Fmt, Args}, State) ->
- io:format("debug:: "), io:format(Fmt, Args),
- error_logger:info_msg("debug:: " ++ Fmt, Args),
- {noreply, State};
-handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
- io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
- {noreply, State};
-handle_cast({info, Fmt}, State) ->
- error_logger:info_msg(Fmt),
- {noreply, State};
-handle_cast({info, Fmt, Args}, State) ->
- error_logger:info_msg(Fmt, Args),
- {noreply, State};
-handle_cast({warning, Fmt}, State) ->
- error_logger:warning_msg(Fmt),
- {noreply, State};
-handle_cast({warning, Fmt, Args}, State) ->
- error_logger:warning_msg(Fmt, Args),
- {noreply, State};
-handle_cast({error, Fmt}, State) ->
- error_logger:error_msg(Fmt),
- {noreply, State};
-handle_cast({error, Fmt, Args}, State) ->
- error_logger:error_msg(Fmt, Args),
- {noreply, State};
+handle_cast({log, Category, Level, Fmt, Args}, CatLevels) ->
+ CatLevel = case orddict:find(Category, CatLevels) of
+ {ok, L} -> L;
+ error -> level(info)
+ end,
+ case level(Level) =< CatLevel of
+ false -> ok;
+ true -> (case Level of
+ info -> fun error_logger:info_msg/2;
+ warning -> fun error_logger:warning_msg/2;
+ error -> fun error_logger:error_msg/2
+ end)(Fmt, Args)
+ end,
+ {noreply, CatLevels};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -132,3 +100,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%%--------------------------------------------------------------------
+
+level(info) -> 3;
+level(warning) -> 2;
+level(error) -> 1;
+level(none) -> 0.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index c25a177b06..f22ad874e3 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 8ed2bede32..d0b5bab779 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_coordinator).
@@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
true = link(GM),
GM
end,
- {ok, _TRef} =
- timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
monitors = dict:new(),
@@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids},
end, Monitors, Pids),
noreply(State #state { monitors = Monitors1 }).
+handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
+ gm:broadcast(GM, heartbeat),
+ ensure_gm_heartbeat(),
+ noreply(State);
+
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
death_fun = DeathFun }) ->
@@ -419,3 +423,6 @@ noreply(State) ->
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
+
+ensure_gm_heartbeat() ->
+ erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4b54d82171..8d7b9ded8f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_master).
@@ -281,8 +281,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
-status(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS).
+status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:status(BQS) ++
+ [ {mirror_seen, dict:size(State #state.seen_status)},
+ {mirror_senders, sets:size(State #state.known_senders)} ].
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index baebc52b27..db7d8eccec 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_misc).
@@ -136,12 +136,16 @@ add_mirror(Queue, MirrorNode) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
- rabbit_log:info(
- "Adding mirror of queue ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, Result]),
case Result of
- {ok, _Pid} -> ok;
- _ -> Result
+ {ok, undefined} -> %% Already running
+ ok;
+ {ok, _Pid} ->
+ rabbit_log:info(
+ "Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, Result]),
+ ok;
+ _ ->
+ Result
end;
[_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
end
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index db1f056f4c..29a2e8bd71 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave).
@@ -90,7 +90,7 @@
}).
start_link(Q) ->
- gen_server2:start_link(?MODULE, [Q], []).
+ gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
@@ -98,55 +98,61 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) ->
gen_server2:call(QPid, info, infinity).
-init([#amqqueue { name = QueueName } = Q]) ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
+init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- {ok, MPid} =
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- %% ASSERTION
- [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
- MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {ok, QPid}
- end),
- erlang:monitor(process, MPid),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [Self]),
- ok = rabbit_memory_monitor:register(
- Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
- {ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new(),
-
- synchronised = false
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] -> MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
+ {new, QPid};
+ [SPid] -> true = rabbit_misc:is_process_alive(SPid),
+ existing
+ end
+ end) of
+ {new, MPid} ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ erlang:monitor(process, MPid),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
+ ok = rabbit_memory_monitor:register(
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = bq_init(BQ, Q, false),
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
- ok = gm:broadcast(GM, request_length),
- {ok, State, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ {ok, State, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}};
+ existing ->
+ ignore
+ end.
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->
@@ -207,8 +213,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery {}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -249,6 +259,10 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -446,7 +460,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MonitoringPids = [begin true = erlang:demonitor(MRef),
+ MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
Pid
end || {Pid, MRef} <- dict:to_list(KS)],
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
@@ -600,7 +614,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case dict:is_key(ChPid, KS) of
false -> ok;
- true -> confirm_sender_death(ChPid)
+ true -> credit_flow:peer_down(ChPid),
+ confirm_sender_death(ChPid)
end,
State.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index fc04ec79ca..8eacb1f3c4 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave_sup).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2676ddc63b..cccc09ac94 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_misc).
@@ -39,7 +39,7 @@
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([format_stderr/2, with_local_io/1, local_info_msg/2]).
+-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
@@ -158,6 +158,7 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
+-spec(format/2 :: (string(), [any()]) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
@@ -225,7 +226,7 @@ frame_error(MethodName, BinaryFields) ->
protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).
amqp_error(Name, ExplanationFormat, Params, Method) ->
- Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)),
+ Explanation = format(ExplanationFormat, Params),
#amqp_error{name = Name, explanation = Explanation, method = Method}.
protocol_error(Name, ExplanationFormat, Params) ->
@@ -279,8 +280,7 @@ val({Type, Value}) ->
true -> "~s";
false -> "~w"
end,
- lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'",
- [Value, Type])).
+ format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]).
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive due to general mnesia overheads (figuring out table types
@@ -329,8 +329,7 @@ r_arg(VHostPath, Kind, Table, Key) ->
end.
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
- lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
- [Kind, Name, VHostPath])).
+ format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]).
enable_cover() -> enable_cover(["."]).
@@ -431,7 +430,11 @@ execute_mnesia_transaction(TxFun) ->
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
- {atomic, Result} -> Result;
+ {atomic, Result} -> case mnesia:is_transaction() of
+ true -> ok;
+ false -> mnesia_sync:sync()
+ end,
+ Result;
{aborted, Reason} -> throw({error, Reason})
end.
@@ -482,9 +485,7 @@ cookie_hash() ->
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
- lists:flatten(
- io_lib:format("~w_~s:~w",
- [Prefix, inet_parse:ntoa(IPAddress), Port]))).
+ format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
@@ -553,6 +554,8 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
+
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -648,7 +651,7 @@ pid_to_string(Pid) when is_pid(Pid) ->
<<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
+ format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
string_to_pid(Str) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index bf997a6f8b..30b5478e30 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
@@ -98,8 +98,8 @@ status() ->
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config(), true,
- fun maybe_upgrade_local_or_record_desired/0),
+ Nodes = read_cluster_nodes_config(),
+ ok = init_db(Nodes, should_be_disc_node(Nodes)),
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case - let's
%% make it so.
@@ -174,8 +174,7 @@ cluster(ClusterNodes, Force) ->
%% Join the cluster
start_mnesia(),
try
- ok = init_db(ClusterNodes, Force,
- fun maybe_upgrade_local_or_record_desired/0),
+ ok = init_db(ClusterNodes, Force),
ok = create_cluster_nodes_config(ClusterNodes)
after
stop_mnesia()
@@ -501,6 +500,18 @@ delete_previously_running_nodes() ->
FileName, Reason}})
end.
+init_db(ClusterNodes, Force) ->
+ init_db(
+ ClusterNodes, Force,
+ fun () ->
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ %% If we're just starting up a new node we won't have a
+ %% version
+ version_not_available -> ok = rabbit_version:record_desired()
+ end
+ end).
+
%% Take a cluster node config and create the right kind of node - a
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes. If Force is false, don't allow
@@ -509,20 +520,12 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
UClusterNodes = lists:usort(ClusterNodes),
ProperClusterNodes = UClusterNodes -- [node()],
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+ {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ProperClusterNodes,
+ "Mnesia could not connect to any disc nodes."}});
{ok, Nodes} ->
- case Force of
- false -> FailedClusterNodes = ProperClusterNodes -- Nodes,
- case FailedClusterNodes of
- [] -> ok;
- _ -> throw({error, {failed_to_cluster_with,
- FailedClusterNodes,
- "Mnesia could not connect "
- "to some nodes."}})
- end;
- true -> ok
- end,
- WantDiscNode = should_be_disc_node(ClusterNodes),
WasDiscNode = is_disc_node(),
+ WantDiscNode = should_be_disc_node(ClusterNodes),
%% We create a new db (on disk, or in ram) in the first
%% two cases and attempt to upgrade the in the other two
case {Nodes, WasDiscNode, WantDiscNode} of
@@ -572,14 +575,6 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
end.
-maybe_upgrade_local_or_record_desired() ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- version_not_available -> ok = rabbit_version:record_desired()
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -627,10 +622,9 @@ move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
- BackupDir = lists:flatten(
- io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
- [MnesiaDir,
- Year, Month, Day, Hour, Minute, Second])),
+ BackupDir = rabbit_misc:format(
+ "~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
+ [MnesiaDir, Year, Month, Day, Hour, Minute, Second]),
case file:rename(MnesiaDir, BackupDir) of
ok ->
%% NB: we cannot use rabbit_log here since it may not have
@@ -745,7 +739,9 @@ reset(Force) ->
start_mnesia(),
{Nodes, RunningNodes} =
try
- ok = init(),
+ %% Force=true here so that reset still works when clustered
+ %% with a node which is down
+ ok = init_db(read_cluster_nodes_config(), true),
{all_clustered_nodes() -- [Node],
running_clustered_nodes() -- [Node]}
after
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index b7de27d4b5..f685b109a9 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_file).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e6a32b9023..56265136dd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store).
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2]).
+ write/3, write_flow/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -152,6 +152,7 @@
-spec(close_all_indicated/1 ::
(client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
@@ -436,7 +437,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
- Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
+ Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
+ infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -460,12 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(MsgId, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- client_ref = CRef }) ->
- ok = client_update_flying(+1, MsgId, CState),
- ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
- ok = server_cast(CState, {write, CRef, MsgId}).
+write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
+ credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+ client_write(MsgId, Msg, flow, CState).
+
+write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
@@ -500,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
+client_write(MsgId, Msg, Flow,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+
client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
@@ -666,7 +674,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Server),
Clients = dict:from_list(
- [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
+ [{CRef, {undefined, undefined, undefined}} ||
+ CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
%% recovered correctly.
true = case {FileSummaryRecovered, CleanShutdown} of
@@ -731,10 +740,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _MsgId} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
+ {read, _MsgId} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -755,7 +764,7 @@ prioritise_info(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
+handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
@@ -765,7 +774,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
- Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
+ Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -789,11 +798,19 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
+ {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:peer_down(CPid),
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, MsgId},
- State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+handle_cast({write, CRef, MsgId, Flow},
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts,
+ clients = Clients }) ->
+ case Flow of
+ flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ noflow -> ok
+ end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->
@@ -1204,10 +1221,10 @@ update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
- {undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
- State #msstate {
- cref_to_msg_ids = CTM1 }
+ {_CPid, undefined, _CloseFDsFun} -> State;
+ {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
record_pending_confirm(CRef, MsgId, State) ->
@@ -1294,8 +1311,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
case (ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke) of
true -> case dict:fetch(Ref, ClientRefs) of
- {_MsgOnDiskFun, undefined} -> ok;
- {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
+ {_CPid, _MsgOnDiskFun, undefined} ->
+ ok;
+ {_CPid, _MsgOnDiskFun, CloseFDsFun} ->
+ ok = CloseFDsFun()
end;
false -> ok
end
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index d6dc556878..9c31439f51 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 77f1f04ee2..3b61ed0bd7 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_gc).
diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl
index ef8b7cdfed..2f36256c61 100644
--- a/src/rabbit_msg_store_index.erl
+++ b/src/rabbit_msg_store_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_index).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b944ec81a1..02889b93a3 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_net).
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- sockname/1, peername/1, peercert/1]).
+ sockname/1, peername/1, peercert/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -62,6 +62,8 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(connection_string/2 ::
+ (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
-endif.
@@ -141,3 +143,19 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+
+connection_string(Sock, Direction) ->
+ {From, To} = case Direction of
+ inbound -> {fun peername/1, fun sockname/1};
+ outbound -> {fun sockname/1, fun peername/1}
+ end,
+ case {From(Sock), To(Sock)} of
+ {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
+ {ok, rabbit_misc:format("~s:~p -> ~s:~p",
+ [rabbit_misc:ntoab(FromAddress), FromPort,
+ rabbit_misc:ntoab(ToAddress), ToPort])};
+ {{error, _Reason} = Error, _} ->
+ Error;
+ {_, {error, _Reason} = Error} ->
+ Error
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index e81f8134f8..825d1bb1e0 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_networking).
@@ -164,8 +164,6 @@ ssl_transform_fun(SslOpts) ->
fun (Sock) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection ~p to SSL~n",
- [self()]),
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, Reason} ->
{error, {ssl_upgrade_error, Reason}};
@@ -220,7 +218,12 @@ start_listener(Listener, Protocol, Label, OnConnect) ->
start_listener0(Address, Protocol, Label, OnConnect) ->
Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
Protocol, Label, OnConnect),
- {ok,_} = supervisor:start_child(rabbit_sup, Spec).
+ case supervisor:start_child(rabbit_sup, Spec) of
+ {ok, _} -> ok;
+ {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
+ exit({could_not_start_tcp_listener,
+ {rabbit_misc:ntoa(IPAddress), Port}})
+ end.
stop_tcp_listener(Listener) ->
[stop_tcp_listener0(Address) ||
@@ -266,6 +269,16 @@ start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Reader),
Reader ! {go, Sock, SockTransform},
+
+ %% In the event that somebody floods us with connections, the
+ %% reader processes can spew log events at error_logger faster
+ %% than it can keep up, causing its mailbox to grow unbounded
+ %% until we eat all the memory available and crash. So here is a
+ %% meaningless synchronous call to the underlying gen_event
+ %% mechanism. When it returns the mailbox is drained, and we
+ %% return to our caller to accept more connetions.
+ gen_event:which_handlers(error_logger),
+
Reader.
start_client(Sock) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8aa24ab53e..4fc91860cd 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_node_monitor).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index de2ba8adbb..7b85ab15fc 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 50444dc49d..11b4d776a3 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_prelaunch).
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 9b45e79869..df957d883c 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_collector).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f03c1d1c76..4c8793f1ad 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_index).
@@ -491,7 +491,7 @@ recover_message(false, _, no_del, RelSeq, Segment) ->
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
- lists:flatten(io_lib:format("~.36B", [Num])).
+ rabbit_misc:format("~.36B", [Num]).
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index fce61129fa..1602cc2b83 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_reader).
@@ -27,8 +27,6 @@
-export([conserve_memory/2, server_properties/1]).
--export([process_channel_frame/5]). %% used by erlang-client
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
@@ -40,10 +38,12 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, conserve_memory,
+ last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, channels]).
+ send_pend, state, last_blocked_by, last_blocked_age,
+ channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
@@ -90,10 +90,6 @@
-spec(system_continue/3 :: (_,_,#v1{}) -> any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
--spec(process_channel_frame/5 ::
- (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
- tuple()) -> tuple()).
-
-endif.
%%--------------------------------------------------------------------------
@@ -177,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Reason]),
- rabbit_log:info("closing TCP connection ~p~n",
- [self()]),
+ {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
+ [self(), Reason]),
exit(normal)
end.
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
- PeerAddressS = rabbit_misc:ntoab(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ ConnStr = socket_op(Sock, fun (Sock0) ->
+ rabbit_net:connection_string(
+ Sock0, inbound)
+ end),
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
@@ -220,21 +217,22 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ conserve_memory = false,
+ last_blocked_by = none,
+ last_blocked_at = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
- handshake, 8))
+ handshake, 8)),
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
catch
- Ex -> (if Ex == connection_closed_abruptly ->
- fun rabbit_log:warning/2;
- true ->
- fun rabbit_log:error/2
- end)("exception on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> log(case Ex of
+ connection_closed_abruptly -> warning;
+ _ -> error
+ end, "closing AMQP connection ~p (~s):~n~p~n",
+ [self(), ConnStr, Ex])
after
- rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
%% We don't close the socket explicitly. The reader is the
%% controlling process and hence its termination will close
%% the socket. Furthermore, gen_tcp:close/1 waits for pending
@@ -267,21 +265,20 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
- closed -> if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
+ closed -> case State#v1.connection_state of
+ closed -> State;
+ _ -> throw(connection_closed_abruptly)
end;
{error, Reason} -> throw({inet_error, Reason});
{other, Other} -> handle_other(Other, Deb, State)
end.
handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
+ recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(State));
+ mainloop(Deb, maybe_close(control_throttle(State)));
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -341,6 +338,9 @@ handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other({bump_credit, Msg}, Deb, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ recvloop(Deb, control_throttle(State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -355,17 +355,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, State = #v1{connection_state = running}) ->
- State#v1{connection_state = blocking};
-internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
- State#v1{connection_state = running};
-internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running};
-internal_conserve_memory(_Conserve, State) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_memory = Mem}) ->
+ case {CS, Mem orelse credit_flow:blocked()} of
+ {running, true} -> State#v1{connection_state = blocking};
+ {blocking, false} -> State#v1{connection_state = running};
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = running};
+ {blocked, true} -> update_last_blocked_by(State);
+ {_, _} -> State
+ end.
+
+maybe_block(State = #v1{connection_state = blocking}) ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ update_last_blocked_by(State#v1{connection_state = blocked,
+ last_blocked_at = erlang:now()});
+maybe_block(State) ->
State.
+update_last_blocked_by(State = #v1{conserve_memory = true}) ->
+ State#v1{last_blocked_by = mem};
+update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+ State#v1{last_blocked_by = flow}.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -387,17 +400,19 @@ handle_dependent_exit(ChPid, Reason, State) ->
{undefined, uncontrolled} ->
exit({abnormal_dependent_exit, ChPid, Reason});
{_Channel, controlled} ->
- maybe_close(State);
+ maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(handle_exception(State, Channel, Reason))
+ log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(handle_exception(control_throttle(State),
+ Channel, Reason))
end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
- {Channel, MRef} -> erase({channel, Channel}),
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
Channel
@@ -432,9 +447,10 @@ wait_for_channel_termination(N, TimerRef) ->
{_Channel, controlled} ->
wait_for_channel_termination(N-1, TimerRef);
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
+ log(error,
+ "AMQP connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
@@ -485,12 +501,12 @@ handle_frame(Type, Channel, Payload,
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ChPid, FramingState} ->
+ {ChPid, AState} ->
NewAState = process_channel_frame(
- AnalyzedFrame, self(),
- Channel, ChPid, FramingState),
+ AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid, State);
+ post_process_frame(AnalyzedFrame, ChPid,
+ control_throttle(State));
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -504,18 +520,13 @@ handle_frame(Type, Channel, Payload,
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ control_throttle(State);
post_process_frame({method, MethodName, _}, _ChPid,
State = #v1{connection = #connection{
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- case State#v1.connection_state of
- blocking -> ok = rabbit_heartbeat:pause_monitor(
- State#v1.heartbeater),
- State#v1{connection_state = blocked};
- _ -> State
- end;
+ maybe_block(State);
false -> State
end;
post_process_frame(_Frame, _ChPid, State) ->
@@ -687,10 +698,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = control_throttle(
State#v1{connection_state = running,
- connection = NewConnection}),
+ connection = NewConnection,
+ conserve_memory = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -822,6 +834,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
fun ([{_, I}]) -> I end);
i(state, #v1{connection_state = S}) ->
S;
+i(last_blocked_by, #v1{last_blocked_by = By}) ->
+ By;
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
+ infinity;
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
+ timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
i(protocol, #v1{connection = #connection{protocol = none}}) ->
@@ -890,21 +908,20 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, self(),
- Channel, ChPid, AState),
+ NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
State.
-process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+process_channel_frame(Frame, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} -> NewAState;
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
NewAState;
- {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
- Method, Content),
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
NewAState;
- {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ {error, Reason} -> self() ! {channel_exit, Channel,
Reason},
AState
end.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 9821ae7b86..8c0ebcbe94 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_registry).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index 1a08efed41..237ab78cac 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_restartable_sup).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 219833b726..f4bbda0f7a 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_router).
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 963294d924..e8beecfe0a 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sasl_report_file_h).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e524446ea8..3025d981d4 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_ssl).
@@ -72,9 +72,8 @@ peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
validity = {'Validity', Start, End} }}) ->
- lists:flatten(
- io_lib:format("~s - ~s", [format_asn1_value(Start),
- format_asn1_value(End)]))
+ rabbit_misc:format("~s - ~s", [format_asn1_value(Start),
+ format_asn1_value(End)])
end, Cert).
%%--------------------------------------------------------------------------
@@ -155,8 +154,7 @@ escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
%% purposes it's handy to escape all non-printable chars. All non-ASCII
%% characters get converted to UTF-8 sequences and then escaped. We've
%% already got a UTF-8 sequence here, so just escape it.
- lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
- escape_rdn_value(S, middle);
+ rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle);
escape_rdn_value([C | S], middle) ->
[C | escape_rdn_value(S, middle)].
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 802ea5e2e7..0965e3b36e 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sup).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ff55194707..ae690e8cdd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests).
@@ -889,6 +889,14 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
ok = assert_ram_node(),
+ %% ram node will not start by itself
+ ok = control_action(stop_app, []),
+ ok = control_action(stop_app, SecondaryNode, [], []),
+ {error, _} = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], []),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
%% change cluster config while remaining in same cluster
ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
ok = control_action(start_app, []),
@@ -897,8 +905,7 @@ test_cluster_management2(SecondaryNode) ->
%% join non-existing cluster as a ram node
ok = control_action(force_cluster, ["invalid1@invalid",
"invalid2@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
+ {error, _} = control_action(start_app, []),
ok = assert_ram_node(),
%% join empty cluster as a ram node (converts to disc)
@@ -2222,14 +2229,26 @@ test_amqqueue(Durable) ->
#amqqueue { durable = Durable }.
with_fresh_variable_queue(Fun) ->
- ok = empty_test_queue(),
- VQ = variable_queue_init(test_amqqueue(true), false),
- S0 = rabbit_variable_queue:status(VQ),
- assert_props(S0, [{q1, 0}, {q2, 0},
- {delta, {delta, undefined, 0, undefined}},
- {q3, 0}, {q4, 0},
- {len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
+ Ref = make_ref(),
+ Me = self(),
+ %% Run in a separate process since rabbit_msg_store will send
+ %% bump_credit messages and we want to ignore them
+ spawn_link(fun() ->
+ ok = empty_test_queue(),
+ VQ = variable_queue_init(test_amqqueue(true), false),
+ S0 = rabbit_variable_queue:status(VQ),
+ assert_props(S0, [{q1, 0}, {q2, 0},
+ {delta,
+ {delta, undefined, 0, undefined}},
+ {q3, 0}, {q4, 0},
+ {len, 0}]),
+ _ = rabbit_variable_queue:delete_and_terminate(
+ shutdown, Fun(VQ)),
+ Me ! Ref
+ end),
+ receive
+ Ref -> ok
+ end,
passed.
publish_and_confirm(Q, Payload, Count) ->
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
index abcbe0b6d5..72c07b51ec 100644
--- a/src/rabbit_tests_event_receiver.erl
+++ b/src/rabbit_tests_event_receiver.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests_event_receiver).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 58079ccf47..3a5b96de4f 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_trace).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index ae2b5d3f46..732c29b6b6 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_types).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 717d94a802..80f50b38b3 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index f164035ee5..9f2535bd04 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade_functions).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 83af5ddd66..9131a59e55 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_variable_queue).
@@ -902,17 +902,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:read(MsgId, MSCState1)
+ end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
+ fun (MCSState1) ->
+ rabbit_msg_store:remove(MsgIds, MCSState1)
+ end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index f6bcbb7fd5..7545d81362 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_version).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 38bb76b03b..5548ef6d05 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_vhost).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 091b50e4c6..269128df17 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_writer).
@@ -169,12 +169,10 @@ call(Pid, Msg) ->
%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 26ea502ced..a2f4fae980 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -41,7 +41,7 @@
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%%
%% %CopyrightBegin%
%%
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 8678c2c9e0..43a6bc994b 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
@@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- try
- %% report
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
- {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [rabbit_misc:ntoab(Address), Port,
- rabbit_misc:ntoab(PeerAddress), PeerPort]),
- %% In the event that somebody floods us with connections we can spew
- %% the above message at error_logger faster than it can keep up.
- %% So error_logger's mailbox grows unbounded until we eat all the
- %% memory available and crash. So here's a meaningless synchronous call
- %% to the underlying gen_event mechanism - when it returns the mailbox
- %% is drained.
- gen_event:which_handlers(error_logger),
- %% handle
- file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain()
- catch {inet_error, Reason} ->
- gen_tcp:close(Sock),
- error_logger:error_msg("unable to accept TCP connection: ~p~n",
- [Reason])
- end,
+ %% handle
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain(),
%% accept more
accept(State);
@@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ {AddressS, Port} = case inet:sockname(LSock) of
+ {ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
+ {error, _} -> {"unknown", unknown}
+ end,
error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
- [rabbit_misc:ntoab(Address), Port, Reason]),
+ [AddressS, Port, Reason]),
accept(State);
handle_info(_Info, State) ->
@@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index cb3dd02c42..d8844441d2 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 9a82ac88c1..fb01c792f4 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
@@ -72,8 +72,9 @@ init({IPAddress, Port, SocketOpts,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start ~s on ~s:~p - ~p~n",
- [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p (~s)~n",
+ [Label, rabbit_misc:ntoab(IPAddress), Port,
+ Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 74297b6dce..9ee921b423 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 5feb146f64..7f4b5049fe 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(test_sup).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 8973a4f750..fca55f02f1 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% In practice Erlang shouldn't be allowed to grow to more than a half
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index fcb07a16f4..c9ecccd6b6 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool).
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index d37c3a0fd6..ff356366ac 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_sup).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index b42530e269..1ddcebb23d 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_worker).