summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.hgignore2
-rw-r--r--Makefile92
-rw-r--r--codegen.py15
-rw-r--r--docs/examples-to-end.xsl94
-rw-r--r--docs/html-to-website-xml.xsl91
-rw-r--r--docs/rabbitmq-activate-plugins.1.pod37
-rw-r--r--docs/rabbitmq-activate-plugins.1.xml60
-rw-r--r--docs/rabbitmq-deactivate-plugins.1.pod37
-rw-r--r--docs/rabbitmq-deactivate-plugins.1.xml60
-rw-r--r--docs/rabbitmq-multi.1.pod59
-rw-r--r--docs/rabbitmq-multi.1.xml100
-rw-r--r--docs/rabbitmq-server.1.pod88
-rw-r--r--docs/rabbitmq-server.1.xml143
-rw-r--r--docs/rabbitmq-service.xml228
-rw-r--r--docs/rabbitmq.conf.5.pod69
-rw-r--r--docs/rabbitmq.conf.5.xml84
-rw-r--r--docs/rabbitmqctl.1.pod536
-rw-r--r--docs/rabbitmqctl.1.xml1042
-rw-r--r--docs/remove-namespaces.xsl17
-rw-r--r--docs/usage.xsl78
-rw-r--r--generate_deps8
-rw-r--r--include/rabbit.hrl15
-rw-r--r--include/rabbit_exchange_type_spec.hrl42
-rw-r--r--include/rabbit_framing_spec.hrl2
-rw-r--r--packaging/RPMS/Fedora/Makefile6
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec16
-rw-r--r--packaging/macports/Makefile2
-rw-r--r--packaging/windows/Makefile7
-rw-r--r--packaging/windows/rabbitmq-service.pod133
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/rabbit.erl73
-rw-r--r--src/rabbit_alarm.erl7
-rw-r--r--src/rabbit_amqqueue.erl128
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_basic.erl29
-rw-r--r--src/rabbit_channel.erl150
-rw-r--r--src/rabbit_control.erl82
-rw-r--r--src/rabbit_dialyzer.erl6
-rw-r--r--src/rabbit_exchange.erl531
-rw-r--r--src/rabbit_exchange_type.erl61
-rw-r--r--src/rabbit_exchange_type_direct.erl63
-rw-r--r--src/rabbit_exchange_type_fanout.erl61
-rw-r--r--src/rabbit_exchange_type_headers.erl137
-rw-r--r--src/rabbit_exchange_type_registry.erl129
-rw-r--r--src/rabbit_exchange_type_topic.erl101
-rw-r--r--src/rabbit_limiter.erl74
-rw-r--r--src/rabbit_misc.erl95
-rw-r--r--src/rabbit_mnesia.erl7
-rw-r--r--src/rabbit_multi.erl15
-rw-r--r--src/rabbit_networking.erl28
-rw-r--r--src/rabbit_persister.erl28
-rw-r--r--src/rabbit_reader.erl24
-rw-r--r--src/rabbit_restartable_sup.erl47
-rw-r--r--src/rabbit_router.erl55
-rw-r--r--src/rabbit_sup.erl26
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/tcp_listener_sup.erl2
-rw-r--r--src/worker_pool.erl155
-rw-r--r--src/worker_pool_sup.erl69
-rw-r--r--src/worker_pool_worker.erl104
60 files changed, 3834 insertions, 1650 deletions
diff --git a/.hgignore b/.hgignore
index 442425f62c..caaa3acef0 100644
--- a/.hgignore
+++ b/.hgignore
@@ -11,6 +11,7 @@ syntax: regexp
^dist/
^include/rabbit_framing\.hrl$
^src/rabbit_framing\.erl$
+^src/.*\_usage.erl$
^rabbit\.plt$
^basic.plt$
^ebin/rabbit\.(app|rel|boot|script)$
@@ -25,3 +26,4 @@ syntax: regexp
^packaging/windows/rabbitmq-server-windows-.*\.zip$
^docs/.*\.[15]\.gz$
+^docs/.*\.man\.xml$
diff --git a/Makefile b/Makefile
index 5d43ee8add..6cb086ab94 100644
--- a/Makefile
+++ b/Makefile
@@ -10,12 +10,16 @@ DEPS_FILE=deps.mk
SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
+DOCS_DIR=docs
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
-SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl
+SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
-MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
+MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
+WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
+USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml
+USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
PYTHON=python
@@ -58,6 +62,14 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
ERL_EBIN=erl -noinput -pa $(EBIN_DIR)
+define usage_xml_to_erl
+ $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, $(SOURCE_DIR)/rabbit_%_usage.erl, $(subst -,_,$(1))))
+endef
+
+define usage_dep
+ $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl
+endef
+
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -66,9 +78,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl
+$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
$(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
@@ -100,7 +111,7 @@ clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
- rm -f docs/*.[0-9].gz
+ rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
rm -f $(DEPS_FILE)
@@ -176,7 +187,7 @@ srcdist: distclean
cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
- cp -r docs $(TARGET_SRC_DIR)
+ cp -r $(DOCS_DIR) $(TARGET_SRC_DIR)
chmod 0755 $(TARGET_SRC_DIR)/scripts/*
(cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME))
@@ -188,16 +199,36 @@ distclean: clean
rm -rf dist
find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \;
-%.gz: %.pod
- pod2man \
- -n `echo $$(basename $*) | sed -e 's/\.[[:digit:]]\+//'` \
- -s `echo $$(basename $*) | sed -e 's/.*\.\([^.]\+\)/\1/'` \
- -c "RabbitMQ AMQP Server" \
- -d "" \
- -r "" \
- $< | gzip --best > $@
-
-docs_all: $(MANPAGES)
+# xmlto can not read from standard input, so we mess with a tmp file.
+%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
+ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
+ xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \
+ gzip -f $(DOCS_DIR)/`basename $< .xml`
+ rm -f $<.tmp
+
+# Use tmp files rather than a pipeline so that we get meaningful errors
+# Do not fold the cp into previous line, it's there to stop the file being
+# generated but empty if we fail
+$(SOURCE_DIR)/%_usage.erl:
+ xsltproc --stringparam modulename "`basename $@ .erl`" \
+ $(DOCS_DIR)/usage.xsl $< > $@.tmp
+ sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2
+ fold -s $@.tmp2 > $@.tmp3
+ mv $@.tmp3 $@
+ rm $@.tmp $@.tmp2
+
+# We rename the file before xmlto sees it since xmlto will use the name of
+# the file to make internal links.
+%.man.xml: %.xml $(DOCS_DIR)/html-to-website-xml.xsl
+ cp $< `basename $< .xml`.xml && \
+ xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml
+ cat `basename $< .xml`.html | \
+ xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \
+ xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \
+ xmllint --format - > $@
+ rm `basename $< .xml`.html
+
+docs_all: $(MANPAGES) $(WEB_MANPAGES)
install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
install: all docs_all install_dirs
@@ -215,8 +246,8 @@ install: all docs_all install_dirs
done
for section in 1 5; do \
mkdir -p $(MAN_DIR)/man$$section; \
- for manpage in docs/*.$$section.pod; do \
- cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \
+ for manpage in $(DOCS_DIR)/*.$$section.gz; do \
+ cp $$manpage $(MAN_DIR)/man$$section; \
done; \
done
@@ -224,4 +255,27 @@ install_dirs:
mkdir -p $(SBIN_DIR)
mkdir -p $(TARGET_DIR)/sbin
--include $(DEPS_FILE)
+$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML))))
+
+# Note that all targets which depend on clean must have clean in their
+# name. Also any target that doesn't depend on clean should not have
+# clean in its name, unless you know that you don't need any of the
+# automatic dependency generation for that target (eg cleandb).
+
+# We want to load the dep file if *any* target *doesn't* contain
+# "clean" - i.e. if removing all clean-like targets leaves something
+
+ifeq "$(MAKECMDGOALS)" ""
+TESTABLEGOALS:=$(.DEFAULT_GOAL)
+else
+TESTABLEGOALS:=$(MAKECMDGOALS)
+endif
+
+ifneq "$(strip $(TESTABLEGOALS))" "$(DEPS_FILE)"
+ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
+ifeq "$(strip $(wildcard $(DEPS_FILE)))" ""
+$(info $(shell $(MAKE) $(DEPS_FILE)))
+endif
+include $(DEPS_FILE)
+endif
+endif
diff --git a/codegen.py b/codegen.py
index 96109610ad..91c70e8196 100644
--- a/codegen.py
+++ b/codegen.py
@@ -126,7 +126,7 @@ def printFileHeader():
%%
%% Contributor(s): ______________________________________.
%%"""
-
+
def genErl(spec):
def erlType(domain):
return erlangTypeMap[spec.resolveDomain(domain)]
@@ -151,7 +151,7 @@ def genErl(spec):
def genMethodHasContent(m):
print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower())
-
+
def genMethodIsSynchronous(m):
hasNoWait = "nowait" in fieldNameList(m.arguments)
if m.isSynchronous and hasNoWait:
@@ -219,6 +219,9 @@ def genErl(spec):
else:
pass
+ def genMethodRecord(m):
+ print "method_record(%s) -> #%s{};" % (m.erlangName(), m.erlangName())
+
def genDecodeMethodFields(m):
packedFields = packMethodFields(m.arguments)
binaryPattern = ', '.join([methodFieldFragment(f) for f in packedFields])
@@ -299,6 +302,7 @@ def genErl(spec):
-export([method_id/1]).
-export([method_has_content/1]).
-export([is_method_synchronous/1]).
+-export([method_record/1]).
-export([method_fieldnames/1]).
-export([decode_method_fields/2]).
-export([decode_properties/2]).
@@ -323,6 +327,9 @@ bitvalue(undefined) -> 0.
for m in methods: genMethodIsSynchronous(m)
print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})."
+ for m in methods: genMethodRecord(m)
+ print "method_record(Name) -> exit({unknown_method_name, Name})."
+
for m in methods: genMethodFieldNames(m)
print "method_fieldnames(Name) -> exit({unknown_method_name, Name})."
@@ -362,7 +369,7 @@ def genHrl(spec):
result += ' = ' + conv_fn(field.defaultvalue)
return result
return ', '.join([fillField(f) for f in fields])
-
+
methods = spec.allMethods()
printFileHeader()
@@ -386,7 +393,7 @@ def generateErl(specPath):
def generateHrl(specPath):
genHrl(AmqpSpec(specPath))
-
+
if __name__ == "__main__":
do_main(generateHrl, generateErl)
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl
new file mode 100644
index 0000000000..496fcc1c34
--- /dev/null
+++ b/docs/examples-to-end.xsl
@@ -0,0 +1,94 @@
+<?xml version='1.0'?>
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
+ xmlns:exsl="http://exslt.org/common"
+ xmlns:ng="http://docbook.org/docbook-ng"
+ xmlns:db="http://docbook.org/ns/docbook"
+ exclude-result-prefixes="exsl ng db"
+ version='1.0'>
+
+<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" />
+
+<!-- Don't copy examples through in place -->
+<xsl:template match="*[@role='example-prefix']"/>
+<xsl:template match="*[@role='example']"/>
+
+<!-- Copy everything through (with lower priority) -->
+<xsl:template match="@*|node()">
+ <xsl:copy><xsl:apply-templates select="@*|node()"/></xsl:copy>
+</xsl:template>
+
+<!-- Copy the root node, and add examples at the end-->
+<xsl:template match="/refentry">
+<refentry lang="en">
+<xsl:for-each select="*">
+ <xsl:copy><xsl:apply-templates select="@*|node()"/></xsl:copy>
+</xsl:for-each>
+ <refsect1>
+ <title>Examples</title>
+<xsl:if test="//screen[@role='example']">
+ <variablelist>
+<xsl:for-each select="//screen[@role='example']">
+ <varlistentry>
+ <term><command><xsl:copy-of select="text()"/></command></term>
+ <listitem>
+ <xsl:copy-of select="following-sibling::para[@role='example']"/>
+ </listitem>
+ </varlistentry>
+</xsl:for-each>
+ </variablelist>
+</xsl:if>
+<!--
+We need to handle multiline examples separately, since not using a
+variablelist leads to slightly less nice formatting (the explanation doesn't get
+indented)
+-->
+<xsl:for-each select="//screen[@role='example-multiline']">
+<screen><emphasis role="bold"><xsl:copy-of select="text()"/></emphasis></screen>
+<xsl:copy-of select="following-sibling::para[@role='example']"/>
+</xsl:for-each>
+ </refsect1>
+</refentry>
+</xsl:template>
+
+<!--
+ We show all the subcommands using XML that looks like this:
+
+ <term>
+ <cmdsynopsis>
+ <command>list_connections</command>
+ <arg choice="opt">
+ <replaceable>connectioninfoitem</replaceable>
+ ...
+ </arg>
+ </cmdsynopsis>
+ </term>
+
+ However, while DocBook renders this sensibly for HTML, for some reason it
+ doen't show anything inside <cmdsynopsis> at all for man pages. I think what
+ we're doing is semantically correct so this is a bug in DocBook. The following
+ rules essentially do what DocBook does when <cmdsynopsis> is not inside a
+ <term>.
+-->
+
+<xsl:template match="term/cmdsynopsis">
+ <xsl:apply-templates mode="docbook-bug"/>
+</xsl:template>
+
+<xsl:template match="command" mode="docbook-bug">
+ <emphasis role="bold"><xsl:apply-templates mode="docbook-bug"/></emphasis>
+</xsl:template>
+
+<xsl:template match="arg[@choice='opt']" mode="docbook-bug">
+ [<xsl:apply-templates mode="docbook-bug"/>]
+</xsl:template>
+
+<xsl:template match="arg[@choice='req']" mode="docbook-bug">
+ {<xsl:apply-templates mode="docbook-bug"/>}
+</xsl:template>
+
+<xsl:template match="replaceable" mode="docbook-bug">
+ <emphasis><xsl:apply-templates mode="docbook-bug"/></emphasis>
+</xsl:template>
+
+</xsl:stylesheet>
+
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
new file mode 100644
index 0000000000..a35b869967
--- /dev/null
+++ b/docs/html-to-website-xml.xsl
@@ -0,0 +1,91 @@
+<?xml version='1.0'?>
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
+ xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"
+ version='1.0'>
+
+<xsl:param name="original"/>
+
+<xsl:output method="xml" doctype-public="bug in xslt processor requires fake doctype" doctype-system="otherwise css isn't included" />
+
+<xsl:template match="*"/>
+
+<!-- Copy every element through -->
+<xsl:template match="@*|node()">
+ <xsl:copy><xsl:apply-templates select="@*|node()"/></xsl:copy>
+</xsl:template>
+
+<!-- Copy the root node, and munge the outer part of the page -->
+<xsl:template match="/html">
+<xsl:processing-instruction name="xml-stylesheet">type="text/xml" href="page.xsl"</xsl:processing-instruction>
+<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc">
+ <head>
+ <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
+ </head>
+ <body>
+ <doc:div>
+ <xsl:choose>
+ <xsl:when test="document($original)/refentry/refmeta/manvolnum">
+ <p>
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ </p>
+ <p>
+ <a href="manpages.html">See a list of all manual pages</a>.
+ </p>
+ </xsl:when>
+ <xsl:otherwise>
+ <p>
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ </p>
+ </xsl:otherwise>
+ </xsl:choose>
+ <p>
+ For more general documentation, please see the
+ <a href="admin-guide.html">administrator's guide</a>.
+ </p>
+
+ <doc:toc class="compact">
+ <doc:heading>Table of Contents</doc:heading>
+ </doc:toc>
+
+ <xsl:apply-templates select="body/div[@class='refentry']"/>
+ </doc:div>
+ </body>
+</html>
+</xsl:template>
+
+<!-- Specific instructions to revert the DocBook HTML to be more like our ad-hoc XML schema -->
+
+<xsl:template match="div[@class='refsect1'] | div[@class='refnamediv'] | div[@class='refsynopsisdiv']">
+ <doc:section name="{@title}">
+ <xsl:apply-templates select="node()"/>
+ </doc:section>
+</xsl:template>
+
+<xsl:template match="div[@class='refsect2']">
+ <doc:subsection name="{@title}">
+ <xsl:apply-templates select="node()"/>
+ </doc:subsection>
+</xsl:template>
+
+<xsl:template match="h2 | h3">
+ <doc:heading>
+ <xsl:apply-templates select="node()"/>
+ </doc:heading>
+</xsl:template>
+
+<xsl:template match="pre[@class='screen']">
+ <pre class="sourcecode">
+ <xsl:apply-templates select="node()"/>
+ </pre>
+</xsl:template>
+
+<xsl:template match="div[@class='cmdsynopsis']">
+ <div class="cmdsynopsis" id="{p/code[@class='command']}">
+ <xsl:apply-templates select="node()"/>
+ </div>
+</xsl:template>
+
+</xsl:stylesheet>
+
diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod
deleted file mode 100644
index 42f0c4d2e8..0000000000
--- a/docs/rabbitmq-activate-plugins.1.pod
+++ /dev/null
@@ -1,37 +0,0 @@
-=head1 NAME
-
-rabbitmq-activate-plugins - command line tool for activating plugins
-in a RabbitMQ broker
-
-=head1 SYNOPSIS
-
-rabbitmq-activate-plugins
-
-=head1 DESCRIPTION
-
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging. The RabbitMQ server is a robust and
-scalable implementation of an AMQP broker.
-
-rabbitmq-activate-plugins is a command line tool for activating
-plugins installed into the broker's plugins directory.
-
-=head1 EXAMPLES
-
-To activate all of the installed plugins in the current RabbitMQ install,
-execute:
-
- rabbitmq-activate-plugins
-
-=head1 SEE ALSO
-
-L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>,
-L<rabbitmqctl(1)>, L<rabbitmq-deactivate-plugins(1)>
-
-=head1 AUTHOR
-
-The RabbitMQ Team <info@rabbitmq.com>
-
-=head1 REFERENCES
-
-RabbitMQ Web Site: L<http://www.rabbitmq.com>
diff --git a/docs/rabbitmq-activate-plugins.1.xml b/docs/rabbitmq-activate-plugins.1.xml
new file mode 100644
index 0000000000..ef81c201f7
--- /dev/null
+++ b/docs/rabbitmq-activate-plugins.1.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmq-activate-plugins</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmq-activate-plugins</refname>
+ <refpurpose>command line tool for activating plugins in a RabbitMQ broker</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>rabbitmq-activate-plugins</command>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+ RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+ </para>
+ <para>
+ rabbitmq-activate-plugins is a command line tool for activating
+plugins installed into the broker's plugins directory.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">
+ rabbitmq-activate-plugins
+ </screen>
+ <para role="example">
+ This command activates all of the installed plugins in the current RabbitMQ install.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>See also</title>
+ <para>
+ <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-deactivate-plugins</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ </para>
+ </refsect1>
+</refentry>
diff --git a/docs/rabbitmq-deactivate-plugins.1.pod b/docs/rabbitmq-deactivate-plugins.1.pod
deleted file mode 100644
index eb4fbb90a3..0000000000
--- a/docs/rabbitmq-deactivate-plugins.1.pod
+++ /dev/null
@@ -1,37 +0,0 @@
-=head1 NAME
-
-rabbitmq-deactivate-plugins - command line tool for deactivating plugins
-in a RabbitMQ broker
-
-=head1 SYNOPSIS
-
-rabbitmq-deactivate-plugins
-
-=head1 DESCRIPTION
-
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging. The RabbitMQ server is a robust and
-scalable implementation of an AMQP broker.
-
-rabbitmq-deactivate-plugins is a command line tool for deactivating
-plugins installed into the broker.
-
-=head1 EXAMPLES
-
-To deactivate all of the installed plugins in the current RabbitMQ install,
-execute:
-
- rabbitmq-deactivate-plugins
-
-=head1 SEE ALSO
-
-L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>,
-L<rabbitmqctl(1)>, L<rabbitmq-activate-plugins(1)>
-
-=head1 AUTHOR
-
-The RabbitMQ Team <info@rabbitmq.com>
-
-=head1 REFERENCES
-
-RabbitMQ Web Site: L<http://www.rabbitmq.com>
diff --git a/docs/rabbitmq-deactivate-plugins.1.xml b/docs/rabbitmq-deactivate-plugins.1.xml
new file mode 100644
index 0000000000..eacd014b83
--- /dev/null
+++ b/docs/rabbitmq-deactivate-plugins.1.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmq-deactivate-plugins</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmq-deactivate-plugins</refname>
+ <refpurpose>command line tool for deactivating plugins in a RabbitMQ broker</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>rabbitmq-deactivate-plugins</command>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+ RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+ </para>
+ <para>
+rabbitmq-deactivate-plugins is a command line tool for deactivating
+plugins installed into the broker.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">
+ rabbitmq-deactivate-plugins
+ </screen>
+ <para role="example">
+ This command deactivates all of the installed plugins in the current RabbitMQ install.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>See also</title>
+ <para>
+ <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-activate-plugins</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ </para>
+ </refsect1>
+</refentry>
diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod
deleted file mode 100644
index 640609eef9..0000000000
--- a/docs/rabbitmq-multi.1.pod
+++ /dev/null
@@ -1,59 +0,0 @@
-=head1 NAME
-
-rabbitmq-multi - start/stop local cluster RabbitMQ nodes
-
-=head1 SYNOPSIS
-
-rabbitmq-multi I<command> [command option]
-
-=head1 DESCRIPTION
-
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging. The RabbitMQ server is a robust and
-scalable implementation of an AMQP broker.
-
-rabbitmq-multi scripts allows for easy set-up of a cluster on a single
-machine.
-
-See also L<rabbitmq-server(1)> for configuration information.
-
-=head1 COMMANDS
-
-=over
-
-=item start_all I<count>
-
-Start count nodes with unique names, listening on all IP addresses and
-on sequential ports starting from 5672.
-
-=item status
-
-Print the status of all running RabbitMQ nodes.
-
-=item stop_all
-
-Stop all local RabbitMQ nodes,
-
-=item rotate_logs
-
-Rotate log files for all local and running RabbitMQ nodes.
-
-=back
-
-=head1 EXAMPLES
-
-Start 3 local RabbitMQ nodes with unique, sequential port numbers:
-
- rabbitmq-multi start_all 3
-
-=head1 SEE ALSO
-
-L<rabbitmq.conf(5)>, L<rabbitmq-server(1)>, L<rabbitmqctl(1)>
-
-=head1 AUTHOR
-
-The RabbitMQ Team <info@rabbitmq.com>
-
-=head1 REFERENCES
-
-RabbitMQ Web Site: L<http://www.rabbitmq.com>
diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml
new file mode 100644
index 0000000000..b3862fdf87
--- /dev/null
+++ b/docs/rabbitmq-multi.1.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmq-multi</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmq-multi</refname>
+ <refpurpose>start/stop local cluster RabbitMQ nodes</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>rabbitmq-multi</command>
+ <arg choice="req"><replaceable>command</replaceable></arg>
+ <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+ RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+ </para>
+ <para>
+rabbitmq-multi scripts allows for easy set-up of a cluster on a single
+machine.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Commands</title>
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>start_all</command> <arg choice="req"><replaceable>count</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+Start count nodes with unique names, listening on all IP addresses and
+on sequential ports starting from 5672.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-multi start_all 3</screen>
+ <para role="example">
+ Starts 3 local RabbitMQ nodes with unique, sequential port numbers.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>status</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+Print the status of all running RabbitMQ nodes.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>stop_all</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+Stop all local RabbitMQ nodes,
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>rotate_logs</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+Rotate log files for all local and running RabbitMQ nodes.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect1>
+
+
+ <refsect1>
+ <title>See also</title>
+ <para>
+ <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ </para>
+ </refsect1>
+</refentry>
diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod
deleted file mode 100644
index d74ab8d94f..0000000000
--- a/docs/rabbitmq-server.1.pod
+++ /dev/null
@@ -1,88 +0,0 @@
-=head1 NAME
-
-rabbitmq-server - start RabbitMQ AMQP server
-
-=head1 SYNOPSIS
-
-rabbitmq-server [-detached]
-
-=head1 DESCRIPTION
-
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging. The RabbitMQ server is a robust and
-scalable implementation of an AMQP broker.
-
-Running rabbitmq-server in the foreground displays a banner message,
-and reports on progress in the startup sequence, concluding with the
-message "broker running", indicating that the RabbitMQ broker has been
-started successfully. To shut down the server, just terminate the
-process or use L<rabbitmqctl(1)>.
-
-=head1 ENVIRONMENT
-
-=over
-
-=item B<RABBITMQ_MNESIA_BASE>
-
-Defaults to F</var/lib/rabbitmq/mnesia>. Set this to the directory where
-Mnesia database files should be placed.
-
-=item B<RABBITMQ_LOG_BASE>
-
-Defaults to F</var/log/rabbitmq>. Log files generated by the server will
-be placed in this directory.
-
-=item B<RABBITMQ_NODENAME>
-
-Defaults to rabbit. This can be useful if you want to run more than
-one node per machine - B<RABBITMQ_NODENAME> should be unique per
-erlang-node-and-machine combination. See clustering on a single
-machine guide at
-L<http://www.rabbitmq.com/clustering.html#single-machine> for details.
-
-=item B<RABBITMQ_NODE_IP_ADDRESS>
-
-Defaults to 0.0.0.0. This can be changed if you only want to bind to
-one network interface.
-
-=item B<RABBITMQ_NODE_PORT>
-
-Defaults to 5672.
-
-=item B<RABBITMQ_CLUSTER_CONFIG_FILE>
-
-Defaults to F</etc/rabbitmq/rabbitmq_cluster.config>. If this file is
-present it is used by the server to auto-configure a RabbitMQ cluster.
-See the clustering guide at L<http://www.rabbitmq.com/clustering.html>
-for details.
-
-=back
-
-=head1 OPTIONS
-
-=over
-
-=item B<-detached>
-
-start the server process in the background
-
-=back
-
-=head1 EXAMPLES
-
-Run RabbitMQ AMQP server in the background:
-
- rabbitmq-server -detached
-
-=head1 SEE ALSO
-
-L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)>
-
-=head1 AUTHOR
-
-The RabbitMQ Team <info@rabbitmq.com>
-
-=head1 REFERENCES
-
-RabbitMQ Web Site: L<http://www.rabbitmq.com>
-
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
new file mode 100644
index 0000000000..25c2aefb8b
--- /dev/null
+++ b/docs/rabbitmq-server.1.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmq-server</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmq-server</refname>
+ <refpurpose>start RabbitMQ AMQP server</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>rabbitmq-multi</command>
+ <arg choice="opt">-detached</arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+ RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+ </para>
+ <para>
+Running rabbitmq-server in the foreground displays a banner message,
+and reports on progress in the startup sequence, concluding with the
+message "broker running", indicating that the RabbitMQ broker has been
+started successfully. To shut down the server, just terminate the
+process or use rabbitmqctl(1).
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Environment</title>
+ <variablelist>
+
+ <varlistentry>
+ <term>RABBITMQ_MNESIA_BASE</term>
+ <listitem>
+ <para>
+Defaults to <filename>/var/lib/rabbitmq/mnesia</filename>. Set this to the directory where
+Mnesia database files should be placed.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_LOG_BASE</term>
+ <listitem>
+ <para>
+Defaults to <filename>/var/log/rabbitmq</filename>. Log files generated by the server will
+be placed in this directory.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_NODENAME</term>
+ <listitem>
+ <para>
+Defaults to rabbit. This can be useful if you want to run more than
+one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per
+erlang-node-and-machine combination. See the
+<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single
+machine guide</ulink> for details.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_NODE_IP_ADDRESS</term>
+ <listitem>
+ <para>
+Defaults to 0.0.0.0. This can be changed if you only want to bind to
+one network interface.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_NODE_PORT</term>
+ <listitem>
+ <para>
+Defaults to 5672.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_CLUSTER_CONFIG_FILE</term>
+ <listitem>
+ <para>
+Defaults to <filename>/etc/rabbitmq/rabbitmq_cluster.config</filename>. If this file is
+present it is used by the server to auto-configure a RabbitMQ cluster.
+See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>
+for details.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Options</title>
+ <variablelist>
+ <varlistentry>
+ <term>-detached</term>
+ <listitem>
+ <para>
+ start the server process in the background
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-server -detached</screen>
+ <para role="example">
+ Runs RabbitMQ AMQP server in the background.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>See also</title>
+ <para>
+ <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ </para>
+ </refsect1>
+</refentry>
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
new file mode 100644
index 0000000000..d59ed63813
--- /dev/null
+++ b/docs/rabbitmq-service.xml
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmq-service.bat</refentrytitle>
+ <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmq-service.bat</refname>
+ <refpurpose>manage RabbitMQ AMQP service</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>rabbitmq-service.bat</command>
+ <arg choice="opt">command</arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+ RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+ </para>
+ <para>
+Running <command>rabbitmq-service</command> allows the RabbitMQ broker to be run as a
+service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker
+service can be started and stopped using the Windows® services
+applet.
+ </para>
+ <para>
+By default the service will run in the authentication context of the
+local system account. It is therefore necessary to synchronise Erlang
+cookies between the local system account (typically
+<filename>C:\WINDOWS\.erlang.cookie</filename> and the account that will be used to
+run <command>rabbitmqctl</command>.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Commands</title>
+ <variablelist>
+
+ <varlistentry>
+ <term>help</term>
+ <listitem>
+ <para>
+Display usage information.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>install</term>
+ <listitem>
+ <para>
+Install the service. The service will not be started.
+Subsequent invocations will update the service parameters if
+relevant environment variables were modified.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>remove</term>
+ <listitem>
+ <para>
+Remove the service. If the service is running then it will
+automatically be stopped before being removed. No files will be
+deleted as a consequence and <command>rabbitmq-server</command> will remain operable.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>start</term>
+ <listitem>
+ <para>
+Start the service. The service must have been correctly installed
+beforehand.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>stop</term>
+ <listitem>
+ <para>
+Stop the service. The service must be running for this command to
+have any effect.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>disable</term>
+ <listitem>
+ <para>
+Disable the service. This is the equivalent of setting the startup
+type to <code>Disabled</code> using the service control panel.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>enable</term>
+ <listitem>
+ <para>
+Enable the service. This is the equivalent of setting the startup
+type to <code>Automatic</code> using the service control panel.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Environment</title>
+ <variablelist>
+
+ <varlistentry>
+ <term>RABBITMQ_SERVICENAME</term>
+ <listitem>
+ <para>
+Defaults to RabbitMQ.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_BASE</term>
+ <listitem>
+ <para>
+Defaults to the application data directory of the current user.
+This is the location of log and database directories.
+
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_NODENAME</term>
+ <listitem>
+ <para>
+Defaults to rabbit. This can be useful if you want to run more than
+one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per
+erlang-node-and-machine combination. See the
+<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single
+machine guide</ulink> for details.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_NODE_IP_ADDRESS</term>
+ <listitem>
+ <para>
+Defaults to 0.0.0.0. This can be changed if you only want to bind to
+one network interface.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_NODE_PORT</term>
+ <listitem>
+ <para>
+Defaults to 5672.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>ERLANG_SERVICE_MANAGER_PATH</term>
+ <listitem>
+ <para>
+Defaults to <filename>C:\Program Files\erl5.5.5\erts-5.5.5\bin</filename>
+(or <filename>C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin</filename> for 64-bit
+environments). This is the installation location of the Erlang service
+manager.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_CLUSTER_CONFIG_FILE</term>
+ <listitem>
+ <para>
+If this file is
+present it is used by the server to auto-configure a RabbitMQ cluster.
+See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>
+for details.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>RABBITMQ_CONSOLE_LOG</term>
+ <listitem>
+ <para>
+Set this varable to <code>new</code> or <code>reuse</code> to have the console
+output from the server redirected to a file named <code>SERVICENAME</code>.debug
+in the application data directory of the user that installed the service.
+Under Vista this will be <filename>C:\Users\AppData\username\SERVICENAME</filename>.
+Under previous versions of Windows this will be
+<filename>C:\Documents and Settings\username\Application Data\SERVICENAME</filename>.
+If <code>RABBITMQ_CONSOLE_LOG</code> is set to <code>new</code> then a new file will be
+created each time the service starts. If <code>RABBITMQ_CONSOLE_LOG</code> is
+set to <code>reuse</code> then the file will be overwritten each time the
+service starts. The default behaviour when <code>RABBITMQ_CONSOLE_LOG</code> is
+not set or set to a value other than <code>new</code> or <code>reuse</code> is to discard
+the server output.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect1>
+</refentry>
diff --git a/docs/rabbitmq.conf.5.pod b/docs/rabbitmq.conf.5.pod
deleted file mode 100644
index a7bf4c0942..0000000000
--- a/docs/rabbitmq.conf.5.pod
+++ /dev/null
@@ -1,69 +0,0 @@
-=head1 NAME
-
-F</etc/rabbitmq/rabbitmq.conf> - default settings for RabbitMQ AMQP
-server
-
-=head1 DESCRIPTION
-
-F</etc/rabbitmq/rabbitmq.conf> contains variable settings that override the
-defaults built in to the RabbitMQ startup scripts.
-
-The file is interpreted by the system shell, and so should consist of
-a sequence of shell environment variable definitions. Normal shell
-syntax is permitted (since the file is sourced using the shell "."
-operator), including line comments starting with "#".
-
-In order of preference, the startup scripts get their values from the
-environment, from F</etc/rabbitmq/rabbitmq.conf> and finally from the
-built-in default values. For example, for the B<RABBITMQ_NODENAME>
-setting,
-
-=over
-
-=item B<RABBITMQ_NODENAME>
-
-from the environment is checked first. If it is absent or equal to the
-empty string, then
-
-=item B<NODENAME>
-
-from L</etc/rabbitmq/rabbitmq.conf> is checked. If it is also absent
-or set equal to the empty string then the default value from the
-startup script is used.
-
-The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the
-environment variable names, with the B<RABBITMQ_> prefix removed:
-B<RABBITMQ_NODE_PORT> from the environment becomes B<NODE_PORT> in the
-F</etc/rabbitmq/rabbitmq.conf> file, etc.
-
-=back
-
-=head1 EXAMPLES
-
-The following is an example of a complete
-F</etc/rabbitmq/rabbitmq.conf> file that overrides the default Erlang
-node name from "rabbit" to "hare":
-
- # I am a complete /etc/rabbitmq/rabbitmq.conf file.
- # Comment lines start with a hash character.
- # This is a /bin/sh script file - use ordinary envt var syntax
- NODENAME=hare
-
-=head1 SEE ALSO
-
-L<rabbitmq-server(1)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)>
-
-=head1 AUTHOR
-
-Originally written by The RabbitMQ Team <info@rabbitmq.com>
-
-=head1 COPYRIGHT
-
-This package, the RabbitMQ server is licensed under the MPL.
-
-If you have any questions regarding licensing, please contact us at
-info@rabbitmq.com.
-
-=head1 REFERENCES
-
-RabbitMQ Web Site: L<http://www.rabbitmq.com>
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml
new file mode 100644
index 0000000000..34f20f9226
--- /dev/null
+++ b/docs/rabbitmq.conf.5.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmq.conf</refentrytitle>
+ <manvolnum>5</manvolnum>
+ <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmq.conf</refname>
+ <refpurpose>default settings for RabbitMQ AMQP server</refpurpose>
+ </refnamediv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the
+defaults built in to the RabbitMQ startup scripts.
+ </para>
+ <para>
+The file is interpreted by the system shell, and so should consist of
+a sequence of shell environment variable definitions. Normal shell
+syntax is permitted (since the file is sourced using the shell "."
+operator), including line comments starting with "#".
+ </para>
+ <para>
+In order of preference, the startup scripts get their values from the
+environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the
+built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar>
+setting,
+ </para>
+ <para>
+ <envar>RABBITMQ_NODENAME</envar>
+ </para>
+ <para>
+from the environment is checked first. If it is absent or equal to the
+empty string, then
+ </para>
+ <para>
+ <envar>NODENAME</envar>
+ </para>
+ <para>
+from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent
+or set equal to the empty string then the default value from the
+startup script is used.
+ </para>
+ <para>
+The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the
+environment variable names, with the <envar>RABBITMQ_</envar> prefix removed:
+<envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the
+<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example-multiline">
+# I am a complete /etc/rabbitmq/rabbitmq.conf file.
+# Comment lines start with a hash character.
+# This is a /bin/sh script file - use ordinary envt var syntax
+NODENAME=hare
+ </screen>
+ <para role="example">
+ This is an example of a complete
+ <filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang
+ node name from "rabbit" to "hare".
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>See also</title>
+ <para>
+ <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ </para>
+ </refsect1>
+</refentry>
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
deleted file mode 100644
index e26767ab4f..0000000000
--- a/docs/rabbitmqctl.1.pod
+++ /dev/null
@@ -1,536 +0,0 @@
-=head1 NAME
-
-rabbitmqctl - command line tool for managing a RabbitMQ broker
-
-=head1 SYNOPSIS
-
-rabbitmqctl [-n I<node>] I<<command>> [command options]
-
-=head1 DESCRIPTION
-
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging. The RabbitMQ server is a robust and
-scalable implementation of an AMQP broker.
-
-rabbitmqctl is a command line tool for managing a RabbitMQ broker.
-It performs all actions by connecting to one of the broker's nodes.
-
-
-=head1 OPTIONS
-
-=over
-
-=item B<-n> I<node>
-
-Default node is C<rabbit@server>, where server is the local host. On
-a host named C<server.example.com>, the node name of the RabbitMQ
-Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME
-has been set to some non-default value at broker startup time). The
-output of hostname -s is usually the correct suffix to use after the
-"@" sign. See rabbitmq-server(1) for details of configuring the
-RabbitMQ broker.
-
-=item B<-q>
-
-Quiet output mode is selected with the B<-q> flag. Informational
-messages are suppressed when quiet mode is in effect.
-
-=back
-
-=head1 COMMANDS
-
-=head2 APPLICATION AND CLUSTER MANAGEMENT
-
-=over
-
-=item stop
-
-Stop the Erlang node on which RabbitMQ broker is running.
-
-=item stop_app
-
-Stop the RabbitMQ application, leaving the Erlang node running. This
-command is typically run prior to performing other management actions
-that require the RabbitMQ application to be stopped, e.g. I<reset>.
-
-=item start_app
-
-Start the RabbitMQ application. This command is typically run prior
-to performing other management actions that require the RabbitMQ
-application to be stopped, e.g. I<reset>.
-
-=item status
-
-Display various information about the RabbitMQ broker, such as whether
-the RabbitMQ application on the current node, its version number, what
-nodes are part of the broker, which of these are running.
-
-=item reset
-
-Return a RabbitMQ node to its virgin state. Removes the node from any
-cluster it belongs to, removes all data from the management database,
-such as configured users, vhosts and deletes all persistent messages.
-
-=item force_reset
-
-The same as I<reset> command, but resets the node unconditionally,
-regardless of the current management database state and cluster
-configuration. It should only be used as a last resort if the
-database or cluster configuration has been corrupted.
-
-=item rotate_logs [suffix]
-
-Instruct the RabbitMQ node to rotate the log files. The RabbitMQ
-broker will attempt to append the current contents of the log file to
-the file with the name composed of the original name and the
-suffix. It will create a new file if such a file does not already
-exist. When no I<suffix> is specified, the empty log file is simply
-created at the original location; no rotation takes place. When an
-error occurs while appending the contents of the old log file, the
-operation behaves in the same way as if no I<suffix> was specified.
-This command might be helpful when you are e.g. writing your own
-logrotate script and you do not want to restart the RabbitMQ node.
-
-=item cluster I<clusternode> ...
-
-Instruct the node to become member of a cluster with the specified
-nodes determined by I<clusternode> option(s). See
-L<http://www.rabbitmq.com/clustering.html> for more information about
-clustering.
-
-=item close_connection I<connectionpid> I<explanation>
-
-Instruct the broker to close the connection associated with the Erlang
-process id I<connectionpid> (see also the I<list_connections>
-command), passing the I<explanation> string to the connected client as
-part of the AMQP connection shutdown protocol.
-
-=back
-
-=head2 USER MANAGEMENT
-
-=over
-
-=item add_user I<username> I<password>
-
-Create a user named I<username> with (initial) password I<password>.
-
-=item delete_user I<username>
-
-Delete the user named I<username>.
-
-=item change_password I<username> I<newpassword>
-
-Change the password for the user named I<username> to I<newpassword>.
-
-=item list_users
-
-List all users, one per line.
-
-=back
-
-=head2 ACCESS CONTROL
-
-=over
-
-=item add_vhost I<vhostpath>
-
-Create a new virtual host called I<vhostpath>.
-
-=item delete_vhost I<vhostpath>
-
-Delete a virtual host I<vhostpath>. This command deletes also all its
-exchanges, queues and user mappings.
-
-=item list_vhosts
-
-List all virtual hosts, one per line.
-
-=item set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp>
-
-Set the permissions for the user named I<username> in the virtual host
-I<vhostpath>, granting I<configure>, I<write> and I<read> access to
-resources with names matching the first, second and third I<regexp>,
-respectively.
-
-=item clear_permissions [-p I<vhostpath>] I<username>
-
-Remove the permissions for the user named I<username> in the virtual
-host I<vhostpath>.
-
-=item list_permissions [-p I<vhostpath>]
-
-List all the users and their permissions in the virtual host
-I<vhostpath>. Each output line contains the username and their
-I<configure>, I<write> and I<read> access regexps, separated by tab
-characters.
-
-=item list_user_permissions I<username>
-
-List the permissions of the user named I<username> across all virtual
-hosts.
-
-=back
-
-=head2 SERVER STATUS
-
-=over
-
-=item list_queues [-p I<vhostpath>] [I<queueinfoitem> ...]
-
-List queue information by virtual host. Each line printed
-describes a queue, with the requested I<queueinfoitem> values
-separated by tab characters. If no I<queueinfoitem>s are
-specified then I<name> and I<messages> are assumed.
-
-=back
-
-=head3 Queue information items
-
-=over
-
-=item name
-
-name of the queue
-
-=item durable
-
-whether the queue survives server restarts
-
-=item auto_delete
-
-whether the queue will be deleted when no longer used
-
-=item arguments
-
-queue arguments
-
-=item pid
-
-id of the Erlang process associated with the queue
-
-=item owner_pid
-
-id of the Erlang process representing the connection which is the
-exclusive owner of the queue, or empty if the queue is non-exclusive
-
-=item exclusive_consumer_pid
-
-id of the Erlang process representing the channel of the exclusive
-consumer subscribed to this queue, or empty if there is no exclusive
-consumer
-
-=item exclusive_consumer_tag
-
-consumer tag of the exclusive consumer subscribed to this queue, or
-empty if there is no exclusive consumer
-
-=item messages_ready
-
-number of messages ready to be delivered to clients
-
-=item messages_unacknowledged
-
-number of messages delivered to clients but not yet acknowledged
-
-=item messages_uncommitted
-
-number of messages published in as yet uncommitted transactions
-
-=item messages
-
-sum of ready, unacknowledged and uncommitted messages
-
-=item acks_uncommitted
-
-number of acknowledgements received in as yet uncommitted transactions
-
-=item consumers
-
-number of consumers
-
-=item transactions
-
-number of transactions
-
-=item memory
-
-bytes of memory consumed by the Erlang process for the queue,
-including stack, heap and internal structures
-
-=back
-
-=over
-
-=item list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...]
-
-List queue information by virtual host. Each line printed describes an
-exchange, with the requested I<exchangeinfoitem> values separated by
-tab characters. If no I<exchangeinfoitem>s are specified then I<name>
-and I<type> are assumed.
-
-=back
-
-=head3 Exchange information items
-
-=over
-
-=item name
-
-name of the exchange
-
-=item type
-
-exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>)
-
-=item durable
-
-whether the exchange survives server restarts
-
-=item auto_delete
-
-whether the exchange is deleted when no longer used
-
-=item arguments
-
-exchange arguments
-
-=back
-
-=over
-
-=item list_bindings [-p I<vhostpath>]
-
-List bindings by virtual host. Each line printed describes a binding,
-with the exchange name, queue name, routing key and arguments,
-separated by tab characters.
-
-=item list_connections [I<connectioninfoitem> ...]
-
-List current AMQP connections. Each line printed describes a
-connection, with the requested I<connectioninfoitem> values separated
-by tab characters. If no I<connectioninfoitem>s are specified then
-I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
-
-=back
-
-=head3 Connection information items
-
-=over
-
-=item pid
-
-id of the Erlang process associated with the connection
-
-=item address
-
-server IP number
-
-=item port
-
-server port
-
-=item peer_address
-
-peer address
-
-=item peer_port
-
-peer port
-
-=item state
-
-connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>,
-B<running>, B<closing>, B<closed>)
-
-=item channels
-
-number of channels using the connection
-
-=item user
-
-username associated with the connection
-
-=item vhost
-
-virtual host
-
-=item timeout
-
-connection timeout
-
-=item frame_max
-
-maximum frame size (bytes)
-
-=item client_properties
-
-informational properties transmitted by the client during connection
-establishment
-
-=item recv_oct
-
-octets received
-
-=item recv_cnt
-
-packets received
-
-=item send_oct
-
-octets sent
-
-=item send_cnt
-
-packets sent
-
-=item send_pend
-
-send queue size
-
-=back
-
-=over
-
-=item list_channels [I<channelinfoitem> ...]
-
-List channel information. Each line printed describes a channel, with
-the requested I<channelinfoitem> values separated by tab characters.
-If no I<channelinfoitem>s are specified then I<pid>, I<user>,
-I<transactional>, I<consumer_count>, and I<messages_unacknowledged>
-are assumed.
-
-The list includes channels which are part of ordinary AMQP connections
-(as listed by list_connections) and channels created by various
-plug-ins and other extensions.
-
-=back
-
-=head3 Channel information items
-
-=over
-
-=item pid
-
-id of the Erlang process associated with the channel
-
-=item connection
-
-id of the Erlang process associated with the connection to which the
-channel belongs
-
-=item number
-
-the number of the channel, which uniquely identifies it within a
-connection
-
-=item user
-
-username associated with the channel
-
-=item vhost
-
-virtual host in which the channel operates
-
-=item transactional
-
-true if the channel is in transactional mode, false otherwise
-
-=item consumer_count
-
-number of logical AMQP consumers retrieving messages via the channel
-
-=item messages_unacknowledged
-
-number of messages delivered via this channel but not yet acknowledged
-
-=item acks_uncommitted
-
-number of acknowledgements received in an as yet uncommitted
-transaction
-
-=item prefetch_count
-
-QoS prefetch count limit in force, 0 if unlimited
-
-=back
-
-=item list_consumers
-
-List consumers, i.e. subscriptions to a queue's message stream. Each
-line printed shows, separated by tab characters, the name of the queue
-subscribed to, the id of the channel process via which the
-subscription was created and is managed, the consumer tag which
-uniquely identifies the subscription within a channel, and a boolean
-indicating whether acknowledgements are expected for messages
-delivered to this consumer.
-
-=back
-
-The list_queues, list_exchanges, list_bindings and list_consumers
-commands accept an optional virtual host parameter for which to
-display results, defaulting to I<"/">. The default can be overridden
-with the B<-p> flag.
-
-=head1 OUTPUT ESCAPING
-
-Various items that may appear in the output of rabbitmqctl can contain
-arbitrary octets. If a octet corresponds to a non-printing ASCII
-character (values 0 to 31, and 127), it will be escaped in the output,
-using a sequence consisting of a backslash character followed by three
-octal digits giving the octet's value (i.e., as used in string
-literals in the C programming language). An octet corresponding to
-the backslash character (i.e. with value 92) will be escaped using a
-sequence of two backslash characters. Octets with a value of 128 or
-above are not escaped, in order to preserve strings encoded with
-UTF-8.
-
-The items to which this escaping scheme applies are:
-
-=over
-
-=item *
-Usernames
-
-=item *
-Virtual host names
-
-=item *
-Queue names
-
-=item *
-Exchange names
-
-=item *
-Regular expressions used for access control
-
-=back
-
-=head1 EXAMPLES
-
-Create a user named foo with (initial) password bar at the Erlang node
-rabbit@test:
-
- rabbitmqctl -n rabbit@test add_user foo bar
-
-Grant user named foo access to the virtual host called test at the
-default Erlang node:
-
- rabbitmqctl map_user_vhost foo test
-
-Append the current logs' content to the files with ".1" suffix and reopen
-them:
-
- rabbitmqctl rotate_logs .1
-
-=head1 SEE ALSO
-
-rabbitmq.conf(5), rabbitmq-multi(1), rabbitmq-server(1)
-
-=head1 AUTHOR
-
-The RabbitMQ Team <info@rabbitmq.com>
-
-=head1 REFERENCES
-
-RabbitMQ Web Site: L<http://www.rabbitmq.com>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
new file mode 100644
index 0000000000..7634b2d247
--- /dev/null
+++ b/docs/rabbitmqctl.1.xml
@@ -0,0 +1,1042 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<!--
+ There is some extra magic in this document besides the usual DocBook semantics
+ to allow us to derive manpages, HTML and usage messages from the same source
+ document.
+
+ Examples need to be moved to the end for man pages. To this end, <para>s and
+ <screen>s with role="example" will be moved, and with role="example-prefix"
+ will be removed.
+
+ The usage messages are more involved. We have some magic in usage.xsl to pull
+ out the command synopsis, global option and subcommand synopses. We also pull
+ out <para>s with role="usage".
+
+ Finally we construct lists of possible values for subcommand options, if the
+ subcommand's <varlistentry> has role="usage-has-option-list". The option which
+ takes the values should be marked with role="usage-option-list".
+-->
+
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmqctl</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo class="manual">RabbitMQ Service</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmqctl</refname>
+ <refpurpose>command line tool for managing a RabbitMQ broker</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>rabbitmqctl</command>
+ <arg choice="opt">-n <replaceable>node</replaceable></arg>
+ <arg choice="opt">-q</arg>
+ <arg choice="req"><replaceable>command</replaceable></arg>
+ <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+ RabbitMQ is an implementation of AMQP, the emerging standard for high
+ performance enterprise messaging. The RabbitMQ server is a robust and
+ scalable implementation of an AMQP broker.
+ </para>
+ <para>
+ <command>rabbitmqctl</command> is a command line tool for managing a
+ RabbitMQ broker. It performs all actions by connecting to one of the
+ broker's nodes.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Options</title>
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><arg choice="opt">-n <replaceable>node</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para role="usage">
+ Default node is "rabbit@server", where server is the local host. On
+ a host named "server.example.com", the node name of the RabbitMQ
+ Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME
+ has been set to some non-default value at broker startup time). The
+ output of <command>hostname -s</command> is usually the correct suffix to use after the
+ "@" sign. See rabbitmq-server(1) for details of configuring the
+ RabbitMQ broker.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><arg choice="opt">-q</arg></cmdsynopsis></term>
+ <listitem>
+ <para role="usage">
+ Quiet output mode is selected with the "-q" flag. Informational
+ messages are suppressed when quiet mode is in effect.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Flags must precede all other parameters to <command>rabbitmqctl</command>.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Commands</title>
+
+ <refsect2>
+ <title>Application and Cluster Management</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>stop</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Stops the Erlang node on which RabbitMQ is running. To
+ restart the node follow the instructions for <citetitle>Running
+ the Server</citetitle> in the <ulink url="http://www.rabbitmq.com/install.html">installation
+ guide</ulink>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl stop</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to terminate.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="stop_app">
+ <term><cmdsynopsis><command>stop_app</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Stops the RabbitMQ application, leaving the Erlang node
+ running.
+ </para>
+ <para>
+ This command is typically run prior to performing other
+ management actions that require the RabbitMQ application
+ to be stopped, e.g. <link
+ linkend="reset"><command>reset</command></link>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl stop_app</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to stop the
+ RabbitMQ application.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>start_app</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Starts the RabbitMQ application.
+ </para>
+ <para>
+ This command is typically run after performing other
+ management actions that required the RabbitMQ application
+ to be stopped, e.g. <link
+ linkend="reset"><command>reset</command></link>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl start_app</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to start the
+ RabbitMQ application.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>status</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Displays various information about the RabbitMQ broker,
+ such as whether the RabbitMQ application on the current
+ node, its version number, what nodes are part of the
+ broker, which of these are running.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl status</screen>
+ <para role="example">
+ This command displays information about the RabbitMQ
+ broker.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="reset">
+ <term><cmdsynopsis><command>reset</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Return a RabbitMQ node to its virgin state.
+ </para>
+ <para>
+ Removes the node from any cluster it belongs to, removes
+ all data from the management database, such as configured
+ users and vhosts, and deletes all persistent
+ messages.
+ </para>
+ <para>
+ For <command>reset</command> and <command>force_reset</command> to
+ succeed the RabbitMQ application must have been stopped,
+ e.g. with <link linkend="stop_app"><command>stop_app</command></link>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl reset</screen>
+ <para role="example">
+ This command resets the RabbitMQ node.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>force_reset</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Forcefully return a RabbitMQ node to its virgin state.
+ </para>
+ <para>
+ The <command>force_reset</command> command differs from
+ <command>reset</command> in that it resets the node
+ unconditionally, regardless of the current management
+ database state and cluster configuration. It should only
+ be used as a last resort if the database or cluster
+ configuration has been corrupted.
+ </para>
+ <para>
+ For <command>reset</command> and <command>force_reset</command> to
+ succeed the RabbitMQ application must have been stopped,
+ e.g. with <link linkend="stop_app"><command>stop_app</command></link>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl force_reset</screen>
+ <para role="example">
+ This command resets the RabbitMQ node.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>rotate_logs</command> <arg choice="req"><replaceable>suffix</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Instruct the RabbitMQ node to rotate the log files.
+ </para>
+ <para>
+ The RabbitMQ broker will attempt to append the current contents
+ of the log file to the file with name composed of the original
+ name and the suffix.
+ It will create a new file if such a file does not already exist.
+ When no <option>suffix</option> is specified, the empty log file is
+ simply created at the original location; no rotation takes place.
+ </para>
+ <para>
+ When an error occurs while appending the contents of the old log
+ file, the operation behaves in the same way as if no <option>suffix</option> was
+ specified.
+ </para>
+ <para>
+ This command might be helpful when you are e.g. writing your
+ own logrotate script and you do not want to restart the RabbitMQ
+ node.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl rotate_logs .1</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to append the current content
+ of the log files to the files with names consisting of the original logs'
+ names and ".1" suffix, e.g. rabbit.log.1. Finally, the old log files are reopened.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
+ <title>Cluster management</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>clusternode</term>
+ <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instruct the node to become member of a cluster with the
+ specified nodes.
+ </para>
+ <para>
+ Cluster nodes can be of two types: disk or ram. Disk nodes
+ replicate data in ram and on disk, thus providing
+ redundancy in the event of node failure and recovery from
+ global events such as power failure across all nodes. Ram
+ nodes replicate data in ram only and are mainly used for
+ scalability. A cluster must always have at least one disk node.
+ </para>
+ <para>
+ If the current node is to become a disk node it needs to
+ appear in the cluster node list. Otherwise it becomes a
+ ram node. If the node list is empty or only contains the
+ current node then the node becomes a standalone,
+ i.e. non-clustered, (disk) node.
+ </para>
+ <para>
+ After executing the <command>cluster</command> command, whenever
+ the RabbitMQ application is started on the current node it
+ will attempt to connect to the specified nodes, thus
+ becoming an active node in the cluster comprising those
+ nodes (and possibly others).
+ </para>
+ <para>
+ The list of nodes does not have to contain all the
+ cluster's nodes; a subset is sufficient. Also, clustering
+ generally succeeds as long as at least one of the
+ specified nodes is active. Hence adjustments to the list
+ are only necessary if the cluster configuration is to be
+ altered radically.
+ </para>
+ <para>
+ For this command to succeed the RabbitMQ application must
+ have been stopped, e.g. with <link linkend="stop_app"><command>stop_app</command></link>. Furthermore,
+ turning a standalone node into a clustered node requires
+ the node be <link linkend="reset"><command>reset</command></link> first,
+ in order to avoid accidental destruction of data with the
+ <command>cluster</command> command.
+ </para>
+ <para>
+ For more details see the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl cluster rabbit@tanto hare@elena</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to join the
+ cluster with nodes <command>rabbit@tanto</command> and
+ <command>hare@elena</command>. If the node is one of these then
+ it becomes a disk node, otherwise a ram node.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
+ <title>Closing individual connections</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>close_connection</command> <arg choice="req"><replaceable>connectionpid</replaceable></arg> <arg choice="req"><replaceable>explanation</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>connectionpid</term>
+ <listitem><para>Id of the Erlang process associated with the connection to close.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>explanation</term>
+ <listitem><para>Explanation string.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instruct the broker to close the connection associated
+ with the Erlang process id <option>connectionpid</option> (see also the
+ <link linkend="list_connections"><command>list_connections</command></link>
+ command), passing the <option>explanation</option> string to the
+ connected client as part of the AMQP connection shutdown
+ protocol.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl close_connection "&lt;rabbit@tanto.4262.0&gt;" "go away"</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to close the
+ connection associated with the Erlang process
+ id <command>&lt;rabbit@tanto.4262.0&gt;</command>, passing the
+ explanation <command>go away</command> to the connected client.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
+ <title>User management</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>add_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user to create.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>password</term>
+ <listitem><para>The password the created user will use to log in to the broker.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl add_user tonyg changeit</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to create a
+ user named <command>tonyg</command> with (initial) password
+ <command>changeit</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>delete_user</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user to delete.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl delete_user tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to delete the
+ user named <command>tonyg</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>change_password</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>newpassword</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose password is to be changed.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>newpassword</term>
+ <listitem><para>The new password for the user.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl change_password tonyg newpass</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to change the
+ password for the user named <command>tonyg</command> to
+ <command>newpass</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>list_users</command></cmdsynopsis></term>
+ <listitem>
+ <para>Lists users</para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_users</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to list all users.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
+ <title>Access control</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>add_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>vhostpath</term>
+ <listitem><para>The name of the virtual host entry to create.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Creates a virtual host.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl add_vhost test</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to create a new
+ virtual host called <command>test</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>delete_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>vhostpath</term>
+ <listitem><para>The name of the virtual host entry to delete.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Deletes a virtual host.
+ </para>
+ <para>
+ Deleting a virtual host deletes all its exchanges,
+ queues, user mappings and associated permissions.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl delete_vhost test</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to delete the
+ virtual host called <command>test</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>list_vhosts</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Lists virtual hosts.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_vhosts</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to list all
+ virtual hosts.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>configure</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>vhostpath</term>
+ <listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>configure</term>
+ <listitem><para>A regular expression matching resource names for which the user is granted configure permissions.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>write</term>
+ <listitem><para>A regular expression matching resource names for which the user is granted write permissions.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>read</term>
+ <listitem><para>A regular expression matching resource names for which the user is granted read permissions.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Sets user permissions.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_permissions -p /myvhost tonyg "^tonyg-.*" ".*" ".*"</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to grant the
+ user named <command>tonyg</command> access to the virtual host
+ called <command>/myvhost</command>, with configure permissions
+ on all resources whose names starts with "tonyg-", and
+ write and read permissions on all resources.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>vhostpath</term>
+ <listitem><para>The name of the virtual host to which to deny the user access, defaulting to <command>/</command>.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user to deny access to the specified virtual host.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Sets user permissions.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_permissions -p /myvhost tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to deny the
+ user named <command>tonyg</command> access to the virtual host
+ called <command>/myvhost</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>list_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>vhostpath</term>
+ <listitem><para>The name of the virtual host for which to list the users that have been granted access to it, and their permissions. Defaults to <command>/</command>.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Lists permissions in a virtual host.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_permissions -p /myvhost</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to list all the
+ users which have been granted access to the virtual host
+ called <command>/myvhost</command>, and the permissions they
+ have for operations on resources in that virtual host.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user for which to list the permissions.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Lists user permissions.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_user_permissions tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to list all the
+ virtual hosts to which the user named <command>tonyg</command>
+ has been granted access, and the permissions the user has
+ for operations on resources in these virtual hosts.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
+ <title>Server Status</title>
+ <para>
+ The server status queries interrogate the server and return a list of
+ results with tab-delimited columns. Some queries (<command>list_queues</command>,
+ <command>list_exchanges</command>, <command>list_bindings</command>, and
+ <command>list_consumers</command>) accept an
+ optional <command>vhost</command> parameter. This parameter, if present, must be
+ specified immediately after the query.
+ </para>
+ <para role="usage">
+ The list_queues, list_exchanges and list_bindings commands accept an
+ optional virtual host parameter for which to display results. The
+ default value is "/".
+ </para>
+
+ <variablelist>
+ <varlistentry role="usage-has-option-list">
+ <term><cmdsynopsis><command>list_queues</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>queueinfoitem</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Returns queue details. Queue details of the <command>/</command> virtual host
+ are returned if the "-p" flag is absent. The "-p" flag can be used to
+ override this default.
+ </para>
+ <para>
+ The <command>queueinfoitem</command> parameter is used to indicate which queue
+ information items to include in the results. The column order in the
+ results will match the order of the parameters.
+ <command>queueinfoitem</command> can take any value from the list
+ that follows:
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>name</term>
+ <listitem><para>The name of the queue with non-ASCII characters URL-escaped.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>durable</term>
+ <listitem><para>Whether or not the queue survives server restarts.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>auto_delete</term>
+ <listitem><para>Whether the queue will be deleted automatically when no longer used.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>arguments</term>
+ <listitem><para>Queue arguments.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>pid</term>
+ <listitem><para>Id of the Erlang process associated with the queue.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>owner_pid</term>
+ <listitem><para>Id of the Erlang process representing the connection
+ which is the exclusive owner of the queue. Empty if the
+ queue is non-exclusive.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>exclusive_consumer_pid</term>
+ <listitem><para>Id of the Erlang process representing the channel of the
+ exclusive consumer subscribed to this queue. Empty if
+ there is no exclusive consumer.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>exclusive_consumer_tag</term>
+ <listitem><para>Consumer tag of the exclusive consumer subscribed to
+ this queue. Empty if there is no exclusive consumer.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_ready</term>
+ <listitem><para>Number of messages ready to be delivered to clients.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_unacknowledged</term>
+ <listitem><para>Number of messages delivered to clients but not yet acknowledged.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_uncommitted</term>
+ <listitem><para>Number of messages published in as yet uncommitted transactions</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages</term>
+ <listitem><para>Sum of ready, unacknowledged and uncommitted messages
+ (queue depth).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>acks_uncommitted</term>
+ <listitem><para>Number of acknowledgements received in as yet uncommitted
+ transactions.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>consumers</term>
+ <listitem><para>Number of consumers.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>transactions</term>
+ <listitem><para>Number of transactions.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>memory</term>
+ <listitem><para>Bytes of memory consumed by the Erlang process associated with the
+ queue, including stack, heap and internal structures.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ If no <command>queueinfoitem</command>s are specified then queue name and depth are
+ displayed.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl list_queues -p /myvhost messages consumers</screen>
+ <para role="example">
+ This command displays the depth and number of consumers for each
+ queue of the virtual host named <command>/myvhost</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry role="usage-has-option-list">
+ <term><cmdsynopsis><command>list_exchanges</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>exchangeinfoitem</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Returns exchange details. Exchange details of the <command>/</command> virtual host
+ are returned if the "-p" flag is absent. The "-p" flag can be used to
+ override this default.
+ </para>
+ <para>
+ The <command>exchangeinfoitem</command> parameter is used to indicate which
+ exchange information items to include in the results. The column order in the
+ results will match the order of the parameters.
+ <command>exchangeinfoitem</command> can take any value from the list
+ that follows:
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>name</term>
+ <listitem><para>The name of the exchange with non-ASCII characters URL-escaped.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>type</term>
+ <listitem><para>The exchange type (one of [<command>direct</command>,
+ <command>topic</command>, <command>headers</command>,
+ <command>fanout</command>]).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>durable</term>
+ <listitem><para>Whether or not the exchange survives server restarts.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>auto_delete</term>
+ <listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>arguments</term>
+ <listitem><para>Exchange arguments.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ If no <command>exchangeinfoitem</command>s are specified then
+ exchange name and type are displayed.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl list_exchanges -p /myvhost name type</screen>
+ <para role="example">
+ This command displays the name and type for each
+ exchange of the virtual host named <command>/myvhost</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ By default the bindings for the <command>/</command> virtual
+ host are returned. The "-p" flag can be used to override
+ this default. Each result row will contain an exchange
+ name, queue name, routing key and binding arguments, in
+ that order. Non-ASCII characters will be URL-encoded.
+ </para>
+ <para role="usage">
+ The output format for "list_bindings" is a list of rows containing
+ exchange name, queue name, routing key and arguments, in that order.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="list_connections" role="usage-has-option-list">
+ <term><cmdsynopsis><command>list_connections</command> <arg choice="opt" role="usage-option-list"><replaceable>connectioninfoitem</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Returns TCP/IP connection statistics.
+ </para>
+ <para>
+ The <command>connectioninfoitem</command> parameter is used to indicate
+ which connection information items to include in the results. The
+ column order in the results will match the order of the parameters.
+ <command>connectioninfoitem</command> can take any value from the list
+ that follows:
+ </para>
+
+ <variablelist>
+ <varlistentry>
+ <term>pid</term>
+ <listitem><para>Id of the Erlang process associated with the connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>address</term>
+ <listitem><para>Server IP address.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>port</term>
+ <listitem><para>Server port.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>peer_address</term>
+ <listitem><para>Peer address.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>peer_port</term>
+ <listitem><para>Peer port.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>state</term>
+ <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
+ <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>channels</term>
+ <listitem><para>Number of channels using the connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>user</term>
+ <listitem><para>Username associated with the connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>vhost</term>
+ <listitem><para>Virtual host name with non-ASCII characters URL-escaped.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>timeout</term>
+ <listitem><para>Connection timeout.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>frame_max</term>
+ <listitem><para>Maximum frame size (bytes).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>client_properties</term>
+ <listitem><para>Informational properties transmitted by the client
+ during connection establishment.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>recv_oct</term>
+ <listitem><para>Octets received.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>recv_cnt</term>
+ <listitem><para>Packets received.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>send_oct</term>
+ <listitem><para>Octets send.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>send_cnt</term>
+ <listitem><para>Packets sent.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>send_pend</term>
+ <listitem><para>Send queue size.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ If no <command>connectioninfoitem</command>s are specified then user, peer
+ address, peer port and connection state are displayed.
+ </para>
+
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl list_connections send_pend server_port</screen>
+ <para role="example">
+ This command displays the send queue size and server port for each
+ connection.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry role="usage-has-option-list">
+ <term><cmdsynopsis><command>list_channels</command> <arg choice="opt" role="usage-option-list"><replaceable>channelinfoitem</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Returns information on all current channels, the logical
+ containers executing most AMQP commands. This includes
+ channels that are part of ordinary AMQP connections, and
+ channels created by various plug-ins and other extensions.
+ </para>
+ <para>
+ The <command>channelinfoitem</command> parameter is used to
+ indicate which channel information items to include in the
+ results. The column order in the results will match the
+ order of the parameters.
+ <command>channelinfoitem</command> can take any value from the list
+ that follows:
+ </para>
+
+ <variablelist>
+ <varlistentry>
+ <term>pid</term>
+ <listitem><para>Id of the Erlang process associated with the connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>connection</term>
+ <listitem><para>Id of the Erlang process associated with the connection
+ to which the channel belongs.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>number</term>
+ <listitem><para>The number of the channel, which uniquely identifies it within
+ a connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>user</term>
+ <listitem><para>Username associated with the channel.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>vhost</term>
+ <listitem><para>Virtual host in which the channel operates.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>transactional</term>
+ <listitem><para>True if the channel is in transactional mode, false otherwise.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>consumer_count</term>
+ <listitem><para>Number of logical AMQP consumers retrieving messages via
+ the channel.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_unacknowledged</term>
+ <listitem><para>Number of messages delivered via this channel but not
+ yet acknowledged.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>acks_uncommitted</term>
+ <listitem><para>Number of acknowledgements received in an as yet
+ uncommitted transaction.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>prefetch_count</term>
+ <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ If no <command>channelinfoitem</command>s are specified then pid,
+ user, transactional, consumer_count, and
+ messages_unacknowledged are assumed.
+ </para>
+
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl list_channels connection messages_unacknowledged</screen>
+ <para role="example">
+ This command displays the connection process and count
+ of unacknowledged messages for each channel.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>list_consumers</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ List consumers, i.e. subscriptions to a queue's message
+ stream. Each line printed shows, separated by tab
+ characters, the name of the queue subscribed to, the id of
+ the channel process via which the subscription was created
+ and is managed, the consumer tag which uniquely identifies
+ the subscription within a channel, and a boolean
+ indicating whether acknowledgements are expected for
+ messages delivered to this consumer.
+ </para>
+ <para role="usage">
+ The output format for "list_consumers" is a list of rows containing,
+ in order, the queue name, channel process id, consumer tag, and a
+ boolean indicating whether acknowledgements are expected from the
+ consumer.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+ </refsect1>
+
+</refentry>
diff --git a/docs/remove-namespaces.xsl b/docs/remove-namespaces.xsl
new file mode 100644
index 0000000000..58a1e826d2
--- /dev/null
+++ b/docs/remove-namespaces.xsl
@@ -0,0 +1,17 @@
+<?xml version='1.0'?>
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
+ xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"
+ version='1.0'>
+
+<xsl:output method="xml" />
+
+ <!-- Copy every element through with local name only -->
+ <xsl:template match="*">
+ <xsl:element name="{local-name()}">
+ <xsl:apply-templates select="@*|node()"/>
+ </xsl:element>
+ </xsl:template>
+
+ <!-- Copy every attribute through -->
+ <xsl:template match="@*"><xsl:copy/></xsl:template>
+</xsl:stylesheet>
diff --git a/docs/usage.xsl b/docs/usage.xsl
new file mode 100644
index 0000000000..72f8880ab1
--- /dev/null
+++ b/docs/usage.xsl
@@ -0,0 +1,78 @@
+<?xml version='1.0'?>
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
+ xmlns:exsl="http://exslt.org/common"
+ xmlns:ng="http://docbook.org/docbook-ng"
+ xmlns:db="http://docbook.org/ns/docbook"
+ exclude-result-prefixes="exsl"
+ version='1.0'>
+
+<xsl:param name="modulename"/>
+
+<xsl:output method="text"
+ encoding="UTF-8"
+ indent="no"/>
+<xsl:strip-space elements="*"/>
+<xsl:preserve-space elements="cmdsynopsis arg" />
+
+<xsl:template match="/">
+<!-- Pull out cmdsynopsis to show the command usage line. -->%% Generated, do not edit!
+-module(<xsl:value-of select="$modulename" />).
+-export([usage/0]).
+usage() -> %QUOTE%Usage:
+<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/>
+<xsl:text> </xsl:text>
+<xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg">
+ <xsl:apply-templates select="." />
+ <xsl:text> </xsl:text>
+</xsl:for-each>
+
+<xsl:text>&#10;</xsl:text>
+
+<!-- List options (any variable list in a section called "Options"). -->
+<xsl:for-each select=".//*[title='Options']/variablelist">
+ <xsl:if test="position() = 1">&#10;Options:&#10;</xsl:if>
+ <xsl:for-each select="varlistentry">
+ <xsl:text> </xsl:text>
+ <xsl:for-each select=".//term">
+ <xsl:value-of select="."/>
+ <xsl:if test="not(position() = last())">, </xsl:if>
+ </xsl:for-each><xsl:text>&#10;</xsl:text>
+ </xsl:for-each>
+</xsl:for-each>
+
+<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). -->
+<xsl:text>&#10;</xsl:text>
+<xsl:for-each select=".//*[title='Options']//para[@role='usage']">
+<xsl:value-of select="normalize-space(.)"/><xsl:text>&#10;&#10;</xsl:text>
+</xsl:for-each>
+
+<!-- List commands (any first-level variable list in a section called "Commands"). -->
+<xsl:for-each select=".//*[title='Commands']/variablelist | .//*[title='Commands']/refsect2/variablelist">
+ <xsl:if test="position() = 1">Commands:&#10;</xsl:if>
+ <xsl:for-each select="varlistentry">
+ <xsl:text> </xsl:text>
+ <xsl:apply-templates select="term"/>
+ <xsl:text>&#10;</xsl:text>
+ </xsl:for-each>
+ <xsl:text>&#10;</xsl:text>
+</xsl:for-each>
+
+<xsl:apply-templates select=".//*[title='Commands']/refsect2" mode="command-usage" />
+%QUOTE%.
+</xsl:template>
+
+<!-- Option lists in command usage -->
+<xsl:template match="varlistentry[@role='usage-has-option-list']" mode="command-usage">&lt;<xsl:value-of select="term/cmdsynopsis/arg[@role='usage-option-list']/replaceable"/>&gt; must be a member of the list [<xsl:for-each select="listitem/variablelist/varlistentry"><xsl:apply-templates select="term"/><xsl:if test="not(position() = last())">, </xsl:if></xsl:for-each>].<xsl:text>&#10;&#10;</xsl:text></xsl:template>
+
+<!-- Usage paras in command usage -->
+<xsl:template match="para[@role='usage']" mode="command-usage">
+<xsl:value-of select="normalize-space(.)"/><xsl:text>&#10;&#10;</xsl:text>
+</xsl:template>
+
+<!-- Don't show anything else in command usage -->
+<xsl:template match="text()" mode="command-usage"/>
+
+<xsl:template match="arg[@choice='opt']">[<xsl:apply-templates/>]</xsl:template>
+<xsl:template match="replaceable">&lt;<xsl:value-of select="."/>&gt;</xsl:template>
+
+</xsl:stylesheet>
diff --git a/generate_deps b/generate_deps
index 916006d101..29587b5a5f 100644
--- a/generate_deps
+++ b/generate_deps
@@ -23,10 +23,11 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) ->
ok;
(Path, Dep, ok) ->
Module = filename:basename(Path, ".erl"),
- ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]),
+ ok = file:write(Hdl, [EbinDir, "/", Module, ".beam: ",
+ Path]),
ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end,
ok, Dep),
- file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"])
+ file:write(Hdl, ["\n"])
end, ok, Deps),
ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]),
ok = file:sync(Hdl),
@@ -35,7 +36,8 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) ->
detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) ->
{ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]),
lists:foldl(
- fun ({attribute, _LineNumber, behaviour, Behaviour}, Deps) ->
+ fun ({attribute, _LineNumber, Attribute, Behaviour}, Deps)
+ when Attribute =:= behaviour orelse Attribute =:= behavior ->
case sets:is_element(Behaviour, Modules) of
true -> sets:add_element(
[EbinDir, "/", atom_to_list(Behaviour), ".beam"],
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index e2980eff70..552aec6726 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,8 @@
-record(listener, {node, protocol, host, port}).
--record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(basic_message, {exchange_name, routing_key, content, guid,
+ is_persistent}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
@@ -83,6 +84,7 @@
-type(info_key() :: atom()).
-type(info() :: {info_key(), any()}).
-type(regexp() :: binary()).
+-type(file_path() :: string()).
%% this is really an abstract type, but dialyzer does not support them
-type(guid() :: any()).
@@ -144,7 +146,8 @@
#basic_message{exchange_name :: exchange_name(),
routing_key :: routing_key(),
content :: content(),
- persistent_key :: maybe(pkey())}).
+ guid :: guid(),
+ is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
@@ -154,7 +157,7 @@
message :: message()}).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
--type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
+-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
-type(listener() ::
#listener{node :: erlang_node(),
protocol :: atom(),
@@ -166,6 +169,7 @@
#amqp_error{name :: atom(),
explanation :: string(),
method :: atom()}).
+
-endif.
%%----------------------------------------------------------------------------
@@ -173,6 +177,11 @@
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(MAX_WAIT, 16#ffffffff).
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
new file mode 100644
index 0000000000..9864f1eb64
--- /dev/null
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -0,0 +1,42 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
+-spec(validate/1 :: (exchange()) -> 'ok').
+-spec(create/1 :: (exchange()) -> 'ok').
+-spec(recover/2 :: (exchange(), list(binding())) -> 'ok').
+-spec(delete/2 :: (exchange(), list(binding())) -> 'ok').
+-spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
+-spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok').
+
+-endif.
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index 199a0f89c8..1a9798998c 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -56,5 +56,5 @@
-type(password() :: binary()).
-type(vhost() :: binary()).
-type(ctag() :: binary()).
--type(exchange_type() :: 'direct' | 'topic' | 'fanout').
+-type(exchange_type() :: atom()).
-type(binding_key() :: binary()).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index bc5b58cad8..74a1800adb 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -39,11 +39,7 @@ prepare:
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
- rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \
- --target i386
- rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \
- --define '_libdir /usr/lib64' --define '_arch x86_64' \
- --define '_defaultdocdir /usr/share/doc' --target x86_64
+ rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES)
clean:
rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 54d0c8f38b..c318a96c22 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -11,6 +11,7 @@ Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
Source4: rabbitmq-asroot-script-wrapper
URL: http://www.rabbitmq.com/
+BuildArch: noarch
BuildRequires: erlang, python-simplejson
Requires: erlang, logrotate
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
@@ -23,8 +24,9 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
-%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version}
-%define _rabbit_libdir %{_libdir}/rabbitmq
+# We want to install into /usr/lib, even on 64-bit platforms
+%define _rabbit_libdir %{_exec_prefix}/lib/rabbitmq
+%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version}
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
@@ -35,9 +37,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
-sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper}
cp %{S:4} %{_rabbit_asroot_wrapper}
-sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper}
make %{?_smp_mflags}
%install
@@ -65,12 +65,12 @@ mkdir -p %{buildroot}%{_sysconfdir}/rabbitmq
rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL
#Build the list of files
-rm -f %{_builddir}/filelist.%{name}.rpm
-echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm
+rm -f %{_builddir}/%{name}.files
+echo '%defattr(-,root,root, -)' >> %{_builddir}/%{name}.files
(cd %{buildroot}; \
find . -type f ! -regex '\.%{_sysconfdir}.*' \
! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \
- | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm)
+ | sed -e 's/^\.//' >> %{_builddir}/%{name}.files)
%pre
@@ -103,7 +103,7 @@ if [ $1 = 0 ]; then
# Leave rabbitmq user and group
fi
-%files -f ../filelist.%{name}.rpm
+%files -f ../%{name}.files
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index d5633955b9..0ef7dd5e73 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -35,7 +35,7 @@ macports: dirs $(DEST)/Portfile
for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \
cp $(COMMON_DIR)/$$f $(DEST)/files ; \
done
- sed -i -e 's|@SU_RABBITMQ_SH_C@|sudo -E -u rabbitmq -H /bin/sh -c|' \
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh su -m rabbitmq -c|' \
$(DEST)/files/rabbitmq-script-wrapper
cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
if [ -n "$(MACPORTS_USERHOST)" ] ; then \
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index c9e818ac8b..50ce16370b 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -21,10 +21,13 @@ dist:
rm -rf $(SOURCE_DIR)/docs
mv $(SOURCE_DIR) $(TARGET_DIR)
- pod2text --loose rabbitmq-service.pod $(TARGET_DIR)/readme-service.txt
+ mkdir -p $(TARGET_DIR)
+ xmlto -o . xhtml-nochunks ../../docs/rabbitmq-service.xml
+ elinks -dump -no-references -no-numbering rabbitmq-service.html \
+ > $(TARGET_DIR)/readme-service.txt
todos $(TARGET_DIR)/readme-service.txt
zip -r $(TARGET_ZIP).zip $(TARGET_DIR)
- rm -rf $(TARGET_DIR)
+ rm -rf $(TARGET_DIR) rabbitmq-service.html
clean: clean_partial
rm -f rabbitmq-server-windows-*.zip
diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod
deleted file mode 100644
index 8a2d2e5b22..0000000000
--- a/packaging/windows/rabbitmq-service.pod
+++ /dev/null
@@ -1,133 +0,0 @@
-=head1 NAME
-
-rabbitmq-service - manage RabbitMQ AMQP service
-
-=head1 SYNOPSIS
-
-rabbitmq-service.bat command
-
-=head1 DESCRIPTION
-
-RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging. The RabbitMQ server is a robust and
-scalable implementation of an AMQP broker.
-
-Running B<rabbitmq-service> allows the RabbitMQ broker to be run as a
-service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker
-service can be started and stopped using the Windows® services
-applet.
-
-By default the service will run in the authentication context of the
-local system account. It is therefore necessary to synchronise Erlang
-cookies between the local system account (typically
-C<C:\WINDOWS\.erlang.cookie> and the account that will be used to
-run B<rabbitmqctl>.
-
-=head1 COMMANDS
-
-=head2 help
-
-Display usage information.
-
-=head2 install
-
-Install the service. The service will not be started.
-Subsequent invocations will update the service parameters if
-relevant environment variables were modified.
-
-=head2 remove
-
-Remove the service. If the service is running then it will
-automatically be stopped before being removed. No files will be
-deleted as a consequence and B<rabbitmq-server> will remain operable.
-
-=head2 start
-
-Start the service. The service must have been correctly installed
-beforehand.
-
-=head2 stop
-
-Stop the service. The service must be running for this command to
-have any effect.
-
-=head2 disable
-
-Disable the service. This is the equivalent of setting the startup
-type to B<Disabled> using the service control panel.
-
-=head2 enable
-
-Enable the service. This is the equivalent of setting the startup
-type to B<Automatic> using the service control panel.
-
-=head1 ENVIRONMENT
-
-=head2 RABBITMQ_SERVICENAME
-
-Defaults to RabbitMQ.
-This is the location of log and database directories.
-
-=head2 RABBITMQ_BASE
-
-Defaults to the application data directory of the current user.
-This is the location of log and database directories.
-
-=head2 RABBITMQ_NODENAME
-
-Defaults to "rabbit". This can be useful if you want to run more than
-one node per machine - B<RABBITMQ_NODENAME> should be unique per
-erlang-node-and-machine combination. See clustering on a single
-machine guide at
-L<http://www.rabbitmq.com/clustering.html#single-machine> for details.
-
-=head2 RABBITMQ_NODE_IP_ADDRESS
-
-Defaults to "0.0.0.0". This can be changed if you only want to bind
-to one network interface.
-
-=head2 RABBITMQ_NODE_PORT
-
-Defaults to 5672.
-
-=head2 ERLANG_SERVICE_MANAGER_PATH
-
-Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>
-(or F<C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin> for 64-bit
-environments). This is the installation location of the Erlang service
-manager.
-
-=head2 CLUSTER_CONFIG_FILE
-
-If this file is present it is used by the server to
-auto-configure a RabbitMQ cluster. See the clustering guide
-at L<http://www.rabbitmq.com/clustering.html> for details.
-
-=head2 RABBITMQ_CONSOLE_LOG
-
-Set this varable to B<new> or B<reuse> to have the console
-output from the server redirected to a file named B<SERVICENAME>.debug
-in the application data directory of the user that installed the service.
-Under Vista this will be F<C:\Documents and Settings\User\AppData\username\SERVICENAME>.
-Under previous versions of Windows this will be
-F<C:\Documents and Settings\username\Application Data\SERVICENAME>.
-If B<RABBITMQ_CONSOLE_LOG> is set to B<new> then a new file will be
-created each time the service starts. If B<RABBITMQ_CONSOLE_LOG> is
-set to B<reuse> then the file will be overwritten each time the
-service starts. The default behaviour when B<RABBITMQ_CONSOLE_LOG> is
-not set or set to a value other than B<new> or B<reuse> is to discard
-the server output.
-
-=head1 EXAMPLES
-
-Start a previously-installed RabbitMQ AMQP service:
-
- rabbitmq-service start
-
-=head1 AUTHOR
-
-The RabbitMQ Team <info@rabbitmq.com>
-
-=head1 REFERENCES
-
-RabbitMQ Web Site: http://www.rabbitmq.com
diff --git a/src/pg_local.erl b/src/pg_local.erl
index fa41fe46b3..1501331d6b 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -206,7 +206,7 @@ ensure_started() ->
case whereis(?MODULE) of
undefined ->
C = {pg_local, {?MODULE, start_link, []}, permanent,
- 1000, worker, [?MODULE]},
+ 16#ffffffff, worker, [?MODULE]},
supervisor:start_child(kernel_safe_sup, C);
PgLocalPid ->
{ok, PgLocalPid}
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 35d3ce4a5a..259ac0401a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -51,20 +51,39 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
- {enables, kernel_ready}]}).
+ {enables, external_infrastructure}]}).
+
+-rabbit_boot_step({worker_pool,
+ [{description, "worker pool"},
+ {mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {enables, external_infrastructure}]}).
+
+-rabbit_boot_step({external_infrastructure,
+ [{description, "external infrastructure ready"}]}).
+
+-rabbit_boot_step({rabbit_exchange_type_registry,
+ [{description, "exchange type registry"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_exchange_type_registry]}},
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
- {mfa, {rabbit_sup, start_child, [rabbit_log]}},
- {enables, kernel_ready}]}).
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_log]}},
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_hooks,
[{description, "internal event notification system"},
{mfa, {rabbit_hooks, start, []}},
- {enables, kernel_ready}]}).
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({kernel_ready,
- [{description, "kernel ready"}]}).
+ [{description, "kernel ready"},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_alarm,
[{description, "alarm handler"},
@@ -72,23 +91,18 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_amqqueue_sup,
- [{description, "queue supervisor"},
- {mfa, {rabbit_amqqueue, start, []}},
- {requires, kernel_ready},
- {enables, core_initialized}]}).
-
-rabbit_boot_step({rabbit_router,
[{description, "cluster router"},
- {mfa, {rabbit_sup, start_child, [rabbit_router]}},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_router]}},
{requires, kernel_ready},
{enables, core_initialized}]}).
-rabbit_boot_step({rabbit_node_monitor,
[{description, "node monitor"},
- {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_node_monitor]}},
{requires, kernel_ready},
- {requires, rabbit_amqqueue_sup},
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
@@ -104,18 +118,20 @@
{mfa, {rabbit_exchange, recover, []}},
{requires, empty_db_check}]}).
--rabbit_boot_step({queue_recovery,
- [{description, "queue recovery"},
- {mfa, {rabbit_amqqueue, recover, []}},
- {requires, exchange_recovery}]}).
+-rabbit_boot_step({queue_sup_queue_recovery,
+ [{description, "queue supervisor and queue recovery"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {requires, empty_db_check}]}).
-rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
- {requires, queue_recovery}]}).
+ [{mfa, {rabbit_sup, start_child,
+ [rabbit_persister]}},
+ {requires, queue_sup_queue_recovery}]}).
-rabbit_boot_step({guid_generator,
[{description, "guid generator"},
- {mfa, {rabbit_sup, start_child, [rabbit_guid]}},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_guid]}},
{requires, persister},
{enables, routing_ready}]}).
@@ -187,15 +203,12 @@ stop() ->
ok = rabbit_misc:stop_applications(?APPS).
stop_and_halt() ->
- spawn(fun () ->
- SleepTime = 1000,
- rabbit_log:info("Stop-and-halt request received; "
- "halting in ~p milliseconds~n",
- [SleepTime]),
- timer:sleep(SleepTime),
- init:stop()
- end),
- case catch stop() of _ -> ok end.
+ try
+ stop()
+ after
+ init:stop()
+ end,
+ ok.
status() ->
[{running_applications, application:which_applications()}] ++
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 3b9eeec18a..7e96d9a3a8 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -57,10 +57,9 @@ start() ->
ok = alarm_handler:add_alarm_handler(?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
ok = case MemoryWatermark == 0 of
- true ->
- ok;
- false ->
- rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark])
+ true -> ok;
+ false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
+ [MemoryWatermark])
end,
ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3f25d72e4f..e13dd9edb5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/4, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
@@ -40,7 +40,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -63,7 +63,6 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
@@ -98,7 +97,7 @@
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
- {'ok', non_neg_integer(), msg()} | 'empty').
+ {'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/8 ::
(amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
@@ -107,6 +106,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -117,45 +117,47 @@
%%----------------------------------------------------------------------------
start() ->
+ DurableQueues = find_durable_queues(),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
+ _RealDurableQueues = recover_durable_queues(DurableQueues),
ok.
-recover() ->
- ok = recover_durable_queues(),
- ok.
-
-recover_durable_queues() ->
+find_durable_queues() ->
Node = node(),
- lists:foreach(
- fun (RecoveredQ) ->
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node]))
+ end).
+
+recover_durable_queues(DurableQueues) ->
+ lists:foldl(
+ fun (RecoveredQ, Acc) ->
Q = start_queue_process(RecoveredQ),
%% We need to catch the case where a client connected to
%% another node has deleted the queue (and possibly
%% re-created it).
case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
+ fun () ->
+ case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ,
+ read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
+ true -> [Q | Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
end
- end,
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end)),
- ok.
+ end, [], DurableQueues).
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -284,7 +286,7 @@ requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
safe_pmap_ok(
@@ -329,39 +331,53 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 7, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 8, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+
+flush_all(QPids, ChPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
+ QPids).
internal_delete(QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] ->
- ok = rabbit_exchange:delete_queue_bindings(QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- ok
- end
- end).
+ case
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [_] ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
+ rabbit_exchange:delete_queue_bindings(QueueName)
+ end
+ end) of
+ Err = {error, _} -> Err;
+ PostHook ->
+ PostHook(),
+ ok
+ end.
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:fold(
- fun (QueueName, Acc) ->
- ok = rabbit_exchange:delete_transient_queue_bindings(
- QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- Acc
- end,
- ok,
- qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end).
+ [Hook() ||
+ Hook <- rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end)],
+ ok.
+
+delete_queue(QueueName) ->
+ Post = rabbit_exchange:delete_transient_queue_bindings(QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ Post.
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e4791f9524..ba41f55030 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,8 +36,6 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-export([start_link/1, info_keys/0]).
@@ -376,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{persistent_key = none}) ->
+persist_message(_Txn, _QName, #basic_message{is_persistent = false}) ->
ok;
persist_message(Txn, QName, Message) ->
M = Message#basic_message{
@@ -384,29 +382,28 @@ persist_message(Txn, QName, Message) ->
content = rabbit_binary_parser:clear_decoded_content(
Message#basic_message.content)},
persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.persistent_key}}]).
+ [{publish, M, {QName, M#basic_message.guid}}]).
persist_delivery(_QName, _Message,
true) ->
ok;
-persist_delivery(_QName, #basic_message{persistent_key = none},
+persist_delivery(_QName, #basic_message{is_persistent = false},
_IsDelivered) ->
ok;
-persist_delivery(QName, #basic_message{persistent_key = PKey},
+persist_delivery(QName, #basic_message{guid = Guid},
_IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, PKey}}]).
+ persist_work(none, QName, [{deliver, {QName, Guid}}]).
persist_acks(Txn, QName, Messages) ->
persist_work(Txn, QName,
- [{ack, {QName, PKey}} ||
- #basic_message{persistent_key = PKey} <- Messages,
- PKey =/= none]).
+ [{ack, {QName, Guid}} || #basic_message{
+ guid = Guid, is_persistent = true} <- Messages]).
-persist_auto_ack(_QName, #basic_message{persistent_key = none}) ->
+persist_auto_ack(_QName, #basic_message{is_persistent = false}) ->
ok;
-persist_auto_ack(QName, #basic_message{persistent_key = PKey}) ->
+persist_auto_ack(QName, #basic_message{guid = Guid}) ->
%% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, PKey}}]).
+ rabbit_persister:dirty_work([{ack, {QName, Guid}}]).
persist_work(_Txn,_QName, []) ->
ok;
@@ -826,7 +823,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({flush, ChPid}, State) ->
+ ok = rabbit_channel:flushed(ChPid, self()),
+ noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9ebb6e72e0..4ab7a2a0b1 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -36,6 +36,7 @@
-export([publish/1, message/4, properties/1, delivery/4]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
+-export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -48,7 +49,7 @@
-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> message()).
+ binary()) -> (message() | {'error', any()})).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
@@ -57,6 +58,8 @@
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
+-spec(is_message_persistent/1 ::
+ (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})).
-endif.
@@ -93,10 +96,17 @@ from_content(Content) ->
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = build_content(Properties, BodyBin),
- persistent_key = none}.
+ Content = build_content(Properties, BodyBin),
+ case is_message_persistent(Content) of
+ {invalid, Other} ->
+ {error, {invalid_delivery_mode, Other}};
+ IsPersistent when is_boolean(IsPersistent) ->
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent}
+ end.
properties(P = #'P_basic'{}) ->
P;
@@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
publish(delivery(Mandatory, Immediate, Txn,
message(ExchangeName, RoutingKeyBin,
properties(Properties), BodyBin))).
+
+is_message_persistent(#content{properties = #'P_basic'{
+ delivery_mode = Mode}}) ->
+ case Mode of
+ 1 -> false;
+ 2 -> true;
+ undefined -> false;
+ Other -> {invalid, Other}
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 585c59dc9b..9aeb4623f1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1,4 +1,4 @@
-%% The contents of this file are subject to the Mozilla Public Licenses
+%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/5, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2]).
+-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([init/1, terminate/2, code_change/3,
@@ -45,11 +45,8 @@
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host,
- most_recently_declared_queue, consumer_mapping}).
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
+ username, virtual_host, most_recently_declared_queue,
+ consumer_mapping, blocking}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -75,8 +72,9 @@
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
+-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -110,7 +108,10 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
+ gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
+
+flushed(Pid, QPid) ->
+ gen_server2:cast(Pid, {flushed, QPid}).
list() ->
pg_local:get_members(rabbit_channels).
@@ -152,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()},
+ consumer_mapping = dict:new(),
+ blocking = dict:new()},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -190,6 +192,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
+handle_cast({flushed, QPid}, State) ->
+ {noreply, queue_blocked(QPid, State)};
+
handle_cast(terminate, State) ->
{stop, normal, State};
@@ -215,7 +220,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State}.
+ {stop, Reason, State};
+handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ {noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -331,6 +338,20 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
+ case dict:find(QPid, Blocking) of
+ error -> State;
+ {ok, MRef} -> true = erlang:demonitor(MRef),
+ Blocking1 = dict:erase(QPid, Blocking),
+ ok = case dict:size(Blocking1) of
+ 0 -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ _ -> ok
+ end,
+ State#ch{blocking = Blocking1}
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -362,14 +383,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_guid:guid();
- false -> none
- end,
+ IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
- persistent_key = PersistentKey},
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -540,25 +559,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter_pid = LimiterPid,
- unacked_message_q = UAMQ }) ->
- NewLimiterPid = case {LimiterPid, PrefetchCount} of
- {undefined, 0} ->
- undefined;
- {undefined, _} ->
- LPid = rabbit_limiter:start_link(self(),
- queue:len(UAMQ)),
- ok = limit_queues(LPid, State),
- LPid;
- {_, 0} ->
- ok = rabbit_limiter:shutdown(LimiterPid),
- ok = limit_queues(undefined, State),
- undefined;
- {_, _} ->
- LimiterPid
- end,
- ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
- {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
+ _, State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} -> undefined;
+ {undefined, _} -> start_limiter(State);
+ {_, _} -> LimiterPid
+ end,
+ LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
+ ok -> LimiterPid1;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
@@ -791,9 +802,31 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
-handle_method(#'channel.flow'{active = _}, _, State) ->
- %% FIXME: implement
- {reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow'{active = true}, _,
+ State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
+ ok -> LimiterPid;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'channel.flow_ok'{active = true},
+ State#ch{limiter_pid = LimiterPid1}};
+
+handle_method(#'channel.flow'{active = false}, _,
+ State = #ch{limiter_pid = LimiterPid,
+ consumer_mapping = Consumers}) ->
+ LimiterPid1 = case LimiterPid of
+ undefined -> start_limiter(State);
+ Other -> Other
+ end,
+ ok = rabbit_limiter:block(LimiterPid1),
+ QPids = consumer_queues(Consumers),
+ Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ case Queues of
+ [] -> {reply, #'channel.flow_ok'{active = false}, State};
+ _ -> {noreply, State#ch{limiter_pid = LimiterPid1,
+ blocking = dict:from_list(Queues)}}
+ end;
handle_method(#'channel.flow_ok'{active = _}, _, State) ->
%% TODO: We may want to correlate this to channel.flow messages we
@@ -928,21 +961,27 @@ fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
- %% dict:append would be simpler and avoid the
- %% lists:reverse in handle_message({recover, true},
- %% ...). However, it is significantly slower when
- %% going beyond a few thousand elements.
- dict:update(QPid,
- fun (MsgIds) -> [MsgId | MsgIds] end,
- [MsgId],
- D)
+ %% dict:append would avoid the lists:reverse in
+ %% handle_message({recover, true}, ...). However, it
+ %% is significantly slower when going beyond a few
+ %% thousand elements.
+ rabbit_misc:dict_cons(QPid, MsgId, D)
end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
+start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
+ LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+ ok = limit_queues(LPid, State),
+ LPid.
+
notify_queues(#ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+unlimit_queues(State) ->
+ ok = limit_queues(undefined, State),
+ undefined.
+
limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
@@ -972,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(#content{properties = #'P_basic'{
- delivery_mode = Mode}}) ->
- case Mode of
- 1 -> false;
- 2 -> true;
- undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false
+is_message_persistent(Content) ->
+ case rabbit_basic:is_message_persistent(Content) of
+ {invalid, Other} ->
+ rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
+ [Other]),
+ false;
+ IsPersistent when is_boolean(IsPersistent) ->
+ IsPersistent
end.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6aac442888..d1834b3b73 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -46,6 +46,7 @@
-spec(stop/0 :: () -> 'ok').
-spec(action/4 :: (atom(), erlang_node(), [string()],
fun ((string(), [any()]) -> 'ok')) -> 'ok').
+-spec(usage/0 :: () -> no_return()).
-endif.
@@ -130,86 +131,7 @@ stop() ->
ok.
usage() ->
- io:format("Usage: rabbitmqctl [-q] [-n <node>] <command> [<arg> ...]
-
-Available commands:
-
- stop - stops the RabbitMQ application and halts the node
- stop_app - stops the RabbitMQ application, leaving the node running
- start_app - starts the RabbitMQ application on an already-running node
- reset - resets node to default configuration, deleting all data
- force_reset
- cluster <ClusterNode> ...
- status
- rotate_logs [Suffix]
- close_connection <ConnectionPid> <ExplanationString>
-
- add_user <UserName> <Password>
- delete_user <UserName>
- change_password <UserName> <NewPassword>
- list_users
-
- add_vhost <VHostPath>
- delete_vhost <VHostPath>
- list_vhosts
-
- set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
- clear_permissions [-p <VHostPath>] <UserName>
- list_permissions [-p <VHostPath>]
- list_user_permissions <UserName>
-
- list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
- list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
- list_bindings [-p <VHostPath>]
- list_connections [<ConnectionInfoItem> ...]
- list_channels [<ChannelInfoItem> ...]
- list_consumers [-p <VHostPath>]
-
-Quiet output mode is selected with the \"-q\" flag. Informational
-messages are suppressed when quiet mode is in effect.
-
-<node> should be the name of the master node of the RabbitMQ
-cluster. It defaults to the node named \"rabbit\" on the local
-host. On a host named \"server.example.com\", the master node will
-usually be rabbit@server (unless RABBITMQ_NODENAME has been set to
-some non-default value at broker startup time). The output of hostname
--s is usually the correct suffix to use after the \"@\" sign.
-
-The list_queues, list_exchanges and list_bindings commands accept an
-optional virtual host parameter for which to display results. The
-default value is \"/\".
-
-<QueueInfoItem> must be a member of the list [name, durable,
-auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid,
-exclusive_consumer_tag, messages_ready, messages_unacknowledged,
-messages_uncommitted, messages, acks_uncommitted, consumers,
-transactions, memory]. The default is to display name and (number of)
-messages.
-
-<ExchangeInfoItem> must be a member of the list [name, type, durable,
-auto_delete, arguments]. The default is to display name and type.
-
-The output format for \"list_bindings\" is a list of rows containing
-exchange name, queue name, routing key and arguments, in that order.
-
-<ConnectionInfoItem> must be a member of the list [pid, address, port,
-peer_address, peer_port, state, channels, user, vhost, timeout,
-frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt,
-send_pend]. The default is to display user, peer_address, peer_port
-and state.
-
-<ChannelInfoItem> must be a member of the list [pid, connection,
-number, user, vhost, transactional, consumer_count,
-messages_unacknowledged, acks_uncommitted, prefetch_count]. The
-default is to display pid, user, transactional, consumer_count,
-messages_unacknowledged.
-
-The output format for \"list_consumers\" is a list of rows containing,
-in order, the queue name, channel process id, consumer tag, and a
-boolean indicating whether acknowledgements are expected from the
-consumer.
-
-"),
+ io:format("~s", [rabbit_ctl_usage:usage()]),
halt(1).
action(stop, Node, [], Inform) ->
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index 078cf620f4..f19e8d025a 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -38,9 +38,9 @@
-ifdef(use_specs).
--spec(create_basic_plt/1 :: (string()) -> 'ok').
--spec(add_to_plt/2 :: (string(), string()) -> 'ok').
--spec(dialyze_files/2 :: (string(), string()) -> 'ok').
+-spec(create_basic_plt/1 :: (file_path()) -> 'ok').
+-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok').
-spec(halt_with_code/1 :: (atom()) -> no_return()).
-endif.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 832acd16c4..1cfba00eb2 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -30,7 +30,6 @@
%%
-module(rabbit_exchange).
--include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -40,7 +39,7 @@
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
+-export([check_type/1, assert_type/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -49,7 +48,6 @@
-import(mnesia).
-import(sets).
-import(lists).
--import(qlc).
-import(regexp).
%%----------------------------------------------------------------------------
@@ -82,10 +80,8 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -100,17 +96,37 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- ok = rabbit_misc:table_foreach(
- fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
- Exchange, write)
- end, rabbit_durable_exchange),
- ok = rabbit_misc:table_foreach(
- fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write)
- end, rabbit_durable_route).
+ Exs = rabbit_misc:table_fold(
+ fun(Exchange, Acc) ->
+ ok = mnesia:write(rabbit_exchange, Exchange, write),
+ [Exchange | Acc]
+ end, [], rabbit_durable_exchange),
+ Bs = rabbit_misc:table_fold(
+ fun(Route = #route{binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route,
+ Route, write),
+ ok = mnesia:write(rabbit_reverse_route,
+ ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route),
+ recover_with_bindings(Bs, Exs),
+ ok.
+
+recover_with_bindings(Bs, Exs) ->
+ recover_with_bindings(
+ lists:keysort(#binding.exchange_name, Bs),
+ lists:keysort(#exchange.name, Exs), []).
+
+recover_with_bindings([B = #binding{exchange_name = Name} | Rest],
+ Xs = [#exchange{name = Name} | _],
+ Bindings) ->
+ recover_with_bindings(Rest, Xs, [B | Bindings]);
+recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
+ (type_to_module(Type)):recover(X, Bindings),
+ recover_with_bindings(Bs, Xs, []);
+recover_with_bindings([], [], []) ->
+ ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -118,31 +134,53 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_exchange, ExchangeName}) of
- [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
- if Durable ->
- ok = mnesia:write(rabbit_durable_exchange,
- Exchange, write);
- true -> ok
- end,
- Exchange;
- [ExistingX] -> ExistingX
- end
- end).
+ %% We want to upset things if it isn't ok; this is different from
+ %% the other hooks invocations, where we tend to ignore the return
+ %% value.
+ TypeModule = type_to_module(Type),
+ ok = TypeModule:validate(Exchange),
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ [] ->
+ ok = mnesia:write(rabbit_exchange, Exchange, write),
+ ok = case Durable of
+ true ->
+ mnesia:write(rabbit_durable_exchange,
+ Exchange, write);
+ false ->
+ ok
+ end,
+ {new, Exchange};
+ [ExistingX] ->
+ {existing, ExistingX}
+ end
+ end) of
+ {new, X} -> TypeModule:create(X),
+ X;
+ {existing, X} -> X;
+ Err -> Err
+ end.
-check_type(<<"fanout">>) ->
- fanout;
-check_type(<<"direct">>) ->
- direct;
-check_type(<<"topic">>) ->
- topic;
-check_type(<<"headers">>) ->
- headers;
-check_type(T) ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]).
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ case rabbit_exchange_type_registry:lookup_module(T) of
+ {ok, Module} -> Module;
+ {error, not_found} -> rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid exchange type '~s'", [T])
+ end.
+
+%% Used with binaries sent over the wire; the type may not exist.
+check_type(TypeBin) ->
+ case rabbit_exchange_type_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown exchange type '~s'", [TypeBin]);
+ T ->
+ _Module = type_to_module(T),
+ T
+ end.
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
@@ -157,7 +195,7 @@ lookup(Name) ->
lookup_or_die(Name) ->
case lookup(Name) of
- {ok, X} -> X;
+ {ok, X} -> X;
{error, not_found} -> rabbit_misc:not_found(Name)
end.
@@ -193,9 +231,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X, Delivery) ->
publish(X, [], Delivery).
-publish(X, Seen, Delivery = #delivery{
- message = #basic_message{routing_key = RK, content = C}}) ->
- case rabbit_router:deliver(route(X, RK, C), Delivery) of
+publish(X = #exchange{type = Type}, Seen, Delivery) ->
+ case (type_to_module(Type)):publish(X, Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -205,95 +242,24 @@ publish(X, Seen, Delivery = #delivery{
AName ->
NewSeen = [XName | Seen],
case lists:member(AName, NewSeen) of
- true ->
- R;
- false ->
- case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
+ true -> R;
+ false -> case lookup(AName) of
+ {ok, AX} ->
+ publish(AX, NewSeen, Delivery);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "alternate exchange for ~s "
+ "does not exist: ~s",
+ [rabbit_misc:rs(XName),
+ rabbit_misc:rs(AName)]),
+ R
+ end
end
end;
R ->
R
end.
-%% return the list of qpids to which a message with a given routing
-%% key, sent to a particular exchange, should be delivered.
-%%
-%% The function ensures that a qpid appears in the return list exactly
-%% as many times as a message should be delivered to it. With the
-%% current exchange types that is at most once.
-route(X = #exchange{type = topic}, RoutingKey, _Content) ->
- match_bindings(X, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end);
-
-route(X = #exchange{type = headers}, _RoutingKey, Content) ->
- Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> sort_arguments(H)
- end,
- match_bindings(X, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end);
-
-route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
- match_routing_key(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey, _Content) ->
- match_routing_key(X, RoutingKey).
-
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
-%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(#exchange{name = Name}, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- ExchangeName == Name,
- Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
-
-match_routing_key(#exchange{name = Name}, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
- sets:fold(
- fun(Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], sets:from_list(Queues)).
-
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
@@ -302,13 +268,13 @@ delete_exchange_bindings(ExchangeName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
- ok = delete_forward_routes(Route)
+ ok = delete_forward_routes(Route),
+ Route#route.binding
end || Route <- mnesia:match_object(
rabbit_route,
#route{binding = #binding{exchange_name = ExchangeName,
_ = '_'}},
- write)],
- ok.
+ write)].
delete_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_forward_routes/1).
@@ -317,21 +283,55 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
- [begin
- ok = FwdDeleteFun(reverse_route(Route)),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
- ok.
+ DeletedBindings =
+ [begin
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute
+ <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(#route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ Cleanup = cleanup_deleted_queue_bindings(
+ lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ fun () ->
+ lists:foreach(
+ fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, Bs);
+ no_delete -> Module:remove_bindings(X, Bs)
+ end
+ end, Cleanup)
+ end.
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% cleanup_deleted_queue_bindings1) works properly.
+cleanup_deleted_queue_bindings([], Acc) ->
+ Acc;
+cleanup_deleted_queue_bindings(
+ [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
+ cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
+
+cleanup_deleted_queue_bindings(
+ ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
+ Bindings, Acc) ->
+ cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
+cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
+ %% either Deleted is [], or its head has a non-matching ExchangeName
+ NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
+ cleanup_deleted_queue_bindings(Deleted, NewAcc).
+
+cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ {maybe_auto_delete(X), Bindings}.
+
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -340,15 +340,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -385,37 +376,61 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
- end
- end).
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true ->
+ case mnesia:read(rabbit_route, B) of
+ [] ->
+ sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
+ end
+ end
+ end) of
+ {new, Exchange = #exchange{ type = Type }, Binding} ->
+ (type_to_module(Type)):add_binding(Exchange, Binding);
+ {existing, _, _} ->
+ ok;
+ Err = {error, _} ->
+ Err
+ end.
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- maybe_auto_delete(X)
- end
- end).
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B}
+ end
+ end) of
+ Err = {error, _} ->
+ Err;
+ {{Action, X = #exchange{ type = Type }}, B} ->
+ Module = type_to_module(Type),
+ case Action of
+ auto_delete -> Module:delete(X, [B]);
+ no_delete -> Module:remove_bindings(X, [B])
+ end
+ end.
binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
- Fun(X, Q, #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = sort_arguments(Arguments)})
+ Fun(X, Q, #binding{
+ exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = rabbit_misc:sort_field_table(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -440,8 +455,8 @@ list_bindings(VHostPath) ->
rabbit_route,
#route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
+ _ = '_'},
+ _ = '_'})].
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
@@ -456,136 +471,60 @@ reverse_route(#reverse_route{reverse_binding = Binding}) ->
#route{binding = reverse_binding(Binding)}.
reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
+ queue_name = Queue,
+ key = Key,
+ args = Args};
reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}.
-
-default_headers_match_kind() -> all.
-
-parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
-
-%% Horrendous matching algorithm. Depends for its merge-like
-%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
-%% route/3 and {add,delete}_binding/4 do.
-%%
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
-
-headers_match([], _Data, AllMatch, _AnyMatch, all) ->
- AllMatch;
-headers_match([], _Data, _AllMatch, AnyMatch, any) ->
- AnyMatch;
-headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
- AllMatch, AnyMatch, MatchKind) ->
- headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
-headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
- headers_match([], [], false, AnyMatch, MatchKind);
-headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK > DK ->
- headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
-headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
- _AllMatch, AnyMatch, MatchKind) when PK < DK ->
- headers_match(PRest, Data, false, AnyMatch, MatchKind);
-headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK == DK ->
- {AllMatch1, AnyMatch1} =
- if
- %% It's not properly specified, but a "no value" in a
- %% pattern field is supposed to mean simple presence of
- %% the corresponding data field. I've interpreted that to
- %% mean a type of "void" for the pattern field.
- PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
- PT =/= DT -> {false, AnyMatch};
- PV == DV -> {AllMatch, true};
- true -> {false, AnyMatch}
- end,
- headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-
-split_topic_key(Key) ->
- {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
- KeySplit.
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-
-delete(ExchangeName, _IfUnused = true) ->
- call_with_exchange(ExchangeName, fun conditional_delete/1);
-delete(ExchangeName, _IfUnused = false) ->
- call_with_exchange(ExchangeName, fun unconditional_delete/1).
-
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
+
+delete(ExchangeName, IfUnused) ->
+ Fun = case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end,
+ case call_with_exchange(ExchangeName, Fun) of
+ {deleted, X = #exchange{type = Type}, Bs} ->
+ (type_to_module(Type)):delete(X, Bs),
+ ok;
+ Error = {error, _InUseOrNotFound} ->
+ Error
+ end.
+
+maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
+ {no_delete, Exchange};
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- conditional_delete(Exchange),
- ok.
+ case conditional_delete(Exchange) of
+ {error, in_use} -> {no_delete, Exchange};
+ {deleted, Exchange, []} -> {auto_deleted, Exchange}
+ end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
+ case contains(rabbit_route, Match) orelse
+ contains(rabbit_durable_route, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
-unconditional_delete(#exchange{name = ExchangeName}) ->
- ok = delete_exchange_bindings(ExchangeName),
+unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
+ Bindings = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ {deleted, Exchange, Bindings}.
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
new file mode 100644
index 0000000000..a8c071e681
--- /dev/null
+++ b/src/rabbit_exchange_type.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {description, 0},
+ {publish, 2},
+
+ %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
+ {validate, 1},
+
+ %% called after declaration when previously absent
+ {create, 1},
+
+ %% called when recovering
+ {recover, 2},
+
+ %% called after exchange deletion.
+ {delete, 2},
+
+ %% called after a binding has been added
+ {add_binding, 2},
+
+ %% called after bindings have been deleted.
+ {remove_bindings, 2}
+
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
new file mode 100644
index 0000000000..9b71e0e1d1
--- /dev/null
+++ b/src/rabbit_exchange_type_direct.erl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_direct).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type direct"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"direct">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+description() ->
+ [{name, <<"direct">>},
+ {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name}, Delivery =
+ #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey),
+ Delivery).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
new file mode 100644
index 0000000000..311654ab21
--- /dev/null
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_fanout).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type fanout"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"fanout">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+description() ->
+ [{name, <<"fanout">>},
+ {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name}, Delivery) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
new file mode 100644
index 0000000000..285dab1a03
--- /dev/null
+++ b/src/rabbit_exchange_type_headers.erl
@@ -0,0 +1,137 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_headers).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type headers"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"headers">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+-ifdef(use_specs).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"headers">>},
+ {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{content = Content}}) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> rabbit_misc:sort_field_table(H)
+ end,
+ rabbit_router:deliver(rabbit_router:match_bindings(
+ Name, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end),
+ Delivery).
+
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort
+%% (rabbit_misc:sort_field_table) that route/3 and
+%% rabbit_exchange:{add,delete}_binding/4 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
new file mode 100644
index 0000000000..175d15ad83
--- /dev/null
+++ b/src/rabbit_exchange_type_registry.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_registry).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([register/2, binary_to_type/1, lookup_module/1]).
+
+-define(SERVER, ?MODULE).
+-define(ETS_NAME, ?MODULE).
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}).
+-spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}).
+-spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}).
+
+-endif.
+
+%%---------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%---------------------------------------------------------------------------
+
+register(TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+
+%% This is used with user-supplied arguments (e.g., on exchange
+%% declare), so we restrict it to existing atoms only. This means it
+%% can throw a badarg, indicating that the type cannot have been
+%% registered.
+binary_to_type(TypeBin) when is_binary(TypeBin) ->
+ case catch list_to_existing_atom(binary_to_list(TypeBin)) of
+ {'EXIT', {badarg, _}} -> {error, not_found};
+ TypeAtom -> TypeAtom
+ end.
+
+lookup_module(T) when is_atom(T) ->
+ case ets:lookup(?ETS_NAME, T) of
+ [{_, Module}] ->
+ {ok, Module};
+ [] ->
+ {error, not_found}
+ end.
+
+%%---------------------------------------------------------------------------
+
+internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
+ list_to_atom(binary_to_list(TypeBin)).
+
+internal_register(TypeName, ModuleName)
+ when is_binary(TypeName), is_atom(ModuleName) ->
+ ok = sanity_check_module(ModuleName),
+ true = ets:insert(?ETS_NAME,
+ {internal_binary_to_type(TypeName), ModuleName}),
+ ok.
+
+sanity_check_module(Module) ->
+ case catch lists:member(rabbit_exchange_type,
+ lists:flatten(
+ [Bs || {Attr, Bs} <-
+ Module:module_info(attributes),
+ Attr =:= behavior orelse
+ Attr =:= behaviour])) of
+ {'EXIT', {undef, _}} -> {error, not_module};
+ false -> {error, not_exchange_type};
+ true -> ok
+ end.
+
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
+ {ok, none}.
+
+handle_call({register, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(TypeName, ModuleName),
+ {reply, ok, State};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
new file mode 100644
index 0000000000..8a3dceeaeb
--- /dev/null
+++ b/src/rabbit_exchange_type_topic.erl
@@ -0,0 +1,101 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_topic).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, publish/2]).
+-export([validate/1, create/1, recover/2, delete/2,
+ add_binding/2, remove_bindings/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type topic"},
+ {mfa, {rabbit_exchange_type_registry, register,
+ [<<"topic">>, ?MODULE]}},
+ {requires, rabbit_exchange_type_registry},
+ {enables, kernel_ready}]}).
+
+-export([topic_matches/2]).
+
+-ifdef(use_specs).
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"topic">>},
+ {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name}, Delivery =
+ #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_bindings(
+ Name, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end),
+ Delivery).
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest])
+ when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or
+ last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
+validate(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
+add_binding(_X, _B) -> ok.
+remove_bindings(_X, _Bs) -> ok.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index c9f8183fc9..878af02976 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -37,7 +37,7 @@
handle_info/2]).
-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1]).
+-export([get_limit/1, block/1, unblock/1]).
%%----------------------------------------------------------------------------
@@ -47,12 +47,14 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
--spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
+-spec(block/1 :: (maybe_pid()) -> 'ok').
+-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
-endif.
@@ -60,6 +62,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
+ blocked = false,
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -77,13 +80,14 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
- unlink(LimiterPid),
+ true = unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:cast(LimiterPid, {limit, PrefetchCount}).
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, {limit, PrefetchCount})).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -113,6 +117,17 @@ get_limit(Pid) ->
fun () -> 0 end,
fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+block(undefined) ->
+ ok;
+block(LimiterPid) ->
+ gen_server2:call(LimiterPid, block, infinity).
+
+unblock(undefined) ->
+ ok;
+unblock(LimiterPid) ->
+ unlink_on_stopped(LimiterPid,
+ gen_server2:call(LimiterPid, unblock, infinity)).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -120,29 +135,45 @@ get_limit(Pid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+handle_call({can_send, _QPid, _AckRequired}, _From,
+ State = #lim{blocked = true}) ->
+ {reply, false, State};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State}.
+ {reply, PrefetchCount, State};
+
+handle_call({limit, PrefetchCount}, _From, State) ->
+ case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
+ end;
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ case maybe_notify(State, State#lim{blocked = false}) of
+ {cont, State1} -> {reply, ok, State1};
+ {stop, State1} -> {stop, normal, stopped, State1}
+ end.
handle_cast(shutdown, State) ->
{stop, normal, State};
-handle_cast({limit, PrefetchCount}, State) ->
- {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
+ {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {noreply, State1};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -164,14 +195,21 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case limit_reached(OldState) andalso not(limit_reached(NewState)) of
- true -> notify_queues(NewState);
- false -> NewState
+ case (limit_reached(OldState) orelse is_blocked(OldState)) andalso
+ not (limit_reached(NewState) orelse is_blocked(NewState)) of
+ true -> NewState1 = notify_queues(NewState),
+ {case NewState1#lim.prefetch_count of
+ 0 -> stop;
+ _ -> cont
+ end, NewState1};
+ false -> {cont, NewState}
end.
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
+is_blocked(#lim{blocked = Blocked}) -> Blocked.
+
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
@@ -209,3 +247,9 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
+
+unlink_on_stopped(LimiterPid, stopped) ->
+ ok = rabbit_misc:unlink_and_capture_exit(LimiterPid),
+ stopped;
+unlink_on_stopped(_LimiterPid, Result) ->
+ Result.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0654f58ae7..028b0d73ea 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -49,15 +49,17 @@
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
--export([table_foreach/2]).
+-export([table_fold/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
+-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]).
-import(mnesia).
-import(lists).
@@ -96,8 +98,8 @@
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (string()) -> ok_or_error()).
--spec(report_cover/1 :: (string()) -> 'ok').
+-spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
+-spec(report_cover/1 :: (file_path()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
@@ -114,23 +116,31 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
+-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
--spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
--spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
--spec(append_file/2 :: (string(), string()) -> ok_or_error()).
+-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()).
+-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}).
+-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
--spec(ceil/1 :: (number()) -> number()).
+-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
+-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
+-spec(version_compare/3 :: (string(), string(),
+ ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()).
+-spec(recursive_delete/1 :: ([file_path()]) ->
+ 'ok' | {'error', {file_path(), any()}}).
+-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
+-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-endif.
@@ -305,7 +315,7 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case mnesia:sync_transaction(TxFun) of
+ case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
{atomic, Result} -> Result;
{aborted, Reason} -> throw({error, Reason})
end.
@@ -357,20 +367,20 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
-%% For each entry in a table, execute a function in a transaction.
-%% This is often far more efficient than wrapping a tx around the lot.
+%% Fold over each entry in a table, executing the cons function in a
+%% transaction. This is often far more efficient than wrapping a tx
+%% around the lot.
%%
%% We ignore entries that have been modified or removed.
-table_foreach(F, TableName) ->
- lists:foreach(
- fun (E) -> execute_mnesia_transaction(
+table_fold(F, Acc0, TableName) ->
+ lists:foldl(
+ fun (E, Acc) -> execute_mnesia_transaction(
fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> ok;
- _ -> F(E)
+ [] -> Acc;
+ _ -> F(E, Acc)
end
end)
- end, dirty_read_all(TableName)),
- ok.
+ end, Acc0, dirty_read_all(TableName)).
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
@@ -504,6 +514,10 @@ queue_fold(Fun, Init, Q) ->
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
end.
+%% Sorts a list of AMQP table fields as per the AMQP spec
+sort_field_table(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% This provides a string representation of a pid that is the same
%% regardless of what node we are running on. The representation also
%% permits easy identification of the pid's node.
@@ -595,3 +609,46 @@ version_compare(A, B) ->
ANum < BNum -> lt;
ANum > BNum -> gt
end.
+
+recursive_delete(Files) ->
+ lists:foldl(fun (Path, ok ) -> recursive_delete1(Path);
+ (_Path, {error, _Err} = Error) -> Error
+ end, ok, Files).
+
+recursive_delete1(Path) ->
+ case filelib:is_dir(Path) of
+ false -> case file:delete(Path) of
+ ok -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ true -> case file:list_dir(Path) of
+ {ok, FileNames} ->
+ case lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete1(
+ filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames) of
+ ok ->
+ case file:del_dir(Path) of
+ ok -> ok;
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ {error, _Err} = Error ->
+ Error
+ end;
+ {error, Err} ->
+ {error, {Path, Err}}
+ end
+ end.
+
+dict_cons(Key, Value, Dict) ->
+ dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
+
+unlink_and_capture_exit(Pid) ->
+ unlink(Pid),
+ receive {'EXIT', Pid, _} -> ok
+ after 0 -> ok
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6ec3cf74b3..55a6761d2d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -48,7 +48,7 @@
-ifdef(use_specs).
-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
--spec(dir/0 :: () -> string()).
+-spec(dir/0 :: () -> file_path()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
@@ -424,9 +424,8 @@ reset(Force) ->
cannot_delete_schema)
end,
ok = delete_cluster_nodes_config(),
- %% remove persistet messages and any other garbage we find
- lists:foreach(fun file:delete/1,
- filelib:wildcard(dir() ++ "/*")),
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 8c898498e6..336f74bf9a 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -42,6 +42,7 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
+-spec(usage/0 :: () -> no_return()).
-endif.
@@ -51,7 +52,7 @@ start() ->
RpcTimeout =
case init:get_argument(maxwait) of
{ok,[[N1]]} -> 1000 * list_to_integer(N1);
- _ -> 30000
+ _ -> ?MAX_WAIT
end,
case init:get_plain_arguments() of
[] ->
@@ -86,16 +87,8 @@ stop() ->
ok.
usage() ->
- io:format("Usage: rabbitmq-multi <command>
-
-Available commands:
-
- start_all <NodeCount> - start a local cluster of RabbitMQ nodes.
- status - print status of all running nodes
- stop_all - stops all local RabbitMQ nodes.
- rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes.
-"),
- halt(3).
+ io:format("~s", [rabbit_multi_usage:usage()]),
+ halt(1).
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index cf04f05b7e..7978573d90 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -117,15 +117,25 @@ start() ->
transient, infinity, supervisor, [tcp_client_sup]}),
ok.
+getaddr(Host) ->
+ %% inet_parse:address takes care of ip string, like "0.0.0.0"
+ %% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+ %% and runs 'inet_gethost' port process for dns lookups.
+ %% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+ case inet_parse:address(Host) of
+ {ok, IPAddress1} -> IPAddress1;
+ {error, _} ->
+ case inet:getaddr(Host, inet) of
+ {ok, IPAddress2} -> IPAddress2;
+ {error, Reason} ->
+ error_logger:error_msg("invalid host ~p - ~p~n",
+ [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}})
+ end
+ end.
+
check_tcp_listener_address(NamePrefix, Host, Port) ->
- IPAddress =
- case inet:getaddr(Host, inet) of
- {ok, IPAddress1} -> IPAddress1;
- {error, Reason} ->
- error_logger:error_msg("invalid host ~p - ~p~n",
- [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}})
- end,
+ IPAddress = getaddr(Host),
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
[Port]),
@@ -157,7 +167,7 @@ start_listener(Host, Port, Label, OnConnect) ->
ok.
stop_tcp_listener(Host, Port) ->
- {ok, IPAddress} = inet:getaddr(Host, inet),
+ IPAddress = getaddr(Host),
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name),
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 019d2a269d..8aa5ad8d32 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -70,11 +70,11 @@
-ifdef(use_specs).
--type(qmsg() :: {amqqueue(), pkey()}).
+-type(pmsg() :: {queue_name(), pkey()}).
-type(work_item() ::
- {publish, message(), qmsg()} |
- {deliver, qmsg()} |
- {ack, qmsg()}).
+ {publish, message(), pmsg()} |
+ {deliver, pmsg()} |
+ {ack, pmsg()}).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
@@ -406,7 +406,10 @@ check_version(_Other) ->
requeue_messages(Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
- Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues),
+ Work = ets:foldl(
+ fun ({{QName, PKey}, Delivered}, Acc) ->
+ rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc)
+ end, dict:new(), Queues),
%% unstable parallel map, because order doesn't matter
L = lists:append(
rabbit_misc:upmap(
@@ -425,13 +428,6 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages,
%% contains the mutated messages and queues tables
Snapshot.
-accumulate_requeues({{QName, PKey}, Delivered}, Acc) ->
- Requeue = {PKey, Delivered},
- dict:update(QName,
- fun (Requeues) -> [Requeue | Requeues] end,
- [Requeue],
- Acc).
-
requeue(QName, Requeues, Messages) ->
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{pid = QPid}} ->
@@ -474,12 +470,8 @@ internal_integrate_messages(Items, Snapshot) ->
internal_integrate1({extend_transaction, Key, MessageList},
Snapshot = #psnapshot {transactions = Transactions}) ->
- NewTransactions =
- dict:update(Key,
- fun (MessageLists) -> [MessageList | MessageLists] end,
- [MessageList],
- Transactions),
- Snapshot#psnapshot{transactions = NewTransactions};
+ Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList,
+ Transactions)};
internal_integrate1({rollback_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions}) ->
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1a4830e11c..5cf519b795 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -39,6 +39,8 @@
-export([init/1, mainloop/3]).
+-export([server_properties/0]).
+
-export([analyze_frame/2]).
-import(gen_tcp).
@@ -133,6 +135,7 @@
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
+-spec(server_properties/0 :: () -> amqp_table()).
-endif.
@@ -198,6 +201,16 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+server_properties() ->
+ {ok, Product} = application:get_key(rabbit, id),
+ {ok, Version} = application:get_key(rabbit, vsn),
+ [{list_to_binary(K), longstr, list_to_binary(V)} ||
+ {K, V} <- [{"product", Product},
+ {"version", Version},
+ {"platform", "Erlang/OTP"},
+ {"copyright", ?COPYRIGHT_MESSAGE},
+ {"information", ?INFORMATION_MESSAGE}]].
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
@@ -510,21 +523,12 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
case check_version({ProtocolMajor, ProtocolMinor},
{?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
true ->
- {ok, Product} = application:get_key(id),
- {ok, Version} = application:get_key(vsn),
ok = send_on_channel0(
Sock,
#'connection.start'{
version_major = ?PROTOCOL_VERSION_MAJOR,
version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties =
- [{list_to_binary(K), longstr, list_to_binary(V)} ||
- {K, V} <-
- [{"product", Product},
- {"version", Version},
- {"platform", "Erlang/OTP"},
- {"copyright", ?COPYRIGHT_MESSAGE},
- {"information", ?INFORMATION_MESSAGE}]],
+ server_properties = server_properties(),
mechanisms = <<"PLAIN AMQPLAIN">>,
locales = <<"en_US">> }),
{State#v1{connection = Connection#connection{
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
new file mode 100644
index 0000000000..06d59249bb
--- /dev/null
+++ b/src/rabbit_restartable_sup.erl
@@ -0,0 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_restartable_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/2]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+start_link(Name, {_M, _F, _A} = Fun) ->
+ supervisor:start_link({local, Name}, ?MODULE, [Fun]).
+
+init([{Mod, _F, _A} = Fun]) ->
+ {ok, {{one_for_one, 10, 10},
+ [{Mod, Fun, transient, ?MAX_WAIT, worker, [Mod]}]}}.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ee2b82c5bd..a449e19eb4 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -30,12 +30,15 @@
%%
-module(rabbit_router).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-behaviour(gen_server2).
-export([start_link/0,
- deliver/2]).
+ deliver/2,
+ match_bindings/2,
+ match_routing_key/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -73,13 +76,9 @@ deliver(QPids, Delivery) ->
%% which then in turn delivers it to its queues.
deliver_per_node(
dict:to_list(
- lists:foldl(
- fun (QPid, D) ->
- dict:update(node(QPid),
- fun (QPids1) -> [QPid | QPids1] end,
- [QPid], D)
- end,
- dict:new(), QPids)),
+ lists:foldl(fun (QPid, D) ->
+ rabbit_misc:dict_cons(node(QPid), QPid, D)
+ end, dict:new(), QPids)),
Delivery).
deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
@@ -129,6 +128,46 @@ deliver_per_node(NodeQPids, Delivery) ->
-endif.
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(Name, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ Match(Binding)]
+ end).
+
+match_routing_key(Name, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], sets:from_list(Queues)).
+
%%--------------------------------------------------------------------
init([]) ->
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index a1b8948155..2c5e51125e 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,10 +33,13 @@
-behaviour(supervisor).
--export([start_link/0, start_child/1, start_child/2]).
+-export([start_link/0, start_child/1, start_child/2, start_child/3,
+ start_restartable_child/1, start_restartable_child/2]).
-export([init/1]).
+-include("rabbit.hrl").
+
-define(SERVER, ?MODULE).
start_link() ->
@@ -46,10 +49,25 @@ start_child(Mod) ->
start_child(Mod, []).
start_child(Mod, Args) ->
+ start_child(Mod, Mod, Args).
+
+start_child(ChildId, Mod, Args) ->
{ok, _} = supervisor:start_child(?SERVER,
- {Mod, {Mod, start_link, Args},
- transient, 100, worker, [Mod]}),
+ {ChildId, {Mod, start_link, Args},
+ transient, ?MAX_WAIT, worker, [Mod]}),
+ ok.
+
+start_restartable_child(Mod) ->
+ start_restartable_child(Mod, []).
+
+start_restartable_child(Mod, Args) ->
+ Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
+ {ok, _} = supervisor:start_child(
+ ?SERVER,
+ {Name, {rabbit_restartable_sup, start_link,
+ [Name, {Mod, start_link, Args}]},
+ transient, infinity, supervisor, [rabbit_restartable_sup]}),
ok.
init([]) ->
- {ok, {{one_for_one, 10, 10}, []}}.
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 51d95c35a3..82f2d19918 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -330,7 +330,8 @@ test_topic_match(P, R) ->
test_topic_match(P, R, true).
test_topic_match(P, R, Expected) ->
- case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of
+ case rabbit_exchange_type_topic:topic_matches(list_to_binary(P),
+ list_to_binary(R)) of
Expected ->
passed;
_ ->
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 0fe1542616..493925efd5 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -63,4 +63,4 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
[IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, Name,
OnStartup, OnShutdown, Label]},
- transient, 100, worker, [tcp_listener]}]}}.
+ transient, 16#ffffffff, worker, [tcp_listener]}]}}.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
new file mode 100644
index 0000000000..97e075459f
--- /dev/null
+++ b/src/worker_pool.erl
@@ -0,0 +1,155 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool).
+
+%% Generic worker pool manager.
+%%
+%% Supports nested submission of jobs (nested jobs always run
+%% immediately in current worker process).
+%%
+%% Possible future enhancements:
+%%
+%% 1. Allow priorities (basically, change the pending queue to a
+%% priority_queue).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, submit/1, submit_async/1, idle/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+-spec(submit_async/1 ::
+ (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+-record(state, { available, pending }).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
+
+submit(Fun) ->
+ case get(worker_pool_worker) of
+ true -> worker_pool_worker:run(Fun);
+ _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
+ worker_pool_worker:submit(Pid, Fun)
+ end.
+
+submit_async(Fun) ->
+ gen_server2:cast(?SERVER, {run_async, Fun}).
+
+idle(WId) ->
+ gen_server2:cast(?SERVER, {idle, WId}).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(next_free, From, State = #state { available = Avail,
+ pending = Pending }) ->
+ case queue:out(Avail) of
+ {empty, _Avail} ->
+ {noreply,
+ State #state { pending = queue:in({next_free, From}, Pending) },
+ hibernate};
+ {{value, WId}, Avail1} ->
+ {reply, get_worker_pid(WId), State #state { available = Avail1 },
+ hibernate}
+ end;
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast({idle, WId}, State = #state { available = Avail,
+ pending = Pending }) ->
+ {noreply, case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = queue:in(WId, Avail) };
+ {{value, {next_free, From}}, Pending1} ->
+ gen_server2:reply(From, get_worker_pid(WId)),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ State #state { pending = Pending1 }
+ end, hibernate};
+
+handle_cast({run_async, Fun}, State = #state { available = Avail,
+ pending = Pending }) ->
+ {noreply,
+ case queue:out(Avail) of
+ {empty, _Avail} ->
+ State #state { pending = queue:in({run_async, Fun}, Pending)};
+ {{value, WId}, Avail1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ State #state { available = Avail1 }
+ end, hibernate};
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+get_worker_pid(WId) ->
+ [{WId, Pid, _Type, _Modules} | _] =
+ lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
+ when Id =:= WId -> false;
+ (_) -> true
+ end,
+ supervisor:which_children(worker_pool_sup)),
+ Pid.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
new file mode 100644
index 0000000000..4ded63a8db
--- /dev/null
+++ b/src/worker_pool_sup.erl
@@ -0,0 +1,69 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, start_link/1]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ start_link(erlang:system_info(schedulers)).
+
+start_link(WCount) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]).
+
+%%----------------------------------------------------------------------------
+
+init([WCount]) ->
+ {ok, {{one_for_one, 10, 10},
+ [{worker_pool, {worker_pool, start_link, []}, transient,
+ 16#ffffffff, worker, [worker_pool]} |
+ [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff,
+ worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
new file mode 100644
index 0000000000..d3a4811926
--- /dev/null
+++ b/src/worker_pool_worker.erl
@@ -0,0 +1,104 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool_worker).
+
+-behaviour(gen_server2).
+
+-export([start_link/1, submit/2, submit_async/2, run/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+-spec(submit_async/2 ::
+ (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_link(WId) ->
+ gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+
+submit(Pid, Fun) ->
+ gen_server2:call(Pid, {submit, Fun}, infinity).
+
+submit_async(Pid, Fun) ->
+ gen_server2:cast(Pid, {submit_async, Fun}).
+
+init([WId]) ->
+ ok = worker_pool:idle(WId),
+ put(worker_pool_worker, true),
+ {ok, WId, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({submit, Fun}, From, WId) ->
+ gen_server2:reply(From, run(Fun)),
+ ok = worker_pool:idle(WId),
+ {noreply, WId, hibernate};
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast({submit_async, Fun}, WId) ->
+ run(Fun),
+ ok = worker_pool:idle(WId),
+ {noreply, WId, hibernate};
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+run({M, F, A}) ->
+ apply(M, F, A);
+run(Fun) ->
+ Fun().