summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/examples-to-end.xsl4
-rw-r--r--docs/html-to-website-xml.xsl10
-rw-r--r--docs/rabbitmq-activate-plugins.1.xml2
-rw-r--r--docs/rabbitmq-deactivate-plugins.1.xml2
-rw-r--r--docs/rabbitmq-multi.1.xml2
-rw-r--r--docs/rabbitmq-server.1.xml6
-rw-r--r--docs/rabbitmq-service.xml24
-rw-r--r--docs/rabbitmq.conf.5.xml2
-rw-r--r--docs/rabbitmqctl.1.xml33
-rw-r--r--docs/usage.xsl10
-rw-r--r--include/rabbit.hrl1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec4
-rw-r--r--packaging/debs/Debian/debian/control2
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--scripts/rabbitmq-server.bat1
-rw-r--r--src/rabbit.erl67
-rw-r--r--src/rabbit_amqqueue.erl91
-rw-r--r--src/rabbit_amqqueue_process.erl43
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_misc.erl16
-rw-r--r--src/rabbit_persister.erl122
-rw-r--r--src/worker_pool_worker.erl16
22 files changed, 225 insertions, 237 deletions
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl
index 496fcc1c34..d9686adac8 100644
--- a/docs/examples-to-end.xsl
+++ b/docs/examples-to-end.xsl
@@ -55,7 +55,7 @@ indented)
<term>
<cmdsynopsis>
- <command>list_connections</command>
+ <command>list_connections</command>
<arg choice="opt">
<replaceable>connectioninfoitem</replaceable>
...
@@ -66,7 +66,7 @@ indented)
However, while DocBook renders this sensibly for HTML, for some reason it
doen't show anything inside <cmdsynopsis> at all for man pages. I think what
we're doing is semantically correct so this is a bug in DocBook. The following
- rules essentially do what DocBook does when <cmdsynopsis> is not inside a
+ rules essentially do what DocBook does when <cmdsynopsis> is not inside a
<term>.
-->
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index a35b869967..f2117e2679 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -26,8 +26,8 @@
<xsl:choose>
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
<p>
- This is the manual page for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
</p>
<p>
<a href="manpages.html">See a list of all manual pages</a>.
@@ -35,13 +35,13 @@
</xsl:when>
<xsl:otherwise>
<p>
- This is the documentation for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
</p>
</xsl:otherwise>
</xsl:choose>
<p>
- For more general documentation, please see the
+ For more general documentation, please see the
<a href="admin-guide.html">administrator's guide</a>.
</p>
diff --git a/docs/rabbitmq-activate-plugins.1.xml b/docs/rabbitmq-activate-plugins.1.xml
index ef81c201f7..5f8316346c 100644
--- a/docs/rabbitmq-activate-plugins.1.xml
+++ b/docs/rabbitmq-activate-plugins.1.xml
@@ -24,7 +24,7 @@
<command>rabbitmq-activate-plugins</command>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmq-deactivate-plugins.1.xml b/docs/rabbitmq-deactivate-plugins.1.xml
index eacd014b83..bbf1207e8a 100644
--- a/docs/rabbitmq-deactivate-plugins.1.xml
+++ b/docs/rabbitmq-deactivate-plugins.1.xml
@@ -24,7 +24,7 @@
<command>rabbitmq-deactivate-plugins</command>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml
index b3862fdf87..6586890abf 100644
--- a/docs/rabbitmq-multi.1.xml
+++ b/docs/rabbitmq-multi.1.xml
@@ -26,7 +26,7 @@
<arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index 25c2aefb8b..921da4f1cc 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -25,7 +25,7 @@
<arg choice="opt">-detached</arg>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
@@ -72,7 +72,7 @@ be placed in this directory.
<para>
Defaults to rabbit. This can be useful if you want to run more than
one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per
-erlang-node-and-machine combination. See the
+erlang-node-and-machine combination. See the
<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single
machine guide</ulink> for details.
</para>
@@ -93,7 +93,7 @@ one network interface.
<term>RABBITMQ_NODE_PORT</term>
<listitem>
<para>
-Defaults to 5672.
+Defaults to 5672.
</para>
</listitem>
</varlistentry>
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
index d59ed63813..2b416e3e1e 100644
--- a/docs/rabbitmq-service.xml
+++ b/docs/rabbitmq-service.xml
@@ -24,7 +24,7 @@
<arg choice="opt">command</arg>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
@@ -34,14 +34,14 @@ scalable implementation of an AMQP broker.
</para>
<para>
Running <command>rabbitmq-service</command> allows the RabbitMQ broker to be run as a
-service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker
-service can be started and stopped using the Windows® services
-applet.
+service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker
+service can be started and stopped using the Windows® services
+applet.
</para>
<para>
-By default the service will run in the authentication context of the
+By default the service will run in the authentication context of the
local system account. It is therefore necessary to synchronise Erlang
-cookies between the local system account (typically
+cookies between the local system account (typically
<filename>C:\WINDOWS\.erlang.cookie</filename> and the account that will be used to
run <command>rabbitmqctl</command>.
</para>
@@ -87,7 +87,7 @@ deleted as a consequence and <command>rabbitmq-server</command> will remain oper
<listitem>
<para>
Start the service. The service must have been correctly installed
-beforehand.
+beforehand.
</para>
</listitem>
</varlistentry>
@@ -96,7 +96,7 @@ beforehand.
<term>stop</term>
<listitem>
<para>
-Stop the service. The service must be running for this command to
+Stop the service. The service must be running for this command to
have any effect.
</para>
</listitem>
@@ -154,7 +154,7 @@ This is the location of log and database directories.
<para>
Defaults to rabbit. This can be useful if you want to run more than
one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per
-erlang-node-and-machine combination. See the
+erlang-node-and-machine combination. See the
<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single
machine guide</ulink> for details.
</para>
@@ -175,7 +175,7 @@ one network interface.
<term>RABBITMQ_NODE_PORT</term>
<listitem>
<para>
-Defaults to 5672.
+Defaults to 5672.
</para>
</listitem>
</varlistentry>
@@ -208,11 +208,11 @@ for details.
<term>RABBITMQ_CONSOLE_LOG</term>
<listitem>
<para>
-Set this varable to <code>new</code> or <code>reuse</code> to have the console
+Set this varable to <code>new</code> or <code>reuse</code> to have the console
output from the server redirected to a file named <code>SERVICENAME</code>.debug
in the application data directory of the user that installed the service.
Under Vista this will be <filename>C:\Users\AppData\username\SERVICENAME</filename>.
-Under previous versions of Windows this will be
+Under previous versions of Windows this will be
<filename>C:\Documents and Settings\username\Application Data\SERVICENAME</filename>.
If <code>RABBITMQ_CONSOLE_LOG</code> is set to <code>new</code> then a new file will be
created each time the service starts. If <code>RABBITMQ_CONSOLE_LOG</code> is
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml
index 34f20f9226..31de71649c 100644
--- a/docs/rabbitmq.conf.5.xml
+++ b/docs/rabbitmq.conf.5.xml
@@ -18,7 +18,7 @@
<refname>rabbitmq.conf</refname>
<refpurpose>default settings for RabbitMQ AMQP server</refpurpose>
</refnamediv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 7634b2d247..5e2668c1a6 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1,19 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
-<!--
+<!--
There is some extra magic in this document besides the usual DocBook semantics
- to allow us to derive manpages, HTML and usage messages from the same source
+ to allow us to derive manpages, HTML and usage messages from the same source
document.
Examples need to be moved to the end for man pages. To this end, <para>s and
- <screen>s with role="example" will be moved, and with role="example-prefix"
+ <screen>s with role="example" will be moved, and with role="example-prefix"
will be removed.
The usage messages are more involved. We have some magic in usage.xsl to pull
out the command synopsis, global option and subcommand synopses. We also pull
out <para>s with role="usage".
- Finally we construct lists of possible values for subcommand options, if the
+ Finally we construct lists of possible values for subcommand options, if the
subcommand's <varlistentry> has role="usage-has-option-list". The option which
takes the values should be marked with role="usage-option-list".
-->
@@ -664,7 +664,7 @@
<para>
The <command>queueinfoitem</command> parameter is used to indicate which queue
information items to include in the results. The column order in the
- results will match the order of the parameters.
+ results will match the order of the parameters.
<command>queueinfoitem</command> can take any value from the list
that follows:
</para>
@@ -715,28 +715,15 @@
<listitem><para>Number of messages delivered to clients but not yet acknowledged.</para></listitem>
</varlistentry>
<varlistentry>
- <term>messages_uncommitted</term>
- <listitem><para>Number of messages published in as yet uncommitted transactions</para></listitem>
- </varlistentry>
- <varlistentry>
<term>messages</term>
- <listitem><para>Sum of ready, unacknowledged and uncommitted messages
+ <listitem><para>Sum of ready and unacknowledged messages
(queue depth).</para></listitem>
</varlistentry>
<varlistentry>
- <term>acks_uncommitted</term>
- <listitem><para>Number of acknowledgements received in as yet uncommitted
- transactions.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>consumers</term>
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
- <term>transactions</term>
- <listitem><para>Number of transactions.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
@@ -768,7 +755,7 @@
<para>
The <command>exchangeinfoitem</command> parameter is used to indicate which
exchange information items to include in the results. The column order in the
- results will match the order of the parameters.
+ results will match the order of the parameters.
<command>exchangeinfoitem</command> can take any value from the list
that follows:
</para>
@@ -797,7 +784,7 @@
</varlistentry>
</variablelist>
<para>
- If no <command>exchangeinfoitem</command>s are specified then
+ If no <command>exchangeinfoitem</command>s are specified then
exchange name and type are displayed.
</para>
<para role="example-prefix">
@@ -839,7 +826,7 @@
<para>
The <command>connectioninfoitem</command> parameter is used to indicate
which connection information items to include in the results. The
- column order in the results will match the order of the parameters.
+ column order in the results will match the order of the parameters.
<command>connectioninfoitem</command> can take any value from the list
that follows:
</para>
@@ -945,7 +932,7 @@
The <command>channelinfoitem</command> parameter is used to
indicate which channel information items to include in the
results. The column order in the results will match the
- order of the parameters.
+ order of the parameters.
<command>channelinfoitem</command> can take any value from the list
that follows:
</para>
diff --git a/docs/usage.xsl b/docs/usage.xsl
index 72f8880ab1..a6cebd93bf 100644
--- a/docs/usage.xsl
+++ b/docs/usage.xsl
@@ -11,7 +11,7 @@
<xsl:output method="text"
encoding="UTF-8"
indent="no"/>
-<xsl:strip-space elements="*"/>
+<xsl:strip-space elements="*"/>
<xsl:preserve-space elements="cmdsynopsis arg" />
<xsl:template match="/">
@@ -19,7 +19,7 @@
-module(<xsl:value-of select="$modulename" />).
-export([usage/0]).
usage() -> %QUOTE%Usage:
-<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/>
+<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/>
<xsl:text> </xsl:text>
<xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg">
<xsl:apply-templates select="." />
@@ -28,7 +28,7 @@ usage() -> %QUOTE%Usage:
<xsl:text>&#10;</xsl:text>
-<!-- List options (any variable list in a section called "Options"). -->
+<!-- List options (any variable list in a section called "Options"). -->
<xsl:for-each select=".//*[title='Options']/variablelist">
<xsl:if test="position() = 1">&#10;Options:&#10;</xsl:if>
<xsl:for-each select="varlistentry">
@@ -40,13 +40,13 @@ usage() -> %QUOTE%Usage:
</xsl:for-each>
</xsl:for-each>
-<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). -->
+<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). -->
<xsl:text>&#10;</xsl:text>
<xsl:for-each select=".//*[title='Options']//para[@role='usage']">
<xsl:value-of select="normalize-space(.)"/><xsl:text>&#10;&#10;</xsl:text>
</xsl:for-each>
-<!-- List commands (any first-level variable list in a section called "Commands"). -->
+<!-- List commands (any first-level variable list in a section called "Commands"). -->
<xsl:for-each select=".//*[title='Commands']/variablelist | .//*[title='Commands']/refsect2/variablelist">
<xsl:if test="position() = 1">Commands:&#10;</xsl:if>
<xsl:for-each select="varlistentry">
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index def1b69eec..c2dad74475 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -176,6 +176,7 @@
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(ERTS_MINIMUM, "5.6.3").
-define(MAX_WAIT, 16#ffffffff).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index b052d889b3..6926261f79 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -13,8 +13,8 @@ Source4: rabbitmq-asroot-script-wrapper
Source5: rabbitmq-server.ocf
URL: http://www.rabbitmq.com/
BuildArch: noarch
-BuildRequires: erlang, python-simplejson, xmlto, libxslt
-Requires: erlang, logrotate
+BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt
+Requires: erlang >= R12B-3, logrotate
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
Requires(post): %%REQUIRES%%
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 479c356829..a44f49a0e0 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -7,7 +7,7 @@ Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-base | erlang-base-hipe, erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 638498c1e2..ccdfc40160 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -31,7 +31,7 @@
##
NODENAME=rabbit
-SERVER_ERL_ARGS="+K true +A30 \
+SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 28eb8ebb8d..57fe1328ce 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -145,6 +145,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-s rabbit ^
+W w ^
+A30 ^
++P 1048576 ^
-kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7cf92a6243..c438589ebb 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -47,7 +47,8 @@
[{description, "codec correctness check"},
{mfa, {rabbit_binary_generator,
check_empty_content_body_frame_size,
- []}}]}).
+ []}},
+ {enables, external_infrastructure}]}).
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
@@ -65,21 +66,21 @@
[{description, "exchange type registry"},
{mfa, {rabbit_sup, start_child,
[rabbit_exchange_type_registry]}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_log]}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_hooks,
[{description, "internal event notification system"},
{mfa, {rabbit_hooks, start, []}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"},
@@ -113,35 +114,36 @@
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
- [{description, "core initialized"}]}).
+ [{description, "core initialized"},
+ {requires, kernel_ready}]}).
-rabbit_boot_step({empty_db_check,
[{description, "empty DB check"},
{mfa, {?MODULE, maybe_insert_default_data, []}},
- {requires, core_initialized}]}).
+ {requires, core_initialized},
+ {enables, routing_ready}]}).
-rabbit_boot_step({exchange_recovery,
[{description, "exchange recovery"},
{mfa, {rabbit_exchange, recover, []}},
- {requires, empty_db_check}]}).
+ {requires, empty_db_check},
+ {enables, routing_ready}]}).
-rabbit_boot_step({queue_sup_queue_recovery,
[{description, "queue supervisor and queue recovery"},
{mfa, {rabbit_amqqueue, start, []}},
- {requires, empty_db_check}]}).
-
--rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child,
- [rabbit_persister]}},
- {requires, queue_sup_queue_recovery}]}).
+ {requires, empty_db_check},
+ {enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
- [{description, "message delivery logic ready"}]}).
+ [{description, "message delivery logic ready"},
+ {requires, core_initialized}]}).
-rabbit_boot_step({log_relay,
[{description, "error log relay"},
{mfa, {rabbit_error_logger, boot, []}},
- {requires, routing_ready}]}).
+ {requires, routing_ready},
+ {enables, networking}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
@@ -226,14 +228,18 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
- {ok, SupPid} = rabbit_sup:start_link(),
-
- print_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
- io:format("~nbroker running~n"),
+ case erts_version_check() of
+ ok ->
+ {ok, SupPid} = rabbit_sup:start_link(),
- {ok, SupPid}.
+ print_banner(),
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
+ io:format("~nbroker running~n"),
+ {ok, SupPid};
+ Error ->
+ Error
+ end.
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
@@ -246,6 +252,14 @@ stop(_State) ->
%%---------------------------------------------------------------------------
+erts_version_check() ->
+ FoundVer = erlang:system_info(version),
+ case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
+ true -> ok;
+ false -> {error, {erlang_version_too_old,
+ {found, FoundVer}, {required, ?ERTS_MINIMUM}}}
+ end.
+
boot_error(Format, Args) ->
io:format("BOOT ERROR: " ++ Format, Args),
error_logger:error_msg(Format, Args),
@@ -396,8 +410,9 @@ print_banner() ->
{"cookie hash", rabbit_misc:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()}],
- DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
+ {"database dir", rabbit_mnesia:dir()},
+ {"erlang version", erlang:system_info(version)}],
+ DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index da12d51516..d749408ed8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -87,7 +87,6 @@
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
--spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
@@ -116,6 +115,9 @@
start() ->
DurableQueues = find_durable_queues(),
+ ok = rabbit_sup:start_child(
+ rabbit_persister,
+ [[QName || #amqqueue{name = QName} <- DurableQueues]]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
@@ -140,41 +142,51 @@ shared_or_live_owner(Owner) when is_pid(Owner) ->
rpc:call(node(Owner), erlang, is_process_alive, [Owner]).
recover_durable_queues(DurableQueues) ->
- lists:foldl(
- fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner },
- Acc) ->
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- DoIfSameQueue =
- fun (Action) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> {true, Action()};
- [] -> false
- end
- end)
- end,
- case shared_or_live_owner(Owner) of
- true ->
- Q = start_queue_process(RecoveredQ),
- case DoIfSameQueue(fun () -> store_queue(Q) end) of
- {true, ok} -> [Q | Acc];
- false -> exit(Q#amqqueue.pid, shutdown),
- Acc
- end;
- false ->
- case DoIfSameQueue(
- fun () ->
- internal_delete2(RecoveredQ#amqqueue.name)
- end) of
- {true, Hook} -> Hook();
- false -> ok
- end,
- Acc
- end
- end, [], DurableQueues).
+ Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ %% Issue inits to *all* the queues so that they all init at the same time
+ [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
+ [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> [ok = store_queue(Q) || Q <- Qs] end),
+ Qs.
+
+%% This changed too radically to merge. We'll fix this later; see bug 22695
+%% recover_durable_queues(DurableQueues) ->
+%% lists:foldl(
+%% fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner },
+%% Acc) ->
+%% %% We need to catch the case where a client connected to
+%% %% another node has deleted the queue (and possibly
+%% %% re-created it).
+%% DoIfSameQueue =
+%% fun (Action) ->
+%% rabbit_misc:execute_mnesia_transaction(
+%% fun () -> case mnesia:match_object(
+%% rabbit_durable_queue, RecoveredQ, read) of
+%% [_] -> {true, Action()};
+%% [] -> false
+%% end
+%% end)
+%% end,
+%% case shared_or_live_owner(Owner) of
+%% true ->
+%% Q = start_queue_process(RecoveredQ),
+%% case DoIfSameQueue(fun () -> store_queue(Q) end) of
+%% {true, ok} -> [Q | Acc];
+%% false -> exit(Q#amqqueue.pid, shutdown),
+%% Acc
+%% end;
+%% false ->
+%% case DoIfSameQueue(
+%% fun () ->
+%% internal_delete2(RecoveredQ#amqqueue.name)
+%% end) of
+%% {true, Hook} -> Hook();
+%% false -> ok
+%% end,
+%% Acc
+%% end
+%% end, [], DurableQueues).
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -183,6 +195,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
arguments = Args,
exclusive_owner = Owner,
pid = none}),
+ ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
+ ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
internal_declare(Q, true).
internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
@@ -297,9 +311,6 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
true.
-redeliver(QPid, Messages) ->
- gen_server2:cast(QPid, {redeliver, Messages}).
-
requeue(QPid, MsgIds, ChPid) ->
gen_server2:call(QPid, {requeue, MsgIds, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5e1231b92c..696ed7fa34 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -79,11 +79,8 @@
exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
- messages_uncommitted,
messages,
- acks_uncommitted,
consumers,
- transactions,
memory]).
%%----------------------------------------------------------------------------
@@ -91,24 +88,22 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
info_keys() -> ?INFO_KEYS.
-
+
%%----------------------------------------------------------------------------
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- case Q#amqqueue.exclusive_owner of
- none -> ok;
- ReaderPid -> erlang:monitor(process, ReaderPid)
- end,
{ok, #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
- message_buffer = queue:new(),
+ message_buffer = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+terminate(_Reason, #q{message_buffer = undefined}) ->
+ ok;
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
@@ -441,9 +436,6 @@ store_tx(Txn, Tx) ->
erase_tx(Txn) ->
erase({txn, Txn}).
-all_tx_record() ->
- [T || {{txn, _}, T} <- get()].
-
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
@@ -517,20 +509,11 @@ i(messages_ready, #q{message_buffer = MessageBuffer}) ->
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
-i(messages_uncommitted, _) ->
- lists:sum([length(Pending) ||
- #tx{pending_messages = Pending} <- all_tx_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
- messages_unacknowledged,
- messages_uncommitted]]);
-i(acks_uncommitted, _) ->
- lists:sum([length(Pending) ||
- #tx{pending_acks = Pending} <- all_tx_record()]);
+ messages_unacknowledged]]);
i(consumers, State) ->
queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
-i(transactions, _) ->
- length(all_tx_record());
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -539,6 +522,9 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+handle_call(sync, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -730,6 +716,16 @@ handle_call({requeue, MsgIds, ChPid}, _From, State) ->
[{Message, true} || Message <- Messages], State))
end.
+handle_cast({init, Recover}, State = #q{message_buffer = undefined, q = Q}) ->
+ case Q#amqqueue.exclusive_owner of
+ none -> ok;
+ ReaderPid -> erlang:monitor(process, ReaderPid)
+ end,
+ Messages = case Recover of
+ true -> rabbit_persister:queue_content(qname(State));
+ false -> []
+ end,
+ noreply(State#q{message_buffer = queue:from_list(Messages)});
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -758,9 +754,6 @@ handle_cast({rollback, Txn, ChPid}, State) ->
record_current_channel_tx(ChPid, none),
noreply(State);
-handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index b63afe5631..098d7ee956 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -413,7 +413,7 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
- case mnesia:read(rabbit_route, B) of
+ case mnesia:read({rabbit_route, B}) of
[] ->
sync_binding(B, Q#amqqueue.durable,
fun mnesia:write/3),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2c1808465f..59ba277610 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -598,24 +598,28 @@ version_compare(A, B, gte) ->
version_compare(A, B, Result) ->
Result =:= version_compare(A, B).
-version_compare([], []) ->
+version_compare(A, A) ->
eq;
-version_compare([], _ ) ->
+version_compare([], [$0 | B]) ->
+ version_compare([], dropdot(B));
+version_compare([], _) ->
lt; %% 2.3 < 2.3.1
-version_compare(_ , []) ->
+version_compare([$0 | A], []) ->
+ version_compare(dropdot(A), []);
+version_compare(_, []) ->
gt; %% 2.3.1 > 2.3
version_compare(A, B) ->
{AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A),
{BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B),
ANum = list_to_integer(AStr),
BNum = list_to_integer(BStr),
- if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl),
- BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl),
- version_compare(ATl1, BTl1);
+ if ANum =:= BNum -> version_compare(dropdot(ATl), dropdot(BTl));
ANum < BNum -> lt;
ANum > BNum -> gt
end.
+dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A).
+
recursive_delete(Files) ->
lists:foldl(fun (Path, ok ) -> recursive_delete1(Path);
(_Path, {error, _Err} = Error) -> Error
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index a9e0cab928..a8e41baf74 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -33,14 +33,14 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0]).
+ force_snapshot/0, queue_content/1]).
-include("rabbit.hrl").
@@ -52,8 +52,7 @@
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot}).
+ pending_logs, pending_replies, snapshot}).
%% two tables for efficient persistency
%% one maps a key to a message
@@ -72,20 +71,22 @@
{deliver, pmsg()} |
{ack, pmsg()}).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 :: ([queue_name()]) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
+-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link(DurableQueues) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
@@ -111,15 +112,18 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
+queue_content(QName) ->
+ gen_server:call(?SERVER, {queue_content, QName}, infinity).
+
%%--------------------------------------------------------------------
-init(_Args) ->
+init([DurableQueues]) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, []),
+ queues = ets:new(queues, [ordered_set]),
next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
@@ -135,7 +139,8 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
+ {Res, NewSnapshot} =
+ internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -143,12 +148,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
+ pending_replies = [],
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -158,6 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
+handle_call({queue_content, QName}, _From,
+ State = #pstate{snapshot = #psnapshot{messages = Messages,
+ queues = Queues}}) ->
+ MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), D} ||
+ {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -339,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ sets:add_element(PKey, S)
+ end, sets:new(), Queues),
+ prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
@@ -351,20 +363,21 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
-prune_table(Tab, Keys) ->
+prune_table(Tab, Pred) ->
true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
+ ok = prune_table(Tab, Pred, ets:first(Tab)),
true = ets:safe_fixtable(Tab, false).
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
+prune_table(_Tab, _Pred, '$end_of_table') -> ok;
+prune_table(Tab, Pred, Key) ->
+ case Pred(Key) of
true -> ok;
false -> ets:delete(Tab, Key)
end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
+ prune_table(Tab, Pred, ets:next(Tab, Key)).
internal_load_snapshot(LogHandle,
+ DurableQueues,
Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
@@ -378,11 +391,18 @@ internal_load_snapshot(LogHandle,
Snapshot#psnapshot{
transactions = Ts,
next_seq_id = NextSeqId}),
- Snapshot2 = requeue_messages(Snapshot1),
+ %% Remove all entries for queues that no longer exist.
+ %% Note that the 'messages' table is pruned when the next
+ %% snapshot is taken.
+ DurableQueuesSet = sets:from_list(DurableQueues),
+ prune_table(Snapshot1#psnapshot.queues,
+ fun ({QName, _PKey}) ->
+ sets:is_element(QName, DurableQueuesSet)
+ end),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
%% any uncompleted transactions will have been aborted.
- {ok, Snapshot2#psnapshot{transactions = dict:new()}};
+ {ok, Snapshot1#psnapshot{transactions = dict:new()}};
{error, Reason} -> {{error, Reason}, Snapshot}
end.
@@ -394,52 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
check_version(_Other) ->
{error, unrecognised_persister_log_format}.
-requeue_messages(Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- Work = ets:foldl(
- fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
- rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
- end, dict:new(), Queues),
- %% unstable parallel map, because order doesn't matter
- L = lists:append(
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- requeue(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
- NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- Snapshot.
-
-requeue(QName, Requeues, Messages) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
- [{SeqId, QName, PKey, Message, Delivered} ||
- {SeqId, PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)],
- rabbit_amqqueue:redeliver(
- QPid,
- %% Messages published by the same process receive
- %% persistence keys that are monotonically
- %% increasing. Since message ordering is defined on a
- %% per-channel basis, and channels are bound to specific
- %% processes, sorting the list does provide the correct
- %% ordering properties.
- [{Message, Delivered} || {_, _, _, Message, Delivered} <-
- lists:sort(RequeueMessages)]),
- RequeueMessages;
- {error, not_found} ->
- []
- end.
-
replay([], LogHandle, K, Snapshot) ->
case disk_log:chunk(LogHandle, K) of
{K1, Items} ->
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index d3a4811926..9ef8c636a3 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -46,6 +46,8 @@
-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/2 ::
(pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(run/1 :: (fun (() -> A)) -> A;
+ ({atom(), atom(), [any()]}) -> any()).
-endif.
@@ -65,6 +67,13 @@ submit(Pid, Fun) ->
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
+run({M, F, A}) ->
+ apply(M, F, A);
+run(Fun) ->
+ Fun().
+
+%%----------------------------------------------------------------------------
+
init([WId]) ->
ok = worker_pool:idle(WId),
put(worker_pool_worker, true),
@@ -95,10 +104,3 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
-
-%%----------------------------------------------------------------------------
-
-run({M, F, A}) ->
- apply(M, F, A);
-run(Fun) ->
- Fun().