diff options
| -rw-r--r-- | docs/examples-to-end.xsl | 4 | ||||
| -rw-r--r-- | docs/html-to-website-xml.xsl | 10 | ||||
| -rw-r--r-- | docs/rabbitmq-activate-plugins.1.xml | 2 | ||||
| -rw-r--r-- | docs/rabbitmq-deactivate-plugins.1.xml | 2 | ||||
| -rw-r--r-- | docs/rabbitmq-multi.1.xml | 2 | ||||
| -rw-r--r-- | docs/rabbitmq-server.1.xml | 6 | ||||
| -rw-r--r-- | docs/rabbitmq-service.xml | 24 | ||||
| -rw-r--r-- | docs/rabbitmq.conf.5.xml | 2 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 33 | ||||
| -rw-r--r-- | docs/usage.xsl | 10 | ||||
| -rw-r--r-- | include/rabbit.hrl | 1 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 4 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 2 | ||||
| -rw-r--r-- | scripts/rabbitmq-server.bat | 1 | ||||
| -rw-r--r-- | src/rabbit.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 91 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 122 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 16 |
22 files changed, 225 insertions, 237 deletions
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index 496fcc1c34..d9686adac8 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -55,7 +55,7 @@ indented) <term> <cmdsynopsis> - <command>list_connections</command> + <command>list_connections</command> <arg choice="opt"> <replaceable>connectioninfoitem</replaceable> ... @@ -66,7 +66,7 @@ indented) However, while DocBook renders this sensibly for HTML, for some reason it doen't show anything inside <cmdsynopsis> at all for man pages. I think what we're doing is semantically correct so this is a bug in DocBook. The following - rules essentially do what DocBook does when <cmdsynopsis> is not inside a + rules essentially do what DocBook does when <cmdsynopsis> is not inside a <term>. --> diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index a35b869967..f2117e2679 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -26,8 +26,8 @@ <xsl:choose> <xsl:when test="document($original)/refentry/refmeta/manvolnum"> <p> - This is the manual page for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. + This is the manual page for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. </p> <p> <a href="manpages.html">See a list of all manual pages</a>. @@ -35,13 +35,13 @@ </xsl:when> <xsl:otherwise> <p> - This is the documentation for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. + This is the documentation for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. </p> </xsl:otherwise> </xsl:choose> <p> - For more general documentation, please see the + For more general documentation, please see the <a href="admin-guide.html">administrator's guide</a>. </p> diff --git a/docs/rabbitmq-activate-plugins.1.xml b/docs/rabbitmq-activate-plugins.1.xml index ef81c201f7..5f8316346c 100644 --- a/docs/rabbitmq-activate-plugins.1.xml +++ b/docs/rabbitmq-activate-plugins.1.xml @@ -24,7 +24,7 @@ <command>rabbitmq-activate-plugins</command> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmq-deactivate-plugins.1.xml b/docs/rabbitmq-deactivate-plugins.1.xml index eacd014b83..bbf1207e8a 100644 --- a/docs/rabbitmq-deactivate-plugins.1.xml +++ b/docs/rabbitmq-deactivate-plugins.1.xml @@ -24,7 +24,7 @@ <command>rabbitmq-deactivate-plugins</command> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml index b3862fdf87..6586890abf 100644 --- a/docs/rabbitmq-multi.1.xml +++ b/docs/rabbitmq-multi.1.xml @@ -26,7 +26,7 @@ <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index 25c2aefb8b..921da4f1cc 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -25,7 +25,7 @@ <arg choice="opt">-detached</arg> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> @@ -72,7 +72,7 @@ be placed in this directory. <para> Defaults to rabbit. This can be useful if you want to run more than one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per -erlang-node-and-machine combination. See the +erlang-node-and-machine combination. See the <ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single machine guide</ulink> for details. </para> @@ -93,7 +93,7 @@ one network interface. <term>RABBITMQ_NODE_PORT</term> <listitem> <para> -Defaults to 5672. +Defaults to 5672. </para> </listitem> </varlistentry> diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index d59ed63813..2b416e3e1e 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -24,7 +24,7 @@ <arg choice="opt">command</arg> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> @@ -34,14 +34,14 @@ scalable implementation of an AMQP broker. </para> <para> Running <command>rabbitmq-service</command> allows the RabbitMQ broker to be run as a -service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker -service can be started and stopped using the Windows® services -applet. +service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker +service can be started and stopped using the Windows® services +applet. </para> <para> -By default the service will run in the authentication context of the +By default the service will run in the authentication context of the local system account. It is therefore necessary to synchronise Erlang -cookies between the local system account (typically +cookies between the local system account (typically <filename>C:\WINDOWS\.erlang.cookie</filename> and the account that will be used to run <command>rabbitmqctl</command>. </para> @@ -87,7 +87,7 @@ deleted as a consequence and <command>rabbitmq-server</command> will remain oper <listitem> <para> Start the service. The service must have been correctly installed -beforehand. +beforehand. </para> </listitem> </varlistentry> @@ -96,7 +96,7 @@ beforehand. <term>stop</term> <listitem> <para> -Stop the service. The service must be running for this command to +Stop the service. The service must be running for this command to have any effect. </para> </listitem> @@ -154,7 +154,7 @@ This is the location of log and database directories. <para> Defaults to rabbit. This can be useful if you want to run more than one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per -erlang-node-and-machine combination. See the +erlang-node-and-machine combination. See the <ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single machine guide</ulink> for details. </para> @@ -175,7 +175,7 @@ one network interface. <term>RABBITMQ_NODE_PORT</term> <listitem> <para> -Defaults to 5672. +Defaults to 5672. </para> </listitem> </varlistentry> @@ -208,11 +208,11 @@ for details. <term>RABBITMQ_CONSOLE_LOG</term> <listitem> <para> -Set this varable to <code>new</code> or <code>reuse</code> to have the console +Set this varable to <code>new</code> or <code>reuse</code> to have the console output from the server redirected to a file named <code>SERVICENAME</code>.debug in the application data directory of the user that installed the service. Under Vista this will be <filename>C:\Users\AppData\username\SERVICENAME</filename>. -Under previous versions of Windows this will be +Under previous versions of Windows this will be <filename>C:\Documents and Settings\username\Application Data\SERVICENAME</filename>. If <code>RABBITMQ_CONSOLE_LOG</code> is set to <code>new</code> then a new file will be created each time the service starts. If <code>RABBITMQ_CONSOLE_LOG</code> is diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml index 34f20f9226..31de71649c 100644 --- a/docs/rabbitmq.conf.5.xml +++ b/docs/rabbitmq.conf.5.xml @@ -18,7 +18,7 @@ <refname>rabbitmq.conf</refname> <refpurpose>default settings for RabbitMQ AMQP server</refpurpose> </refnamediv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 7634b2d247..5e2668c1a6 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1,19 +1,19 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> -<!-- +<!-- There is some extra magic in this document besides the usual DocBook semantics - to allow us to derive manpages, HTML and usage messages from the same source + to allow us to derive manpages, HTML and usage messages from the same source document. Examples need to be moved to the end for man pages. To this end, <para>s and - <screen>s with role="example" will be moved, and with role="example-prefix" + <screen>s with role="example" will be moved, and with role="example-prefix" will be removed. The usage messages are more involved. We have some magic in usage.xsl to pull out the command synopsis, global option and subcommand synopses. We also pull out <para>s with role="usage". - Finally we construct lists of possible values for subcommand options, if the + Finally we construct lists of possible values for subcommand options, if the subcommand's <varlistentry> has role="usage-has-option-list". The option which takes the values should be marked with role="usage-option-list". --> @@ -664,7 +664,7 @@ <para> The <command>queueinfoitem</command> parameter is used to indicate which queue information items to include in the results. The column order in the - results will match the order of the parameters. + results will match the order of the parameters. <command>queueinfoitem</command> can take any value from the list that follows: </para> @@ -715,28 +715,15 @@ <listitem><para>Number of messages delivered to clients but not yet acknowledged.</para></listitem> </varlistentry> <varlistentry> - <term>messages_uncommitted</term> - <listitem><para>Number of messages published in as yet uncommitted transactions</para></listitem> - </varlistentry> - <varlistentry> <term>messages</term> - <listitem><para>Sum of ready, unacknowledged and uncommitted messages + <listitem><para>Sum of ready and unacknowledged messages (queue depth).</para></listitem> </varlistentry> <varlistentry> - <term>acks_uncommitted</term> - <listitem><para>Number of acknowledgements received in as yet uncommitted - transactions.</para></listitem> - </varlistentry> - <varlistentry> <term>consumers</term> <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> - <term>transactions</term> - <listitem><para>Number of transactions.</para></listitem> - </varlistentry> - <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> @@ -768,7 +755,7 @@ <para> The <command>exchangeinfoitem</command> parameter is used to indicate which exchange information items to include in the results. The column order in the - results will match the order of the parameters. + results will match the order of the parameters. <command>exchangeinfoitem</command> can take any value from the list that follows: </para> @@ -797,7 +784,7 @@ </varlistentry> </variablelist> <para> - If no <command>exchangeinfoitem</command>s are specified then + If no <command>exchangeinfoitem</command>s are specified then exchange name and type are displayed. </para> <para role="example-prefix"> @@ -839,7 +826,7 @@ <para> The <command>connectioninfoitem</command> parameter is used to indicate which connection information items to include in the results. The - column order in the results will match the order of the parameters. + column order in the results will match the order of the parameters. <command>connectioninfoitem</command> can take any value from the list that follows: </para> @@ -945,7 +932,7 @@ The <command>channelinfoitem</command> parameter is used to indicate which channel information items to include in the results. The column order in the results will match the - order of the parameters. + order of the parameters. <command>channelinfoitem</command> can take any value from the list that follows: </para> diff --git a/docs/usage.xsl b/docs/usage.xsl index 72f8880ab1..a6cebd93bf 100644 --- a/docs/usage.xsl +++ b/docs/usage.xsl @@ -11,7 +11,7 @@ <xsl:output method="text" encoding="UTF-8" indent="no"/> -<xsl:strip-space elements="*"/> +<xsl:strip-space elements="*"/> <xsl:preserve-space elements="cmdsynopsis arg" /> <xsl:template match="/"> @@ -19,7 +19,7 @@ -module(<xsl:value-of select="$modulename" />). -export([usage/0]). usage() -> %QUOTE%Usage: -<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/> +<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/> <xsl:text> </xsl:text> <xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg"> <xsl:apply-templates select="." /> @@ -28,7 +28,7 @@ usage() -> %QUOTE%Usage: <xsl:text> </xsl:text> -<!-- List options (any variable list in a section called "Options"). --> +<!-- List options (any variable list in a section called "Options"). --> <xsl:for-each select=".//*[title='Options']/variablelist"> <xsl:if test="position() = 1"> Options: </xsl:if> <xsl:for-each select="varlistentry"> @@ -40,13 +40,13 @@ usage() -> %QUOTE%Usage: </xsl:for-each> </xsl:for-each> -<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). --> +<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). --> <xsl:text> </xsl:text> <xsl:for-each select=".//*[title='Options']//para[@role='usage']"> <xsl:value-of select="normalize-space(.)"/><xsl:text> </xsl:text> </xsl:for-each> -<!-- List commands (any first-level variable list in a section called "Commands"). --> +<!-- List commands (any first-level variable list in a section called "Commands"). --> <xsl:for-each select=".//*[title='Commands']/variablelist | .//*[title='Commands']/refsect2/variablelist"> <xsl:if test="position() = 1">Commands: </xsl:if> <xsl:for-each select="varlistentry"> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index def1b69eec..c2dad74475 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -176,6 +176,7 @@ -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). +-define(ERTS_MINIMUM, "5.6.3"). -define(MAX_WAIT, 16#ffffffff). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index b052d889b3..6926261f79 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -13,8 +13,8 @@ Source4: rabbitmq-asroot-script-wrapper Source5: rabbitmq-server.ocf URL: http://www.rabbitmq.com/ BuildArch: noarch -BuildRequires: erlang, python-simplejson, xmlto, libxslt -Requires: erlang, logrotate +BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt +Requires: erlang >= R12B-3, logrotate BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server Requires(post): %%REQUIRES%% diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 479c356829..a44f49a0e0 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,7 +7,7 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -Depends: erlang-base | erlang-base-hipe, erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} +Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 638498c1e2..ccdfc40160 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -31,7 +31,7 @@ ## NODENAME=rabbit -SERVER_ERL_ARGS="+K true +A30 \ +SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 28eb8ebb8d..57fe1328ce 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -145,6 +145,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -s rabbit ^
+W w ^
+A30 ^
++P 1048576 ^
-kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7cf92a6243..c438589ebb 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -47,7 +47,8 @@ [{description, "codec correctness check"}, {mfa, {rabbit_binary_generator, check_empty_content_body_frame_size, - []}}]}). + []}}, + {enables, external_infrastructure}]}). -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, @@ -65,21 +66,21 @@ [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}, @@ -113,35 +114,36 @@ {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, - [{description, "core initialized"}]}). + [{description, "core initialized"}, + {requires, kernel_ready}]}). -rabbit_boot_step({empty_db_check, [{description, "empty DB check"}, {mfa, {?MODULE, maybe_insert_default_data, []}}, - {requires, core_initialized}]}). + {requires, core_initialized}, + {enables, routing_ready}]}). -rabbit_boot_step({exchange_recovery, [{description, "exchange recovery"}, {mfa, {rabbit_exchange, recover, []}}, - {requires, empty_db_check}]}). + {requires, empty_db_check}, + {enables, routing_ready}]}). -rabbit_boot_step({queue_sup_queue_recovery, [{description, "queue supervisor and queue recovery"}, {mfa, {rabbit_amqqueue, start, []}}, - {requires, empty_db_check}]}). - --rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, - [rabbit_persister]}}, - {requires, queue_sup_queue_recovery}]}). + {requires, empty_db_check}, + {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, - [{description, "message delivery logic ready"}]}). + [{description, "message delivery logic ready"}, + {requires, core_initialized}]}). -rabbit_boot_step({log_relay, [{description, "error log relay"}, {mfa, {rabbit_error_logger, boot, []}}, - {requires, routing_ready}]}). + {requires, routing_ready}, + {enables, networking}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, @@ -226,14 +228,18 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- start(normal, []) -> - {ok, SupPid} = rabbit_sup:start_link(), - - print_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], - io:format("~nbroker running~n"), + case erts_version_check() of + ok -> + {ok, SupPid} = rabbit_sup:start_link(), - {ok, SupPid}. + print_banner(), + [ok = run_boot_step(Step) || Step <- boot_steps()], + io:format("~nbroker running~n"), + {ok, SupPid}; + Error -> + Error + end. stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), @@ -246,6 +252,14 @@ stop(_State) -> %%--------------------------------------------------------------------------- +erts_version_check() -> + FoundVer = erlang:system_info(version), + case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of + true -> ok; + false -> {error, {erlang_version_too_old, + {found, FoundVer}, {required, ?ERTS_MINIMUM}}} + end. + boot_error(Format, Args) -> io:format("BOOT ERROR: " ++ Format, Args), error_logger:error_msg(Format, Args), @@ -396,8 +410,9 @@ print_banner() -> {"cookie hash", rabbit_misc:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}], - DescrLen = lists:max([length(K) || {K, _V} <- Settings]), + {"database dir", rabbit_mnesia:dir()}, + {"erlang version", erlang:system_info(version)}], + DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), io:nl(). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index da12d51516..d749408ed8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). @@ -87,7 +87,6 @@ {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). -spec(deliver/2 :: (pid(), delivery()) -> boolean()). --spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). @@ -116,6 +115,9 @@ start() -> DurableQueues = find_durable_queues(), + ok = rabbit_sup:start_child( + rabbit_persister, + [[QName || #amqqueue{name = QName} <- DurableQueues]]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, @@ -140,41 +142,51 @@ shared_or_live_owner(Owner) when is_pid(Owner) -> rpc:call(node(Owner), erlang, is_process_alive, [Owner]). recover_durable_queues(DurableQueues) -> - lists:foldl( - fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner }, - Acc) -> - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - DoIfSameQueue = - fun (Action) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> {true, Action()}; - [] -> false - end - end) - end, - case shared_or_live_owner(Owner) of - true -> - Q = start_queue_process(RecoveredQ), - case DoIfSameQueue(fun () -> store_queue(Q) end) of - {true, ok} -> [Q | Acc]; - false -> exit(Q#amqqueue.pid, shutdown), - Acc - end; - false -> - case DoIfSameQueue( - fun () -> - internal_delete2(RecoveredQ#amqqueue.name) - end) of - {true, Hook} -> Hook(); - false -> ok - end, - Acc - end - end, [], DurableQueues). + Qs = [start_queue_process(Q) || Q <- DurableQueues], + %% Issue inits to *all* the queues so that they all init at the same time + [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs], + [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], + rabbit_misc:execute_mnesia_transaction( + fun () -> [ok = store_queue(Q) || Q <- Qs] end), + Qs. + +%% This changed too radically to merge. We'll fix this later; see bug 22695 +%% recover_durable_queues(DurableQueues) -> +%% lists:foldl( +%% fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner }, +%% Acc) -> +%% %% We need to catch the case where a client connected to +%% %% another node has deleted the queue (and possibly +%% %% re-created it). +%% DoIfSameQueue = +%% fun (Action) -> +%% rabbit_misc:execute_mnesia_transaction( +%% fun () -> case mnesia:match_object( +%% rabbit_durable_queue, RecoveredQ, read) of +%% [_] -> {true, Action()}; +%% [] -> false +%% end +%% end) +%% end, +%% case shared_or_live_owner(Owner) of +%% true -> +%% Q = start_queue_process(RecoveredQ), +%% case DoIfSameQueue(fun () -> store_queue(Q) end) of +%% {true, ok} -> [Q | Acc]; +%% false -> exit(Q#amqqueue.pid, shutdown), +%% Acc +%% end; +%% false -> +%% case DoIfSameQueue( +%% fun () -> +%% internal_delete2(RecoveredQ#amqqueue.name) +%% end) of +%% {true, Hook} -> Hook(); +%% false -> ok +%% end, +%% Acc +%% end +%% end, [], DurableQueues). declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -183,6 +195,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> arguments = Args, exclusive_owner = Owner, pid = none}), + ok = gen_server2:cast(Q#amqqueue.pid, {init, false}), + ok = gen_server2:call(Q#amqqueue.pid, sync, infinity), internal_declare(Q, true). internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> @@ -297,9 +311,6 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. -redeliver(QPid, Messages) -> - gen_server2:cast(QPid, {redeliver, Messages}). - requeue(QPid, MsgIds, ChPid) -> gen_server2:call(QPid, {requeue, MsgIds, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5e1231b92c..696ed7fa34 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -79,11 +79,8 @@ exclusive_consumer_tag, messages_ready, messages_unacknowledged, - messages_uncommitted, messages, - acks_uncommitted, consumers, - transactions, memory]). %%---------------------------------------------------------------------------- @@ -91,24 +88,22 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). info_keys() -> ?INFO_KEYS. - + %%---------------------------------------------------------------------------- init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - case Q#amqqueue.exclusive_owner of - none -> ok; - ReaderPid -> erlang:monitor(process, ReaderPid) - end, {ok, #q{q = Q, exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, - message_buffer = queue:new(), + message_buffer = undefined, active_consumers = queue:new(), blocked_consumers = queue:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +terminate(_Reason, #q{message_buffer = undefined}) -> + ok; terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), @@ -441,9 +436,6 @@ store_tx(Txn, Tx) -> erase_tx(Txn) -> erase({txn, Txn}). -all_tx_record() -> - [T || {{txn, _}, T} <- get()]. - all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. @@ -517,20 +509,11 @@ i(messages_ready, #q{message_buffer = MessageBuffer}) -> i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); -i(messages_uncommitted, _) -> - lists:sum([length(Pending) || - #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); -i(acks_uncommitted, _) -> - lists:sum([length(Pending) || - #tx{pending_acks = Pending} <- all_tx_record()]); + messages_unacknowledged]]); i(consumers, State) -> queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); -i(transactions, _) -> - length(all_tx_record()); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -539,6 +522,9 @@ i(Item, _) -> %--------------------------------------------------------------------------- +handle_call(sync, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -730,6 +716,16 @@ handle_call({requeue, MsgIds, ChPid}, _From, State) -> [{Message, true} || Message <- Messages], State)) end. +handle_cast({init, Recover}, State = #q{message_buffer = undefined, q = Q}) -> + case Q#amqqueue.exclusive_owner of + none -> ok; + ReaderPid -> erlang:monitor(process, ReaderPid) + end, + Messages = case Recover of + true -> rabbit_persister:queue_content(qname(State)); + false -> [] + end, + noreply(State#q{message_buffer = queue:from_list(Messages)}); handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -758,9 +754,6 @@ handle_cast({rollback, Txn, ChPid}, State) -> record_current_channel_tx(ChPid, none), noreply(State); -handle_cast({redeliver, Messages}, State) -> - noreply(deliver_or_enqueue_n(Messages, State)); - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index b63afe5631..098d7ee956 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -413,7 +413,7 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> - case mnesia:read(rabbit_route, B) of + case mnesia:read({rabbit_route, B}) of [] -> sync_binding(B, Q#amqqueue.durable, fun mnesia:write/3), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2c1808465f..59ba277610 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -598,24 +598,28 @@ version_compare(A, B, gte) -> version_compare(A, B, Result) -> Result =:= version_compare(A, B). -version_compare([], []) -> +version_compare(A, A) -> eq; -version_compare([], _ ) -> +version_compare([], [$0 | B]) -> + version_compare([], dropdot(B)); +version_compare([], _) -> lt; %% 2.3 < 2.3.1 -version_compare(_ , []) -> +version_compare([$0 | A], []) -> + version_compare(dropdot(A), []); +version_compare(_, []) -> gt; %% 2.3.1 > 2.3 version_compare(A, B) -> {AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A), {BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B), ANum = list_to_integer(AStr), BNum = list_to_integer(BStr), - if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl), - BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl), - version_compare(ATl1, BTl1); + if ANum =:= BNum -> version_compare(dropdot(ATl), dropdot(BTl)); ANum < BNum -> lt; ANum > BNum -> gt end. +dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A). + recursive_delete(Files) -> lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); (_Path, {error, _Err} = Error) -> Error diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index a9e0cab928..a8e41baf74 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -33,14 +33,14 @@ -behaviour(gen_server). --export([start_link/0]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0]). + force_snapshot/0, queue_content/1]). -include("rabbit.hrl"). @@ -52,8 +52,7 @@ -define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, - snapshot}). + pending_logs, pending_replies, snapshot}). %% two tables for efficient persistency %% one maps a key to a message @@ -72,20 +71,22 @@ {deliver, pmsg()} | {ack, pmsg()}). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: ([queue_name()]) -> + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). -spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). -spec(dirty_work/1 :: ([work_item()]) -> 'ok'). -spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). +-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(DurableQueues) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []). transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), @@ -111,15 +112,18 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). +queue_content(QName) -> + gen_server:call(?SERVER, {queue_content, QName}, infinity). + %%-------------------------------------------------------------------- -init(_Args) -> +init([DurableQueues]) -> process_flag(trap_exit, true), FileName = base_filename(), ok = filelib:ensure_dir(FileName), Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, []), + queues = ets:new(queues, [ordered_set]), next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, @@ -135,7 +139,8 @@ init(_Args) -> [Recovered, Bad]), LH end, - {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot), + {Res, NewSnapshot} = + internal_load_snapshot(LogHandle, DurableQueues, Snapshot), case Res of ok -> ok = take_snapshot(LogHandle, NewSnapshot); @@ -143,12 +148,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], + pending_replies = [], + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -158,6 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); +handle_call({queue_content, QName}, _From, + State = #pstate{snapshot = #psnapshot{messages = Messages, + queues = Queues}}) -> + MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), D} || + {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -339,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues)), + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + sets:add_element(PKey, S) + end, sets:new(), Queues), + prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, {queues, ets:tab2list(Queues)}, @@ -351,20 +363,21 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. -prune_table(Tab, Keys) -> +prune_table(Tab, Pred) -> true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Keys, ets:first(Tab)), + ok = prune_table(Tab, Pred, ets:first(Tab)), true = ets:safe_fixtable(Tab, false). -prune_table(_Tab, _Keys, '$end_of_table') -> ok; -prune_table(Tab, Keys, Key) -> - case sets:is_element(Key, Keys) of +prune_table(_Tab, _Pred, '$end_of_table') -> ok; +prune_table(Tab, Pred, Key) -> + case Pred(Key) of true -> ok; false -> ets:delete(Tab, Key) end, - prune_table(Tab, Keys, ets:next(Tab, Key)). + prune_table(Tab, Pred, ets:next(Tab, Key)). internal_load_snapshot(LogHandle, + DurableQueues, Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), @@ -378,11 +391,18 @@ internal_load_snapshot(LogHandle, Snapshot#psnapshot{ transactions = Ts, next_seq_id = NextSeqId}), - Snapshot2 = requeue_messages(Snapshot1), + %% Remove all entries for queues that no longer exist. + %% Note that the 'messages' table is pruned when the next + %% snapshot is taken. + DurableQueuesSet = sets:from_list(DurableQueues), + prune_table(Snapshot1#psnapshot.queues, + fun ({QName, _PKey}) -> + sets:is_element(QName, DurableQueuesSet) + end), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so %% any uncompleted transactions will have been aborted. - {ok, Snapshot2#psnapshot{transactions = dict:new()}}; + {ok, Snapshot1#psnapshot{transactions = dict:new()}}; {error, Reason} -> {{error, Reason}, Snapshot} end. @@ -394,52 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> check_version(_Other) -> {error, unrecognised_persister_log_format}. -requeue_messages(Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - Work = ets:foldl( - fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> - rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) - end, dict:new(), Queues), - %% unstable parallel map, because order doesn't matter - L = lists:append( - rabbit_misc:upmap( - %% we do as much work as possible in spawned worker - %% processes, but we need to make sure the ets:inserts are - %% performed in self() - fun ({QName, Requeues}) -> - requeue(QName, Requeues, Messages) - end, dict:to_list(Work))), - NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], - NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], - ets:delete_all_objects(Messages), - ets:delete_all_objects(Queues), - true = ets:insert(Messages, NewMessages), - true = ets:insert(Queues, NewQueues), - %% contains the mutated messages and queues tables - Snapshot. - -requeue(QName, Requeues, Messages) -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = - [{SeqId, QName, PKey, Message, Delivered} || - {SeqId, PKey, Delivered} <- Requeues, - {_, Message} <- ets:lookup(Messages, PKey)], - rabbit_amqqueue:redeliver( - QPid, - %% Messages published by the same process receive - %% persistence keys that are monotonically - %% increasing. Since message ordering is defined on a - %% per-channel basis, and channels are bound to specific - %% processes, sorting the list does provide the correct - %% ordering properties. - [{Message, Delivered} || {_, _, _, Message, Delivered} <- - lists:sort(RequeueMessages)]), - RequeueMessages; - {error, not_found} -> - [] - end. - replay([], LogHandle, K, Snapshot) -> case disk_log:chunk(LogHandle, K) of {K1, Items} -> diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index d3a4811926..9ef8c636a3 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -46,6 +46,8 @@ -spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(run/1 :: (fun (() -> A)) -> A; + ({atom(), atom(), [any()]}) -> any()). -endif. @@ -65,6 +67,13 @@ submit(Pid, Fun) -> submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). + +%%---------------------------------------------------------------------------- + init([WId]) -> ok = worker_pool:idle(WId), put(worker_pool_worker, true), @@ -95,10 +104,3 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. - -%%---------------------------------------------------------------------------- - -run({M, F, A}) -> - apply(M, F, A); -run(Fun) -> - Fun(). |
