diff options
60 files changed, 3834 insertions, 1650 deletions
@@ -11,6 +11,7 @@ syntax: regexp ^dist/ ^include/rabbit_framing\.hrl$ ^src/rabbit_framing\.erl$ +^src/.*\_usage.erl$ ^rabbit\.plt$ ^basic.plt$ ^ebin/rabbit\.(app|rel|boot|script)$ @@ -25,3 +26,4 @@ syntax: regexp ^packaging/windows/rabbitmq-server-windows-.*\.zip$ ^docs/.*\.[15]\.gz$ +^docs/.*\.man\.xml$ @@ -10,12 +10,16 @@ DEPS_FILE=deps.mk SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include +DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ -MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) +MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) +WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) +USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml +USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -58,6 +62,14 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e ERL_EBIN=erl -noinput -pa $(EBIN_DIR) +define usage_xml_to_erl + $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, $(SOURCE_DIR)/rabbit_%_usage.erl, $(subst -,_,$(1)))) +endef + +define usage_dep + $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl +endef + all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -66,9 +78,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl +$(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ @@ -100,7 +111,7 @@ clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc - rm -f docs/*.[0-9].gz + rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -176,7 +187,7 @@ srcdist: distclean cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) - cp -r docs $(TARGET_SRC_DIR) + cp -r $(DOCS_DIR) $(TARGET_SRC_DIR) chmod 0755 $(TARGET_SRC_DIR)/scripts/* (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) @@ -188,16 +199,36 @@ distclean: clean rm -rf dist find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; -%.gz: %.pod - pod2man \ - -n `echo $$(basename $*) | sed -e 's/\.[[:digit:]]\+//'` \ - -s `echo $$(basename $*) | sed -e 's/.*\.\([^.]\+\)/\1/'` \ - -c "RabbitMQ AMQP Server" \ - -d "" \ - -r "" \ - $< | gzip --best > $@ - -docs_all: $(MANPAGES) +# xmlto can not read from standard input, so we mess with a tmp file. +%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl + xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ + gzip -f $(DOCS_DIR)/`basename $< .xml` + rm -f $<.tmp + +# Use tmp files rather than a pipeline so that we get meaningful errors +# Do not fold the cp into previous line, it's there to stop the file being +# generated but empty if we fail +$(SOURCE_DIR)/%_usage.erl: + xsltproc --stringparam modulename "`basename $@ .erl`" \ + $(DOCS_DIR)/usage.xsl $< > $@.tmp + sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2 + fold -s $@.tmp2 > $@.tmp3 + mv $@.tmp3 $@ + rm $@.tmp $@.tmp2 + +# We rename the file before xmlto sees it since xmlto will use the name of +# the file to make internal links. +%.man.xml: %.xml $(DOCS_DIR)/html-to-website-xml.xsl + cp $< `basename $< .xml`.xml && \ + xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml + cat `basename $< .xml`.html | \ + xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \ + xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \ + xmllint --format - > $@ + rm `basename $< .xml`.html + +docs_all: $(MANPAGES) $(WEB_MANPAGES) install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) install: all docs_all install_dirs @@ -215,8 +246,8 @@ install: all docs_all install_dirs done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ - for manpage in docs/*.$$section.pod; do \ - cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \ + for manpage in $(DOCS_DIR)/*.$$section.gz; do \ + cp $$manpage $(MAN_DIR)/man$$section; \ done; \ done @@ -224,4 +255,27 @@ install_dirs: mkdir -p $(SBIN_DIR) mkdir -p $(TARGET_DIR)/sbin --include $(DEPS_FILE) +$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML)))) + +# Note that all targets which depend on clean must have clean in their +# name. Also any target that doesn't depend on clean should not have +# clean in its name, unless you know that you don't need any of the +# automatic dependency generation for that target (eg cleandb). + +# We want to load the dep file if *any* target *doesn't* contain +# "clean" - i.e. if removing all clean-like targets leaves something + +ifeq "$(MAKECMDGOALS)" "" +TESTABLEGOALS:=$(.DEFAULT_GOAL) +else +TESTABLEGOALS:=$(MAKECMDGOALS) +endif + +ifneq "$(strip $(TESTABLEGOALS))" "$(DEPS_FILE)" +ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" +ifeq "$(strip $(wildcard $(DEPS_FILE)))" "" +$(info $(shell $(MAKE) $(DEPS_FILE))) +endif +include $(DEPS_FILE) +endif +endif diff --git a/codegen.py b/codegen.py index 96109610ad..91c70e8196 100644 --- a/codegen.py +++ b/codegen.py @@ -126,7 +126,7 @@ def printFileHeader(): %% %% Contributor(s): ______________________________________. %%""" - + def genErl(spec): def erlType(domain): return erlangTypeMap[spec.resolveDomain(domain)] @@ -151,7 +151,7 @@ def genErl(spec): def genMethodHasContent(m): print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower()) - + def genMethodIsSynchronous(m): hasNoWait = "nowait" in fieldNameList(m.arguments) if m.isSynchronous and hasNoWait: @@ -219,6 +219,9 @@ def genErl(spec): else: pass + def genMethodRecord(m): + print "method_record(%s) -> #%s{};" % (m.erlangName(), m.erlangName()) + def genDecodeMethodFields(m): packedFields = packMethodFields(m.arguments) binaryPattern = ', '.join([methodFieldFragment(f) for f in packedFields]) @@ -299,6 +302,7 @@ def genErl(spec): -export([method_id/1]). -export([method_has_content/1]). -export([is_method_synchronous/1]). +-export([method_record/1]). -export([method_fieldnames/1]). -export([decode_method_fields/2]). -export([decode_properties/2]). @@ -323,6 +327,9 @@ bitvalue(undefined) -> 0. for m in methods: genMethodIsSynchronous(m) print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodRecord(m) + print "method_record(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodFieldNames(m) print "method_fieldnames(Name) -> exit({unknown_method_name, Name})." @@ -362,7 +369,7 @@ def genHrl(spec): result += ' = ' + conv_fn(field.defaultvalue) return result return ', '.join([fillField(f) for f in fields]) - + methods = spec.allMethods() printFileHeader() @@ -386,7 +393,7 @@ def generateErl(specPath): def generateHrl(specPath): genHrl(AmqpSpec(specPath)) - + if __name__ == "__main__": do_main(generateHrl, generateErl) diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl new file mode 100644 index 0000000000..496fcc1c34 --- /dev/null +++ b/docs/examples-to-end.xsl @@ -0,0 +1,94 @@ +<?xml version='1.0'?> +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" + xmlns:exsl="http://exslt.org/common" + xmlns:ng="http://docbook.org/docbook-ng" + xmlns:db="http://docbook.org/ns/docbook" + exclude-result-prefixes="exsl ng db" + version='1.0'> + +<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" /> + +<!-- Don't copy examples through in place --> +<xsl:template match="*[@role='example-prefix']"/> +<xsl:template match="*[@role='example']"/> + +<!-- Copy everything through (with lower priority) --> +<xsl:template match="@*|node()"> + <xsl:copy><xsl:apply-templates select="@*|node()"/></xsl:copy> +</xsl:template> + +<!-- Copy the root node, and add examples at the end--> +<xsl:template match="/refentry"> +<refentry lang="en"> +<xsl:for-each select="*"> + <xsl:copy><xsl:apply-templates select="@*|node()"/></xsl:copy> +</xsl:for-each> + <refsect1> + <title>Examples</title> +<xsl:if test="//screen[@role='example']"> + <variablelist> +<xsl:for-each select="//screen[@role='example']"> + <varlistentry> + <term><command><xsl:copy-of select="text()"/></command></term> + <listitem> + <xsl:copy-of select="following-sibling::para[@role='example']"/> + </listitem> + </varlistentry> +</xsl:for-each> + </variablelist> +</xsl:if> +<!-- +We need to handle multiline examples separately, since not using a +variablelist leads to slightly less nice formatting (the explanation doesn't get +indented) +--> +<xsl:for-each select="//screen[@role='example-multiline']"> +<screen><emphasis role="bold"><xsl:copy-of select="text()"/></emphasis></screen> +<xsl:copy-of select="following-sibling::para[@role='example']"/> +</xsl:for-each> + </refsect1> +</refentry> +</xsl:template> + +<!-- + We show all the subcommands using XML that looks like this: + + <term> + <cmdsynopsis> + <command>list_connections</command> + <arg choice="opt"> + <replaceable>connectioninfoitem</replaceable> + ... + </arg> + </cmdsynopsis> + </term> + + However, while DocBook renders this sensibly for HTML, for some reason it + doen't show anything inside <cmdsynopsis> at all for man pages. I think what + we're doing is semantically correct so this is a bug in DocBook. The following + rules essentially do what DocBook does when <cmdsynopsis> is not inside a + <term>. +--> + +<xsl:template match="term/cmdsynopsis"> + <xsl:apply-templates mode="docbook-bug"/> +</xsl:template> + +<xsl:template match="command" mode="docbook-bug"> + <emphasis role="bold"><xsl:apply-templates mode="docbook-bug"/></emphasis> +</xsl:template> + +<xsl:template match="arg[@choice='opt']" mode="docbook-bug"> + [<xsl:apply-templates mode="docbook-bug"/>] +</xsl:template> + +<xsl:template match="arg[@choice='req']" mode="docbook-bug"> + {<xsl:apply-templates mode="docbook-bug"/>} +</xsl:template> + +<xsl:template match="replaceable" mode="docbook-bug"> + <emphasis><xsl:apply-templates mode="docbook-bug"/></emphasis> +</xsl:template> + +</xsl:stylesheet> + diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl new file mode 100644 index 0000000000..a35b869967 --- /dev/null +++ b/docs/html-to-website-xml.xsl @@ -0,0 +1,91 @@ +<?xml version='1.0'?> +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" + xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" + version='1.0'> + +<xsl:param name="original"/> + +<xsl:output method="xml" doctype-public="bug in xslt processor requires fake doctype" doctype-system="otherwise css isn't included" /> + +<xsl:template match="*"/> + +<!-- Copy every element through --> +<xsl:template match="@*|node()"> + <xsl:copy><xsl:apply-templates select="@*|node()"/></xsl:copy> +</xsl:template> + +<!-- Copy the root node, and munge the outer part of the page --> +<xsl:template match="/html"> +<xsl:processing-instruction name="xml-stylesheet">type="text/xml" href="page.xsl"</xsl:processing-instruction> +<html xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"> + <head> + <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title> + </head> + <body> + <doc:div> + <xsl:choose> + <xsl:when test="document($original)/refentry/refmeta/manvolnum"> + <p> + This is the manual page for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. + </p> + <p> + <a href="manpages.html">See a list of all manual pages</a>. + </p> + </xsl:when> + <xsl:otherwise> + <p> + This is the documentation for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. + </p> + </xsl:otherwise> + </xsl:choose> + <p> + For more general documentation, please see the + <a href="admin-guide.html">administrator's guide</a>. + </p> + + <doc:toc class="compact"> + <doc:heading>Table of Contents</doc:heading> + </doc:toc> + + <xsl:apply-templates select="body/div[@class='refentry']"/> + </doc:div> + </body> +</html> +</xsl:template> + +<!-- Specific instructions to revert the DocBook HTML to be more like our ad-hoc XML schema --> + +<xsl:template match="div[@class='refsect1'] | div[@class='refnamediv'] | div[@class='refsynopsisdiv']"> + <doc:section name="{@title}"> + <xsl:apply-templates select="node()"/> + </doc:section> +</xsl:template> + +<xsl:template match="div[@class='refsect2']"> + <doc:subsection name="{@title}"> + <xsl:apply-templates select="node()"/> + </doc:subsection> +</xsl:template> + +<xsl:template match="h2 | h3"> + <doc:heading> + <xsl:apply-templates select="node()"/> + </doc:heading> +</xsl:template> + +<xsl:template match="pre[@class='screen']"> + <pre class="sourcecode"> + <xsl:apply-templates select="node()"/> + </pre> +</xsl:template> + +<xsl:template match="div[@class='cmdsynopsis']"> + <div class="cmdsynopsis" id="{p/code[@class='command']}"> + <xsl:apply-templates select="node()"/> + </div> +</xsl:template> + +</xsl:stylesheet> + diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod deleted file mode 100644 index 42f0c4d2e8..0000000000 --- a/docs/rabbitmq-activate-plugins.1.pod +++ /dev/null @@ -1,37 +0,0 @@ -=head1 NAME - -rabbitmq-activate-plugins - command line tool for activating plugins -in a RabbitMQ broker - -=head1 SYNOPSIS - -rabbitmq-activate-plugins - -=head1 DESCRIPTION - -RabbitMQ is an implementation of AMQP, the emerging standard for high -performance enterprise messaging. The RabbitMQ server is a robust and -scalable implementation of an AMQP broker. - -rabbitmq-activate-plugins is a command line tool for activating -plugins installed into the broker's plugins directory. - -=head1 EXAMPLES - -To activate all of the installed plugins in the current RabbitMQ install, -execute: - - rabbitmq-activate-plugins - -=head1 SEE ALSO - -L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>, -L<rabbitmqctl(1)>, L<rabbitmq-deactivate-plugins(1)> - -=head1 AUTHOR - -The RabbitMQ Team <info@rabbitmq.com> - -=head1 REFERENCES - -RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-activate-plugins.1.xml b/docs/rabbitmq-activate-plugins.1.xml new file mode 100644 index 0000000000..ef81c201f7 --- /dev/null +++ b/docs/rabbitmq-activate-plugins.1.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq-activate-plugins</refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq-activate-plugins</refname> + <refpurpose>command line tool for activating plugins in a RabbitMQ broker</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmq-activate-plugins</command> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + </para> + <para> + rabbitmq-activate-plugins is a command line tool for activating +plugins installed into the broker's plugins directory. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example"> + rabbitmq-activate-plugins + </screen> + <para role="example"> + This command activates all of the installed plugins in the current RabbitMQ install. + </para> + </refsect1> + + <refsect1> + <title>See also</title> + <para> + <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-deactivate-plugins</refentrytitle><manvolnum>1</manvolnum></citerefentry> + </para> + </refsect1> +</refentry> diff --git a/docs/rabbitmq-deactivate-plugins.1.pod b/docs/rabbitmq-deactivate-plugins.1.pod deleted file mode 100644 index eb4fbb90a3..0000000000 --- a/docs/rabbitmq-deactivate-plugins.1.pod +++ /dev/null @@ -1,37 +0,0 @@ -=head1 NAME - -rabbitmq-deactivate-plugins - command line tool for deactivating plugins -in a RabbitMQ broker - -=head1 SYNOPSIS - -rabbitmq-deactivate-plugins - -=head1 DESCRIPTION - -RabbitMQ is an implementation of AMQP, the emerging standard for high -performance enterprise messaging. The RabbitMQ server is a robust and -scalable implementation of an AMQP broker. - -rabbitmq-deactivate-plugins is a command line tool for deactivating -plugins installed into the broker. - -=head1 EXAMPLES - -To deactivate all of the installed plugins in the current RabbitMQ install, -execute: - - rabbitmq-deactivate-plugins - -=head1 SEE ALSO - -L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>, -L<rabbitmqctl(1)>, L<rabbitmq-activate-plugins(1)> - -=head1 AUTHOR - -The RabbitMQ Team <info@rabbitmq.com> - -=head1 REFERENCES - -RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-deactivate-plugins.1.xml b/docs/rabbitmq-deactivate-plugins.1.xml new file mode 100644 index 0000000000..eacd014b83 --- /dev/null +++ b/docs/rabbitmq-deactivate-plugins.1.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq-deactivate-plugins</refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq-deactivate-plugins</refname> + <refpurpose>command line tool for deactivating plugins in a RabbitMQ broker</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmq-deactivate-plugins</command> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + </para> + <para> +rabbitmq-deactivate-plugins is a command line tool for deactivating +plugins installed into the broker. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example"> + rabbitmq-deactivate-plugins + </screen> + <para role="example"> + This command deactivates all of the installed plugins in the current RabbitMQ install. + </para> + </refsect1> + + <refsect1> + <title>See also</title> + <para> + <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-activate-plugins</refentrytitle><manvolnum>1</manvolnum></citerefentry> + </para> + </refsect1> +</refentry> diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod deleted file mode 100644 index 640609eef9..0000000000 --- a/docs/rabbitmq-multi.1.pod +++ /dev/null @@ -1,59 +0,0 @@ -=head1 NAME - -rabbitmq-multi - start/stop local cluster RabbitMQ nodes - -=head1 SYNOPSIS - -rabbitmq-multi I<command> [command option] - -=head1 DESCRIPTION - -RabbitMQ is an implementation of AMQP, the emerging standard for high -performance enterprise messaging. The RabbitMQ server is a robust and -scalable implementation of an AMQP broker. - -rabbitmq-multi scripts allows for easy set-up of a cluster on a single -machine. - -See also L<rabbitmq-server(1)> for configuration information. - -=head1 COMMANDS - -=over - -=item start_all I<count> - -Start count nodes with unique names, listening on all IP addresses and -on sequential ports starting from 5672. - -=item status - -Print the status of all running RabbitMQ nodes. - -=item stop_all - -Stop all local RabbitMQ nodes, - -=item rotate_logs - -Rotate log files for all local and running RabbitMQ nodes. - -=back - -=head1 EXAMPLES - -Start 3 local RabbitMQ nodes with unique, sequential port numbers: - - rabbitmq-multi start_all 3 - -=head1 SEE ALSO - -L<rabbitmq.conf(5)>, L<rabbitmq-server(1)>, L<rabbitmqctl(1)> - -=head1 AUTHOR - -The RabbitMQ Team <info@rabbitmq.com> - -=head1 REFERENCES - -RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml new file mode 100644 index 0000000000..b3862fdf87 --- /dev/null +++ b/docs/rabbitmq-multi.1.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq-multi</refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq-multi</refname> + <refpurpose>start/stop local cluster RabbitMQ nodes</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmq-multi</command> + <arg choice="req"><replaceable>command</replaceable></arg> + <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + </para> + <para> +rabbitmq-multi scripts allows for easy set-up of a cluster on a single +machine. + </para> + </refsect1> + + <refsect1> + <title>Commands</title> + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>start_all</command> <arg choice="req"><replaceable>count</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> +Start count nodes with unique names, listening on all IP addresses and +on sequential ports starting from 5672. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-multi start_all 3</screen> + <para role="example"> + Starts 3 local RabbitMQ nodes with unique, sequential port numbers. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>status</command></cmdsynopsis></term> + <listitem> + <para> +Print the status of all running RabbitMQ nodes. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>stop_all</command></cmdsynopsis></term> + <listitem> + <para> +Stop all local RabbitMQ nodes, + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>rotate_logs</command></cmdsynopsis></term> + <listitem> + <para> +Rotate log files for all local and running RabbitMQ nodes. + </para> + </listitem> + </varlistentry> + + </variablelist> + </refsect1> + + + <refsect1> + <title>See also</title> + <para> + <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> + </para> + </refsect1> +</refentry> diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod deleted file mode 100644 index d74ab8d94f..0000000000 --- a/docs/rabbitmq-server.1.pod +++ /dev/null @@ -1,88 +0,0 @@ -=head1 NAME - -rabbitmq-server - start RabbitMQ AMQP server - -=head1 SYNOPSIS - -rabbitmq-server [-detached] - -=head1 DESCRIPTION - -RabbitMQ is an implementation of AMQP, the emerging standard for high -performance enterprise messaging. The RabbitMQ server is a robust and -scalable implementation of an AMQP broker. - -Running rabbitmq-server in the foreground displays a banner message, -and reports on progress in the startup sequence, concluding with the -message "broker running", indicating that the RabbitMQ broker has been -started successfully. To shut down the server, just terminate the -process or use L<rabbitmqctl(1)>. - -=head1 ENVIRONMENT - -=over - -=item B<RABBITMQ_MNESIA_BASE> - -Defaults to F</var/lib/rabbitmq/mnesia>. Set this to the directory where -Mnesia database files should be placed. - -=item B<RABBITMQ_LOG_BASE> - -Defaults to F</var/log/rabbitmq>. Log files generated by the server will -be placed in this directory. - -=item B<RABBITMQ_NODENAME> - -Defaults to rabbit. This can be useful if you want to run more than -one node per machine - B<RABBITMQ_NODENAME> should be unique per -erlang-node-and-machine combination. See clustering on a single -machine guide at -L<http://www.rabbitmq.com/clustering.html#single-machine> for details. - -=item B<RABBITMQ_NODE_IP_ADDRESS> - -Defaults to 0.0.0.0. This can be changed if you only want to bind to -one network interface. - -=item B<RABBITMQ_NODE_PORT> - -Defaults to 5672. - -=item B<RABBITMQ_CLUSTER_CONFIG_FILE> - -Defaults to F</etc/rabbitmq/rabbitmq_cluster.config>. If this file is -present it is used by the server to auto-configure a RabbitMQ cluster. -See the clustering guide at L<http://www.rabbitmq.com/clustering.html> -for details. - -=back - -=head1 OPTIONS - -=over - -=item B<-detached> - -start the server process in the background - -=back - -=head1 EXAMPLES - -Run RabbitMQ AMQP server in the background: - - rabbitmq-server -detached - -=head1 SEE ALSO - -L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)> - -=head1 AUTHOR - -The RabbitMQ Team <info@rabbitmq.com> - -=head1 REFERENCES - -RabbitMQ Web Site: L<http://www.rabbitmq.com> - diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml new file mode 100644 index 0000000000..25c2aefb8b --- /dev/null +++ b/docs/rabbitmq-server.1.xml @@ -0,0 +1,143 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq-server</refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq-server</refname> + <refpurpose>start RabbitMQ AMQP server</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmq-multi</command> + <arg choice="opt">-detached</arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + </para> + <para> +Running rabbitmq-server in the foreground displays a banner message, +and reports on progress in the startup sequence, concluding with the +message "broker running", indicating that the RabbitMQ broker has been +started successfully. To shut down the server, just terminate the +process or use rabbitmqctl(1). + </para> + </refsect1> + + <refsect1> + <title>Environment</title> + <variablelist> + + <varlistentry> + <term>RABBITMQ_MNESIA_BASE</term> + <listitem> + <para> +Defaults to <filename>/var/lib/rabbitmq/mnesia</filename>. Set this to the directory where +Mnesia database files should be placed. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_LOG_BASE</term> + <listitem> + <para> +Defaults to <filename>/var/log/rabbitmq</filename>. Log files generated by the server will +be placed in this directory. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_NODENAME</term> + <listitem> + <para> +Defaults to rabbit. This can be useful if you want to run more than +one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per +erlang-node-and-machine combination. See the +<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single +machine guide</ulink> for details. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_NODE_IP_ADDRESS</term> + <listitem> + <para> +Defaults to 0.0.0.0. This can be changed if you only want to bind to +one network interface. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_NODE_PORT</term> + <listitem> + <para> +Defaults to 5672. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> + <listitem> + <para> +Defaults to <filename>/etc/rabbitmq/rabbitmq_cluster.config</filename>. If this file is +present it is used by the server to auto-configure a RabbitMQ cluster. +See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> +for details. + </para> + </listitem> + </varlistentry> + + </variablelist> + </refsect1> + + <refsect1> + <title>Options</title> + <variablelist> + <varlistentry> + <term>-detached</term> + <listitem> + <para> + start the server process in the background + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-server -detached</screen> + <para role="example"> + Runs RabbitMQ AMQP server in the background. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> + + <refsect1> + <title>See also</title> + <para> + <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> + </para> + </refsect1> +</refentry> diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml new file mode 100644 index 0000000000..d59ed63813 --- /dev/null +++ b/docs/rabbitmq-service.xml @@ -0,0 +1,228 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq-service.bat</refentrytitle> + <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq-service.bat</refname> + <refpurpose>manage RabbitMQ AMQP service</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmq-service.bat</command> + <arg choice="opt">command</arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + </para> + <para> +Running <command>rabbitmq-service</command> allows the RabbitMQ broker to be run as a +service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker +service can be started and stopped using the Windows® services +applet. + </para> + <para> +By default the service will run in the authentication context of the +local system account. It is therefore necessary to synchronise Erlang +cookies between the local system account (typically +<filename>C:\WINDOWS\.erlang.cookie</filename> and the account that will be used to +run <command>rabbitmqctl</command>. + </para> + </refsect1> + + <refsect1> + <title>Commands</title> + <variablelist> + + <varlistentry> + <term>help</term> + <listitem> + <para> +Display usage information. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>install</term> + <listitem> + <para> +Install the service. The service will not be started. +Subsequent invocations will update the service parameters if +relevant environment variables were modified. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>remove</term> + <listitem> + <para> +Remove the service. If the service is running then it will +automatically be stopped before being removed. No files will be +deleted as a consequence and <command>rabbitmq-server</command> will remain operable. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>start</term> + <listitem> + <para> +Start the service. The service must have been correctly installed +beforehand. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>stop</term> + <listitem> + <para> +Stop the service. The service must be running for this command to +have any effect. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>disable</term> + <listitem> + <para> +Disable the service. This is the equivalent of setting the startup +type to <code>Disabled</code> using the service control panel. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>enable</term> + <listitem> + <para> +Enable the service. This is the equivalent of setting the startup +type to <code>Automatic</code> using the service control panel. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> + + <refsect1> + <title>Environment</title> + <variablelist> + + <varlistentry> + <term>RABBITMQ_SERVICENAME</term> + <listitem> + <para> +Defaults to RabbitMQ. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_BASE</term> + <listitem> + <para> +Defaults to the application data directory of the current user. +This is the location of log and database directories. + + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_NODENAME</term> + <listitem> + <para> +Defaults to rabbit. This can be useful if you want to run more than +one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per +erlang-node-and-machine combination. See the +<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single +machine guide</ulink> for details. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_NODE_IP_ADDRESS</term> + <listitem> + <para> +Defaults to 0.0.0.0. This can be changed if you only want to bind to +one network interface. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_NODE_PORT</term> + <listitem> + <para> +Defaults to 5672. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>ERLANG_SERVICE_MANAGER_PATH</term> + <listitem> + <para> +Defaults to <filename>C:\Program Files\erl5.5.5\erts-5.5.5\bin</filename> +(or <filename>C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin</filename> for 64-bit +environments). This is the installation location of the Erlang service +manager. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_CLUSTER_CONFIG_FILE</term> + <listitem> + <para> +If this file is +present it is used by the server to auto-configure a RabbitMQ cluster. +See the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink> +for details. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>RABBITMQ_CONSOLE_LOG</term> + <listitem> + <para> +Set this varable to <code>new</code> or <code>reuse</code> to have the console +output from the server redirected to a file named <code>SERVICENAME</code>.debug +in the application data directory of the user that installed the service. +Under Vista this will be <filename>C:\Users\AppData\username\SERVICENAME</filename>. +Under previous versions of Windows this will be +<filename>C:\Documents and Settings\username\Application Data\SERVICENAME</filename>. +If <code>RABBITMQ_CONSOLE_LOG</code> is set to <code>new</code> then a new file will be +created each time the service starts. If <code>RABBITMQ_CONSOLE_LOG</code> is +set to <code>reuse</code> then the file will be overwritten each time the +service starts. The default behaviour when <code>RABBITMQ_CONSOLE_LOG</code> is +not set or set to a value other than <code>new</code> or <code>reuse</code> is to discard +the server output. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> +</refentry> diff --git a/docs/rabbitmq.conf.5.pod b/docs/rabbitmq.conf.5.pod deleted file mode 100644 index a7bf4c0942..0000000000 --- a/docs/rabbitmq.conf.5.pod +++ /dev/null @@ -1,69 +0,0 @@ -=head1 NAME - -F</etc/rabbitmq/rabbitmq.conf> - default settings for RabbitMQ AMQP -server - -=head1 DESCRIPTION - -F</etc/rabbitmq/rabbitmq.conf> contains variable settings that override the -defaults built in to the RabbitMQ startup scripts. - -The file is interpreted by the system shell, and so should consist of -a sequence of shell environment variable definitions. Normal shell -syntax is permitted (since the file is sourced using the shell "." -operator), including line comments starting with "#". - -In order of preference, the startup scripts get their values from the -environment, from F</etc/rabbitmq/rabbitmq.conf> and finally from the -built-in default values. For example, for the B<RABBITMQ_NODENAME> -setting, - -=over - -=item B<RABBITMQ_NODENAME> - -from the environment is checked first. If it is absent or equal to the -empty string, then - -=item B<NODENAME> - -from L</etc/rabbitmq/rabbitmq.conf> is checked. If it is also absent -or set equal to the empty string then the default value from the -startup script is used. - -The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the -environment variable names, with the B<RABBITMQ_> prefix removed: -B<RABBITMQ_NODE_PORT> from the environment becomes B<NODE_PORT> in the -F</etc/rabbitmq/rabbitmq.conf> file, etc. - -=back - -=head1 EXAMPLES - -The following is an example of a complete -F</etc/rabbitmq/rabbitmq.conf> file that overrides the default Erlang -node name from "rabbit" to "hare": - - # I am a complete /etc/rabbitmq/rabbitmq.conf file. - # Comment lines start with a hash character. - # This is a /bin/sh script file - use ordinary envt var syntax - NODENAME=hare - -=head1 SEE ALSO - -L<rabbitmq-server(1)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)> - -=head1 AUTHOR - -Originally written by The RabbitMQ Team <info@rabbitmq.com> - -=head1 COPYRIGHT - -This package, the RabbitMQ server is licensed under the MPL. - -If you have any questions regarding licensing, please contact us at -info@rabbitmq.com. - -=head1 REFERENCES - -RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml new file mode 100644 index 0000000000..34f20f9226 --- /dev/null +++ b/docs/rabbitmq.conf.5.xml @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq.conf</refentrytitle> + <manvolnum>5</manvolnum> + <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq.conf</refname> + <refpurpose>default settings for RabbitMQ AMQP server</refpurpose> + </refnamediv> + + <refsect1> + <title>Description</title> + <para> +<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the +defaults built in to the RabbitMQ startup scripts. + </para> + <para> +The file is interpreted by the system shell, and so should consist of +a sequence of shell environment variable definitions. Normal shell +syntax is permitted (since the file is sourced using the shell "." +operator), including line comments starting with "#". + </para> + <para> +In order of preference, the startup scripts get their values from the +environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the +built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar> +setting, + </para> + <para> + <envar>RABBITMQ_NODENAME</envar> + </para> + <para> +from the environment is checked first. If it is absent or equal to the +empty string, then + </para> + <para> + <envar>NODENAME</envar> + </para> + <para> +from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent +or set equal to the empty string then the default value from the +startup script is used. + </para> + <para> +The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the +environment variable names, with the <envar>RABBITMQ_</envar> prefix removed: +<envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the +<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc. + </para> + <para role="example-prefix">For example:</para> + <screen role="example-multiline"> +# I am a complete /etc/rabbitmq/rabbitmq.conf file. +# Comment lines start with a hash character. +# This is a /bin/sh script file - use ordinary envt var syntax +NODENAME=hare + </screen> + <para role="example"> + This is an example of a complete + <filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang + node name from "rabbit" to "hare". + </para> + + </refsect1> + + <refsect1> + <title>See also</title> + <para> + <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> + </para> + </refsect1> +</refentry> diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod deleted file mode 100644 index e26767ab4f..0000000000 --- a/docs/rabbitmqctl.1.pod +++ /dev/null @@ -1,536 +0,0 @@ -=head1 NAME - -rabbitmqctl - command line tool for managing a RabbitMQ broker - -=head1 SYNOPSIS - -rabbitmqctl [-n I<node>] I<<command>> [command options] - -=head1 DESCRIPTION - -RabbitMQ is an implementation of AMQP, the emerging standard for high -performance enterprise messaging. The RabbitMQ server is a robust and -scalable implementation of an AMQP broker. - -rabbitmqctl is a command line tool for managing a RabbitMQ broker. -It performs all actions by connecting to one of the broker's nodes. - - -=head1 OPTIONS - -=over - -=item B<-n> I<node> - -Default node is C<rabbit@server>, where server is the local host. On -a host named C<server.example.com>, the node name of the RabbitMQ -Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME -has been set to some non-default value at broker startup time). The -output of hostname -s is usually the correct suffix to use after the -"@" sign. See rabbitmq-server(1) for details of configuring the -RabbitMQ broker. - -=item B<-q> - -Quiet output mode is selected with the B<-q> flag. Informational -messages are suppressed when quiet mode is in effect. - -=back - -=head1 COMMANDS - -=head2 APPLICATION AND CLUSTER MANAGEMENT - -=over - -=item stop - -Stop the Erlang node on which RabbitMQ broker is running. - -=item stop_app - -Stop the RabbitMQ application, leaving the Erlang node running. This -command is typically run prior to performing other management actions -that require the RabbitMQ application to be stopped, e.g. I<reset>. - -=item start_app - -Start the RabbitMQ application. This command is typically run prior -to performing other management actions that require the RabbitMQ -application to be stopped, e.g. I<reset>. - -=item status - -Display various information about the RabbitMQ broker, such as whether -the RabbitMQ application on the current node, its version number, what -nodes are part of the broker, which of these are running. - -=item reset - -Return a RabbitMQ node to its virgin state. Removes the node from any -cluster it belongs to, removes all data from the management database, -such as configured users, vhosts and deletes all persistent messages. - -=item force_reset - -The same as I<reset> command, but resets the node unconditionally, -regardless of the current management database state and cluster -configuration. It should only be used as a last resort if the -database or cluster configuration has been corrupted. - -=item rotate_logs [suffix] - -Instruct the RabbitMQ node to rotate the log files. The RabbitMQ -broker will attempt to append the current contents of the log file to -the file with the name composed of the original name and the -suffix. It will create a new file if such a file does not already -exist. When no I<suffix> is specified, the empty log file is simply -created at the original location; no rotation takes place. When an -error occurs while appending the contents of the old log file, the -operation behaves in the same way as if no I<suffix> was specified. -This command might be helpful when you are e.g. writing your own -logrotate script and you do not want to restart the RabbitMQ node. - -=item cluster I<clusternode> ... - -Instruct the node to become member of a cluster with the specified -nodes determined by I<clusternode> option(s). See -L<http://www.rabbitmq.com/clustering.html> for more information about -clustering. - -=item close_connection I<connectionpid> I<explanation> - -Instruct the broker to close the connection associated with the Erlang -process id I<connectionpid> (see also the I<list_connections> -command), passing the I<explanation> string to the connected client as -part of the AMQP connection shutdown protocol. - -=back - -=head2 USER MANAGEMENT - -=over - -=item add_user I<username> I<password> - -Create a user named I<username> with (initial) password I<password>. - -=item delete_user I<username> - -Delete the user named I<username>. - -=item change_password I<username> I<newpassword> - -Change the password for the user named I<username> to I<newpassword>. - -=item list_users - -List all users, one per line. - -=back - -=head2 ACCESS CONTROL - -=over - -=item add_vhost I<vhostpath> - -Create a new virtual host called I<vhostpath>. - -=item delete_vhost I<vhostpath> - -Delete a virtual host I<vhostpath>. This command deletes also all its -exchanges, queues and user mappings. - -=item list_vhosts - -List all virtual hosts, one per line. - -=item set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp> - -Set the permissions for the user named I<username> in the virtual host -I<vhostpath>, granting I<configure>, I<write> and I<read> access to -resources with names matching the first, second and third I<regexp>, -respectively. - -=item clear_permissions [-p I<vhostpath>] I<username> - -Remove the permissions for the user named I<username> in the virtual -host I<vhostpath>. - -=item list_permissions [-p I<vhostpath>] - -List all the users and their permissions in the virtual host -I<vhostpath>. Each output line contains the username and their -I<configure>, I<write> and I<read> access regexps, separated by tab -characters. - -=item list_user_permissions I<username> - -List the permissions of the user named I<username> across all virtual -hosts. - -=back - -=head2 SERVER STATUS - -=over - -=item list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] - -List queue information by virtual host. Each line printed -describes a queue, with the requested I<queueinfoitem> values -separated by tab characters. If no I<queueinfoitem>s are -specified then I<name> and I<messages> are assumed. - -=back - -=head3 Queue information items - -=over - -=item name - -name of the queue - -=item durable - -whether the queue survives server restarts - -=item auto_delete - -whether the queue will be deleted when no longer used - -=item arguments - -queue arguments - -=item pid - -id of the Erlang process associated with the queue - -=item owner_pid - -id of the Erlang process representing the connection which is the -exclusive owner of the queue, or empty if the queue is non-exclusive - -=item exclusive_consumer_pid - -id of the Erlang process representing the channel of the exclusive -consumer subscribed to this queue, or empty if there is no exclusive -consumer - -=item exclusive_consumer_tag - -consumer tag of the exclusive consumer subscribed to this queue, or -empty if there is no exclusive consumer - -=item messages_ready - -number of messages ready to be delivered to clients - -=item messages_unacknowledged - -number of messages delivered to clients but not yet acknowledged - -=item messages_uncommitted - -number of messages published in as yet uncommitted transactions - -=item messages - -sum of ready, unacknowledged and uncommitted messages - -=item acks_uncommitted - -number of acknowledgements received in as yet uncommitted transactions - -=item consumers - -number of consumers - -=item transactions - -number of transactions - -=item memory - -bytes of memory consumed by the Erlang process for the queue, -including stack, heap and internal structures - -=back - -=over - -=item list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...] - -List queue information by virtual host. Each line printed describes an -exchange, with the requested I<exchangeinfoitem> values separated by -tab characters. If no I<exchangeinfoitem>s are specified then I<name> -and I<type> are assumed. - -=back - -=head3 Exchange information items - -=over - -=item name - -name of the exchange - -=item type - -exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>) - -=item durable - -whether the exchange survives server restarts - -=item auto_delete - -whether the exchange is deleted when no longer used - -=item arguments - -exchange arguments - -=back - -=over - -=item list_bindings [-p I<vhostpath>] - -List bindings by virtual host. Each line printed describes a binding, -with the exchange name, queue name, routing key and arguments, -separated by tab characters. - -=item list_connections [I<connectioninfoitem> ...] - -List current AMQP connections. Each line printed describes a -connection, with the requested I<connectioninfoitem> values separated -by tab characters. If no I<connectioninfoitem>s are specified then -I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. - -=back - -=head3 Connection information items - -=over - -=item pid - -id of the Erlang process associated with the connection - -=item address - -server IP number - -=item port - -server port - -=item peer_address - -peer address - -=item peer_port - -peer port - -=item state - -connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, -B<running>, B<closing>, B<closed>) - -=item channels - -number of channels using the connection - -=item user - -username associated with the connection - -=item vhost - -virtual host - -=item timeout - -connection timeout - -=item frame_max - -maximum frame size (bytes) - -=item client_properties - -informational properties transmitted by the client during connection -establishment - -=item recv_oct - -octets received - -=item recv_cnt - -packets received - -=item send_oct - -octets sent - -=item send_cnt - -packets sent - -=item send_pend - -send queue size - -=back - -=over - -=item list_channels [I<channelinfoitem> ...] - -List channel information. Each line printed describes a channel, with -the requested I<channelinfoitem> values separated by tab characters. -If no I<channelinfoitem>s are specified then I<pid>, I<user>, -I<transactional>, I<consumer_count>, and I<messages_unacknowledged> -are assumed. - -The list includes channels which are part of ordinary AMQP connections -(as listed by list_connections) and channels created by various -plug-ins and other extensions. - -=back - -=head3 Channel information items - -=over - -=item pid - -id of the Erlang process associated with the channel - -=item connection - -id of the Erlang process associated with the connection to which the -channel belongs - -=item number - -the number of the channel, which uniquely identifies it within a -connection - -=item user - -username associated with the channel - -=item vhost - -virtual host in which the channel operates - -=item transactional - -true if the channel is in transactional mode, false otherwise - -=item consumer_count - -number of logical AMQP consumers retrieving messages via the channel - -=item messages_unacknowledged - -number of messages delivered via this channel but not yet acknowledged - -=item acks_uncommitted - -number of acknowledgements received in an as yet uncommitted -transaction - -=item prefetch_count - -QoS prefetch count limit in force, 0 if unlimited - -=back - -=item list_consumers - -List consumers, i.e. subscriptions to a queue's message stream. Each -line printed shows, separated by tab characters, the name of the queue -subscribed to, the id of the channel process via which the -subscription was created and is managed, the consumer tag which -uniquely identifies the subscription within a channel, and a boolean -indicating whether acknowledgements are expected for messages -delivered to this consumer. - -=back - -The list_queues, list_exchanges, list_bindings and list_consumers -commands accept an optional virtual host parameter for which to -display results, defaulting to I<"/">. The default can be overridden -with the B<-p> flag. - -=head1 OUTPUT ESCAPING - -Various items that may appear in the output of rabbitmqctl can contain -arbitrary octets. If a octet corresponds to a non-printing ASCII -character (values 0 to 31, and 127), it will be escaped in the output, -using a sequence consisting of a backslash character followed by three -octal digits giving the octet's value (i.e., as used in string -literals in the C programming language). An octet corresponding to -the backslash character (i.e. with value 92) will be escaped using a -sequence of two backslash characters. Octets with a value of 128 or -above are not escaped, in order to preserve strings encoded with -UTF-8. - -The items to which this escaping scheme applies are: - -=over - -=item * -Usernames - -=item * -Virtual host names - -=item * -Queue names - -=item * -Exchange names - -=item * -Regular expressions used for access control - -=back - -=head1 EXAMPLES - -Create a user named foo with (initial) password bar at the Erlang node -rabbit@test: - - rabbitmqctl -n rabbit@test add_user foo bar - -Grant user named foo access to the virtual host called test at the -default Erlang node: - - rabbitmqctl map_user_vhost foo test - -Append the current logs' content to the files with ".1" suffix and reopen -them: - - rabbitmqctl rotate_logs .1 - -=head1 SEE ALSO - -rabbitmq.conf(5), rabbitmq-multi(1), rabbitmq-server(1) - -=head1 AUTHOR - -The RabbitMQ Team <info@rabbitmq.com> - -=head1 REFERENCES - -RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml new file mode 100644 index 0000000000..7634b2d247 --- /dev/null +++ b/docs/rabbitmqctl.1.xml @@ -0,0 +1,1042 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<!-- + There is some extra magic in this document besides the usual DocBook semantics + to allow us to derive manpages, HTML and usage messages from the same source + document. + + Examples need to be moved to the end for man pages. To this end, <para>s and + <screen>s with role="example" will be moved, and with role="example-prefix" + will be removed. + + The usage messages are more involved. We have some magic in usage.xsl to pull + out the command synopsis, global option and subcommand synopses. We also pull + out <para>s with role="usage". + + Finally we construct lists of possible values for subcommand options, if the + subcommand's <varlistentry> has role="usage-has-option-list". The option which + takes the values should be marked with role="usage-option-list". +--> + +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmqctl</refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo class="manual">RabbitMQ Service</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmqctl</refname> + <refpurpose>command line tool for managing a RabbitMQ broker</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmqctl</command> + <arg choice="opt">-n <replaceable>node</replaceable></arg> + <arg choice="opt">-q</arg> + <arg choice="req"><replaceable>command</replaceable></arg> + <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + RabbitMQ is an implementation of AMQP, the emerging standard for high + performance enterprise messaging. The RabbitMQ server is a robust and + scalable implementation of an AMQP broker. + </para> + <para> + <command>rabbitmqctl</command> is a command line tool for managing a + RabbitMQ broker. It performs all actions by connecting to one of the + broker's nodes. + </para> + </refsect1> + + <refsect1> + <title>Options</title> + <variablelist> + <varlistentry> + <term><cmdsynopsis><arg choice="opt">-n <replaceable>node</replaceable></arg></cmdsynopsis></term> + <listitem> + <para role="usage"> + Default node is "rabbit@server", where server is the local host. On + a host named "server.example.com", the node name of the RabbitMQ + Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME + has been set to some non-default value at broker startup time). The + output of <command>hostname -s</command> is usually the correct suffix to use after the + "@" sign. See rabbitmq-server(1) for details of configuring the + RabbitMQ broker. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><arg choice="opt">-q</arg></cmdsynopsis></term> + <listitem> + <para role="usage"> + Quiet output mode is selected with the "-q" flag. Informational + messages are suppressed when quiet mode is in effect. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Flags must precede all other parameters to <command>rabbitmqctl</command>. + </para> + </refsect1> + + <refsect1> + <title>Commands</title> + + <refsect2> + <title>Application and Cluster Management</title> + + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>stop</command></cmdsynopsis></term> + <listitem> + <para> + Stops the Erlang node on which RabbitMQ is running. To + restart the node follow the instructions for <citetitle>Running + the Server</citetitle> in the <ulink url="http://www.rabbitmq.com/install.html">installation + guide</ulink>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl stop</screen> + <para role="example"> + This command instructs the RabbitMQ node to terminate. + </para> + </listitem> + </varlistentry> + + <varlistentry id="stop_app"> + <term><cmdsynopsis><command>stop_app</command></cmdsynopsis></term> + <listitem> + <para> + Stops the RabbitMQ application, leaving the Erlang node + running. + </para> + <para> + This command is typically run prior to performing other + management actions that require the RabbitMQ application + to be stopped, e.g. <link + linkend="reset"><command>reset</command></link>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl stop_app</screen> + <para role="example"> + This command instructs the RabbitMQ node to stop the + RabbitMQ application. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>start_app</command></cmdsynopsis></term> + <listitem> + <para> + Starts the RabbitMQ application. + </para> + <para> + This command is typically run after performing other + management actions that required the RabbitMQ application + to be stopped, e.g. <link + linkend="reset"><command>reset</command></link>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl start_app</screen> + <para role="example"> + This command instructs the RabbitMQ node to start the + RabbitMQ application. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>status</command></cmdsynopsis></term> + <listitem> + <para> + Displays various information about the RabbitMQ broker, + such as whether the RabbitMQ application on the current + node, its version number, what nodes are part of the + broker, which of these are running. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl status</screen> + <para role="example"> + This command displays information about the RabbitMQ + broker. + </para> + </listitem> + </varlistentry> + + <varlistentry id="reset"> + <term><cmdsynopsis><command>reset</command></cmdsynopsis></term> + <listitem> + <para> + Return a RabbitMQ node to its virgin state. + </para> + <para> + Removes the node from any cluster it belongs to, removes + all data from the management database, such as configured + users and vhosts, and deletes all persistent + messages. + </para> + <para> + For <command>reset</command> and <command>force_reset</command> to + succeed the RabbitMQ application must have been stopped, + e.g. with <link linkend="stop_app"><command>stop_app</command></link>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl reset</screen> + <para role="example"> + This command resets the RabbitMQ node. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>force_reset</command></cmdsynopsis></term> + <listitem> + <para> + Forcefully return a RabbitMQ node to its virgin state. + </para> + <para> + The <command>force_reset</command> command differs from + <command>reset</command> in that it resets the node + unconditionally, regardless of the current management + database state and cluster configuration. It should only + be used as a last resort if the database or cluster + configuration has been corrupted. + </para> + <para> + For <command>reset</command> and <command>force_reset</command> to + succeed the RabbitMQ application must have been stopped, + e.g. with <link linkend="stop_app"><command>stop_app</command></link>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl force_reset</screen> + <para role="example"> + This command resets the RabbitMQ node. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>rotate_logs</command> <arg choice="req"><replaceable>suffix</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Instruct the RabbitMQ node to rotate the log files. + </para> + <para> + The RabbitMQ broker will attempt to append the current contents + of the log file to the file with name composed of the original + name and the suffix. + It will create a new file if such a file does not already exist. + When no <option>suffix</option> is specified, the empty log file is + simply created at the original location; no rotation takes place. + </para> + <para> + When an error occurs while appending the contents of the old log + file, the operation behaves in the same way as if no <option>suffix</option> was + specified. + </para> + <para> + This command might be helpful when you are e.g. writing your + own logrotate script and you do not want to restart the RabbitMQ + node. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl rotate_logs .1</screen> + <para role="example"> + This command instructs the RabbitMQ node to append the current content + of the log files to the files with names consisting of the original logs' + names and ".1" suffix, e.g. rabbit.log.1. Finally, the old log files are reopened. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> + <title>Cluster management</title> + + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>clusternode</term> + <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem> + </varlistentry> + </variablelist> + <para> + Instruct the node to become member of a cluster with the + specified nodes. + </para> + <para> + Cluster nodes can be of two types: disk or ram. Disk nodes + replicate data in ram and on disk, thus providing + redundancy in the event of node failure and recovery from + global events such as power failure across all nodes. Ram + nodes replicate data in ram only and are mainly used for + scalability. A cluster must always have at least one disk node. + </para> + <para> + If the current node is to become a disk node it needs to + appear in the cluster node list. Otherwise it becomes a + ram node. If the node list is empty or only contains the + current node then the node becomes a standalone, + i.e. non-clustered, (disk) node. + </para> + <para> + After executing the <command>cluster</command> command, whenever + the RabbitMQ application is started on the current node it + will attempt to connect to the specified nodes, thus + becoming an active node in the cluster comprising those + nodes (and possibly others). + </para> + <para> + The list of nodes does not have to contain all the + cluster's nodes; a subset is sufficient. Also, clustering + generally succeeds as long as at least one of the + specified nodes is active. Hence adjustments to the list + are only necessary if the cluster configuration is to be + altered radically. + </para> + <para> + For this command to succeed the RabbitMQ application must + have been stopped, e.g. with <link linkend="stop_app"><command>stop_app</command></link>. Furthermore, + turning a standalone node into a clustered node requires + the node be <link linkend="reset"><command>reset</command></link> first, + in order to avoid accidental destruction of data with the + <command>cluster</command> command. + </para> + <para> + For more details see the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl cluster rabbit@tanto hare@elena</screen> + <para role="example"> + This command instructs the RabbitMQ node to join the + cluster with nodes <command>rabbit@tanto</command> and + <command>hare@elena</command>. If the node is one of these then + it becomes a disk node, otherwise a ram node. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> + <title>Closing individual connections</title> + + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>close_connection</command> <arg choice="req"><replaceable>connectionpid</replaceable></arg> <arg choice="req"><replaceable>explanation</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>connectionpid</term> + <listitem><para>Id of the Erlang process associated with the connection to close.</para></listitem> + </varlistentry> + <varlistentry> + <term>explanation</term> + <listitem><para>Explanation string.</para></listitem> + </varlistentry> + </variablelist> + <para> + Instruct the broker to close the connection associated + with the Erlang process id <option>connectionpid</option> (see also the + <link linkend="list_connections"><command>list_connections</command></link> + command), passing the <option>explanation</option> string to the + connected client as part of the AMQP connection shutdown + protocol. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl close_connection "<rabbit@tanto.4262.0>" "go away"</screen> + <para role="example"> + This command instructs the RabbitMQ broker to close the + connection associated with the Erlang process + id <command><rabbit@tanto.4262.0></command>, passing the + explanation <command>go away</command> to the connected client. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> + <title>User management</title> + + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>add_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user to create.</para></listitem> + </varlistentry> + <varlistentry> + <term>password</term> + <listitem><para>The password the created user will use to log in to the broker.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl add_user tonyg changeit</screen> + <para role="example"> + This command instructs the RabbitMQ broker to create a + user named <command>tonyg</command> with (initial) password + <command>changeit</command>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>delete_user</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user to delete.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl delete_user tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to delete the + user named <command>tonyg</command>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>change_password</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>newpassword</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose password is to be changed.</para></listitem> + </varlistentry> + <varlistentry> + <term>newpassword</term> + <listitem><para>The new password for the user.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl change_password tonyg newpass</screen> + <para role="example"> + This command instructs the RabbitMQ broker to change the + password for the user named <command>tonyg</command> to + <command>newpass</command>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>list_users</command></cmdsynopsis></term> + <listitem> + <para>Lists users</para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_users</screen> + <para role="example"> + This command instructs the RabbitMQ broker to list all users. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> + <title>Access control</title> + + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>add_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>vhostpath</term> + <listitem><para>The name of the virtual host entry to create.</para></listitem> + </varlistentry> + </variablelist> + <para> + Creates a virtual host. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl add_vhost test</screen> + <para role="example"> + This command instructs the RabbitMQ broker to create a new + virtual host called <command>test</command>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>delete_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>vhostpath</term> + <listitem><para>The name of the virtual host entry to delete.</para></listitem> + </varlistentry> + </variablelist> + <para> + Deletes a virtual host. + </para> + <para> + Deleting a virtual host deletes all its exchanges, + queues, user mappings and associated permissions. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl delete_vhost test</screen> + <para role="example"> + This command instructs the RabbitMQ broker to delete the + virtual host called <command>test</command>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>list_vhosts</command></cmdsynopsis></term> + <listitem> + <para> + Lists virtual hosts. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_vhosts</screen> + <para role="example"> + This command instructs the RabbitMQ broker to list all + virtual hosts. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>configure</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>vhostpath</term> + <listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem> + </varlistentry> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem> + </varlistentry> + <varlistentry> + <term>configure</term> + <listitem><para>A regular expression matching resource names for which the user is granted configure permissions.</para></listitem> + </varlistentry> + <varlistentry> + <term>write</term> + <listitem><para>A regular expression matching resource names for which the user is granted write permissions.</para></listitem> + </varlistentry> + <varlistentry> + <term>read</term> + <listitem><para>A regular expression matching resource names for which the user is granted read permissions.</para></listitem> + </varlistentry> + </variablelist> + <para> + Sets user permissions. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_permissions -p /myvhost tonyg "^tonyg-.*" ".*" ".*"</screen> + <para role="example"> + This command instructs the RabbitMQ broker to grant the + user named <command>tonyg</command> access to the virtual host + called <command>/myvhost</command>, with configure permissions + on all resources whose names starts with "tonyg-", and + write and read permissions on all resources. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>clear_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>vhostpath</term> + <listitem><para>The name of the virtual host to which to deny the user access, defaulting to <command>/</command>.</para></listitem> + </varlistentry> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user to deny access to the specified virtual host.</para></listitem> + </varlistentry> + </variablelist> + <para> + Sets user permissions. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_permissions -p /myvhost tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to deny the + user named <command>tonyg</command> access to the virtual host + called <command>/myvhost</command>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>list_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>vhostpath</term> + <listitem><para>The name of the virtual host for which to list the users that have been granted access to it, and their permissions. Defaults to <command>/</command>.</para></listitem> + </varlistentry> + </variablelist> + <para> + Lists permissions in a virtual host. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_permissions -p /myvhost</screen> + <para role="example"> + This command instructs the RabbitMQ broker to list all the + users which have been granted access to the virtual host + called <command>/myvhost</command>, and the permissions they + have for operations on resources in that virtual host. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user for which to list the permissions.</para></listitem> + </varlistentry> + </variablelist> + <para> + Lists user permissions. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_user_permissions tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to list all the + virtual hosts to which the user named <command>tonyg</command> + has been granted access, and the permissions the user has + for operations on resources in these virtual hosts. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> + <title>Server Status</title> + <para> + The server status queries interrogate the server and return a list of + results with tab-delimited columns. Some queries (<command>list_queues</command>, + <command>list_exchanges</command>, <command>list_bindings</command>, and + <command>list_consumers</command>) accept an + optional <command>vhost</command> parameter. This parameter, if present, must be + specified immediately after the query. + </para> + <para role="usage"> + The list_queues, list_exchanges and list_bindings commands accept an + optional virtual host parameter for which to display results. The + default value is "/". + </para> + + <variablelist> + <varlistentry role="usage-has-option-list"> + <term><cmdsynopsis><command>list_queues</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>queueinfoitem</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <para> + Returns queue details. Queue details of the <command>/</command> virtual host + are returned if the "-p" flag is absent. The "-p" flag can be used to + override this default. + </para> + <para> + The <command>queueinfoitem</command> parameter is used to indicate which queue + information items to include in the results. The column order in the + results will match the order of the parameters. + <command>queueinfoitem</command> can take any value from the list + that follows: + </para> + <variablelist> + <varlistentry> + <term>name</term> + <listitem><para>The name of the queue with non-ASCII characters URL-escaped.</para></listitem> + </varlistentry> + <varlistentry> + <term>durable</term> + <listitem><para>Whether or not the queue survives server restarts.</para></listitem> + </varlistentry> + <varlistentry> + <term>auto_delete</term> + <listitem><para>Whether the queue will be deleted automatically when no longer used.</para></listitem> + </varlistentry> + <varlistentry> + <term>arguments</term> + <listitem><para>Queue arguments.</para></listitem> + </varlistentry> + <varlistentry> + <term>pid</term> + <listitem><para>Id of the Erlang process associated with the queue.</para></listitem> + </varlistentry> + <varlistentry> + <term>owner_pid</term> + <listitem><para>Id of the Erlang process representing the connection + which is the exclusive owner of the queue. Empty if the + queue is non-exclusive.</para></listitem> + </varlistentry> + <varlistentry> + <term>exclusive_consumer_pid</term> + <listitem><para>Id of the Erlang process representing the channel of the + exclusive consumer subscribed to this queue. Empty if + there is no exclusive consumer.</para></listitem> + </varlistentry> + <varlistentry> + <term>exclusive_consumer_tag</term> + <listitem><para>Consumer tag of the exclusive consumer subscribed to + this queue. Empty if there is no exclusive consumer.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_ready</term> + <listitem><para>Number of messages ready to be delivered to clients.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_unacknowledged</term> + <listitem><para>Number of messages delivered to clients but not yet acknowledged.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_uncommitted</term> + <listitem><para>Number of messages published in as yet uncommitted transactions</para></listitem> + </varlistentry> + <varlistentry> + <term>messages</term> + <listitem><para>Sum of ready, unacknowledged and uncommitted messages + (queue depth).</para></listitem> + </varlistentry> + <varlistentry> + <term>acks_uncommitted</term> + <listitem><para>Number of acknowledgements received in as yet uncommitted + transactions.</para></listitem> + </varlistentry> + <varlistentry> + <term>consumers</term> + <listitem><para>Number of consumers.</para></listitem> + </varlistentry> + <varlistentry> + <term>transactions</term> + <listitem><para>Number of transactions.</para></listitem> + </varlistentry> + <varlistentry> + <term>memory</term> + <listitem><para>Bytes of memory consumed by the Erlang process associated with the + queue, including stack, heap and internal structures.</para></listitem> + </varlistentry> + </variablelist> + <para> + If no <command>queueinfoitem</command>s are specified then queue name and depth are + displayed. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl list_queues -p /myvhost messages consumers</screen> + <para role="example"> + This command displays the depth and number of consumers for each + queue of the virtual host named <command>/myvhost</command>. + </para> + </listitem> + </varlistentry> + + <varlistentry role="usage-has-option-list"> + <term><cmdsynopsis><command>list_exchanges</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>exchangeinfoitem</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <para> + Returns exchange details. Exchange details of the <command>/</command> virtual host + are returned if the "-p" flag is absent. The "-p" flag can be used to + override this default. + </para> + <para> + The <command>exchangeinfoitem</command> parameter is used to indicate which + exchange information items to include in the results. The column order in the + results will match the order of the parameters. + <command>exchangeinfoitem</command> can take any value from the list + that follows: + </para> + <variablelist> + <varlistentry> + <term>name</term> + <listitem><para>The name of the exchange with non-ASCII characters URL-escaped.</para></listitem> + </varlistentry> + <varlistentry> + <term>type</term> + <listitem><para>The exchange type (one of [<command>direct</command>, + <command>topic</command>, <command>headers</command>, + <command>fanout</command>]).</para></listitem> + </varlistentry> + <varlistentry> + <term>durable</term> + <listitem><para>Whether or not the exchange survives server restarts.</para></listitem> + </varlistentry> + <varlistentry> + <term>auto_delete</term> + <listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem> + </varlistentry> + <varlistentry> + <term>arguments</term> + <listitem><para>Exchange arguments.</para></listitem> + </varlistentry> + </variablelist> + <para> + If no <command>exchangeinfoitem</command>s are specified then + exchange name and type are displayed. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl list_exchanges -p /myvhost name type</screen> + <para role="example"> + This command displays the name and type for each + exchange of the virtual host named <command>/myvhost</command>. + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + By default the bindings for the <command>/</command> virtual + host are returned. The "-p" flag can be used to override + this default. Each result row will contain an exchange + name, queue name, routing key and binding arguments, in + that order. Non-ASCII characters will be URL-encoded. + </para> + <para role="usage"> + The output format for "list_bindings" is a list of rows containing + exchange name, queue name, routing key and arguments, in that order. + </para> + </listitem> + </varlistentry> + + <varlistentry id="list_connections" role="usage-has-option-list"> + <term><cmdsynopsis><command>list_connections</command> <arg choice="opt" role="usage-option-list"><replaceable>connectioninfoitem</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <para> + Returns TCP/IP connection statistics. + </para> + <para> + The <command>connectioninfoitem</command> parameter is used to indicate + which connection information items to include in the results. The + column order in the results will match the order of the parameters. + <command>connectioninfoitem</command> can take any value from the list + that follows: + </para> + + <variablelist> + <varlistentry> + <term>pid</term> + <listitem><para>Id of the Erlang process associated with the connection.</para></listitem> + </varlistentry> + <varlistentry> + <term>address</term> + <listitem><para>Server IP address.</para></listitem> + </varlistentry> + <varlistentry> + <term>port</term> + <listitem><para>Server port.</para></listitem> + </varlistentry> + <varlistentry> + <term>peer_address</term> + <listitem><para>Peer address.</para></listitem> + </varlistentry> + <varlistentry> + <term>peer_port</term> + <listitem><para>Peer port.</para></listitem> + </varlistentry> + <varlistentry> + <term>state</term> + <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, + <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + </varlistentry> + <varlistentry> + <term>channels</term> + <listitem><para>Number of channels using the connection.</para></listitem> + </varlistentry> + <varlistentry> + <term>user</term> + <listitem><para>Username associated with the connection.</para></listitem> + </varlistentry> + <varlistentry> + <term>vhost</term> + <listitem><para>Virtual host name with non-ASCII characters URL-escaped.</para></listitem> + </varlistentry> + <varlistentry> + <term>timeout</term> + <listitem><para>Connection timeout.</para></listitem> + </varlistentry> + <varlistentry> + <term>frame_max</term> + <listitem><para>Maximum frame size (bytes).</para></listitem> + </varlistentry> + <varlistentry> + <term>client_properties</term> + <listitem><para>Informational properties transmitted by the client + during connection establishment.</para></listitem> + </varlistentry> + <varlistentry> + <term>recv_oct</term> + <listitem><para>Octets received.</para></listitem> + </varlistentry> + <varlistentry> + <term>recv_cnt</term> + <listitem><para>Packets received.</para></listitem> + </varlistentry> + <varlistentry> + <term>send_oct</term> + <listitem><para>Octets send.</para></listitem> + </varlistentry> + <varlistentry> + <term>send_cnt</term> + <listitem><para>Packets sent.</para></listitem> + </varlistentry> + <varlistentry> + <term>send_pend</term> + <listitem><para>Send queue size.</para></listitem> + </varlistentry> + </variablelist> + <para> + If no <command>connectioninfoitem</command>s are specified then user, peer + address, peer port and connection state are displayed. + </para> + + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl list_connections send_pend server_port</screen> + <para role="example"> + This command displays the send queue size and server port for each + connection. + </para> + </listitem> + </varlistentry> + + <varlistentry role="usage-has-option-list"> + <term><cmdsynopsis><command>list_channels</command> <arg choice="opt" role="usage-option-list"><replaceable>channelinfoitem</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <para> + Returns information on all current channels, the logical + containers executing most AMQP commands. This includes + channels that are part of ordinary AMQP connections, and + channels created by various plug-ins and other extensions. + </para> + <para> + The <command>channelinfoitem</command> parameter is used to + indicate which channel information items to include in the + results. The column order in the results will match the + order of the parameters. + <command>channelinfoitem</command> can take any value from the list + that follows: + </para> + + <variablelist> + <varlistentry> + <term>pid</term> + <listitem><para>Id of the Erlang process associated with the connection.</para></listitem> + </varlistentry> + <varlistentry> + <term>connection</term> + <listitem><para>Id of the Erlang process associated with the connection + to which the channel belongs.</para></listitem> + </varlistentry> + <varlistentry> + <term>number</term> + <listitem><para>The number of the channel, which uniquely identifies it within + a connection.</para></listitem> + </varlistentry> + <varlistentry> + <term>user</term> + <listitem><para>Username associated with the channel.</para></listitem> + </varlistentry> + <varlistentry> + <term>vhost</term> + <listitem><para>Virtual host in which the channel operates.</para></listitem> + </varlistentry> + <varlistentry> + <term>transactional</term> + <listitem><para>True if the channel is in transactional mode, false otherwise.</para></listitem> + </varlistentry> + <varlistentry> + <term>consumer_count</term> + <listitem><para>Number of logical AMQP consumers retrieving messages via + the channel.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_unacknowledged</term> + <listitem><para>Number of messages delivered via this channel but not + yet acknowledged.</para></listitem> + </varlistentry> + <varlistentry> + <term>acks_uncommitted</term> + <listitem><para>Number of acknowledgements received in an as yet + uncommitted transaction.</para></listitem> + </varlistentry> + <varlistentry> + <term>prefetch_count</term> + <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> + </varlistentry> + </variablelist> + <para> + If no <command>channelinfoitem</command>s are specified then pid, + user, transactional, consumer_count, and + messages_unacknowledged are assumed. + </para> + + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl list_channels connection messages_unacknowledged</screen> + <para role="example"> + This command displays the connection process and count + of unacknowledged messages for each channel. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>list_consumers</command></cmdsynopsis></term> + <listitem> + <para> + List consumers, i.e. subscriptions to a queue's message + stream. Each line printed shows, separated by tab + characters, the name of the queue subscribed to, the id of + the channel process via which the subscription was created + and is managed, the consumer tag which uniquely identifies + the subscription within a channel, and a boolean + indicating whether acknowledgements are expected for + messages delivered to this consumer. + </para> + <para role="usage"> + The output format for "list_consumers" is a list of rows containing, + in order, the queue name, channel process id, consumer tag, and a + boolean indicating whether acknowledgements are expected from the + consumer. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + </refsect1> + +</refentry> diff --git a/docs/remove-namespaces.xsl b/docs/remove-namespaces.xsl new file mode 100644 index 0000000000..58a1e826d2 --- /dev/null +++ b/docs/remove-namespaces.xsl @@ -0,0 +1,17 @@ +<?xml version='1.0'?> +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" + xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc" + version='1.0'> + +<xsl:output method="xml" /> + + <!-- Copy every element through with local name only --> + <xsl:template match="*"> + <xsl:element name="{local-name()}"> + <xsl:apply-templates select="@*|node()"/> + </xsl:element> + </xsl:template> + + <!-- Copy every attribute through --> + <xsl:template match="@*"><xsl:copy/></xsl:template> +</xsl:stylesheet> diff --git a/docs/usage.xsl b/docs/usage.xsl new file mode 100644 index 0000000000..72f8880ab1 --- /dev/null +++ b/docs/usage.xsl @@ -0,0 +1,78 @@ +<?xml version='1.0'?> +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" + xmlns:exsl="http://exslt.org/common" + xmlns:ng="http://docbook.org/docbook-ng" + xmlns:db="http://docbook.org/ns/docbook" + exclude-result-prefixes="exsl" + version='1.0'> + +<xsl:param name="modulename"/> + +<xsl:output method="text" + encoding="UTF-8" + indent="no"/> +<xsl:strip-space elements="*"/> +<xsl:preserve-space elements="cmdsynopsis arg" /> + +<xsl:template match="/"> +<!-- Pull out cmdsynopsis to show the command usage line. -->%% Generated, do not edit! +-module(<xsl:value-of select="$modulename" />). +-export([usage/0]). +usage() -> %QUOTE%Usage: +<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/> +<xsl:text> </xsl:text> +<xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg"> + <xsl:apply-templates select="." /> + <xsl:text> </xsl:text> +</xsl:for-each> + +<xsl:text> </xsl:text> + +<!-- List options (any variable list in a section called "Options"). --> +<xsl:for-each select=".//*[title='Options']/variablelist"> + <xsl:if test="position() = 1"> Options: </xsl:if> + <xsl:for-each select="varlistentry"> + <xsl:text> </xsl:text> + <xsl:for-each select=".//term"> + <xsl:value-of select="."/> + <xsl:if test="not(position() = last())">, </xsl:if> + </xsl:for-each><xsl:text> </xsl:text> + </xsl:for-each> +</xsl:for-each> + +<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). --> +<xsl:text> </xsl:text> +<xsl:for-each select=".//*[title='Options']//para[@role='usage']"> +<xsl:value-of select="normalize-space(.)"/><xsl:text> </xsl:text> +</xsl:for-each> + +<!-- List commands (any first-level variable list in a section called "Commands"). --> +<xsl:for-each select=".//*[title='Commands']/variablelist | .//*[title='Commands']/refsect2/variablelist"> + <xsl:if test="position() = 1">Commands: </xsl:if> + <xsl:for-each select="varlistentry"> + <xsl:text> </xsl:text> + <xsl:apply-templates select="term"/> + <xsl:text> </xsl:text> + </xsl:for-each> + <xsl:text> </xsl:text> +</xsl:for-each> + +<xsl:apply-templates select=".//*[title='Commands']/refsect2" mode="command-usage" /> +%QUOTE%. +</xsl:template> + +<!-- Option lists in command usage --> +<xsl:template match="varlistentry[@role='usage-has-option-list']" mode="command-usage"><<xsl:value-of select="term/cmdsynopsis/arg[@role='usage-option-list']/replaceable"/>> must be a member of the list [<xsl:for-each select="listitem/variablelist/varlistentry"><xsl:apply-templates select="term"/><xsl:if test="not(position() = last())">, </xsl:if></xsl:for-each>].<xsl:text> </xsl:text></xsl:template> + +<!-- Usage paras in command usage --> +<xsl:template match="para[@role='usage']" mode="command-usage"> +<xsl:value-of select="normalize-space(.)"/><xsl:text> </xsl:text> +</xsl:template> + +<!-- Don't show anything else in command usage --> +<xsl:template match="text()" mode="command-usage"/> + +<xsl:template match="arg[@choice='opt']">[<xsl:apply-templates/>]</xsl:template> +<xsl:template match="replaceable"><<xsl:value-of select="."/>></xsl:template> + +</xsl:stylesheet> diff --git a/generate_deps b/generate_deps index 916006d101..29587b5a5f 100644 --- a/generate_deps +++ b/generate_deps @@ -23,10 +23,11 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> ok; (Path, Dep, ok) -> Module = filename:basename(Path, ".erl"), - ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]), + ok = file:write(Hdl, [EbinDir, "/", Module, ".beam: ", + Path]), ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end, ok, Dep), - file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"]) + file:write(Hdl, ["\n"]) end, ok, Deps), ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]), ok = file:sync(Hdl), @@ -35,7 +36,8 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) -> {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]), lists:foldl( - fun ({attribute, _LineNumber, behaviour, Behaviour}, Deps) -> + fun ({attribute, _LineNumber, Attribute, Behaviour}, Deps) + when Attribute =:= behaviour orelse Attribute =:= behavior -> case sets:is_element(Behaviour, Modules) of true -> sets:add_element( [EbinDir, "/", atom_to_list(Behaviour), ".beam"], diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e2980eff70..552aec6726 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,8 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, + is_persistent}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -83,6 +84,7 @@ -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). -type(regexp() :: binary()). +-type(file_path() :: string()). %% this is really an abstract type, but dialyzer does not support them -type(guid() :: any()). @@ -144,7 +146,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), @@ -154,7 +157,7 @@ message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). +-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -type(listener() :: #listener{node :: erlang_node(), protocol :: atom(), @@ -166,6 +169,7 @@ #amqp_error{name :: atom(), explanation :: string(), method :: atom()}). + -endif. %%---------------------------------------------------------------------------- @@ -173,6 +177,11 @@ -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). +-define(MAX_WAIT, 16#ffffffff). + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). -define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl new file mode 100644 index 0000000000..9864f1eb64 --- /dev/null +++ b/include/rabbit_exchange_type_spec.hrl @@ -0,0 +1,42 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-ifdef(use_specs). + +-spec(description/0 :: () -> [{atom(), any()}]). +-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). +-spec(validate/1 :: (exchange()) -> 'ok'). +-spec(create/1 :: (exchange()) -> 'ok'). +-spec(recover/2 :: (exchange(), list(binding())) -> 'ok'). +-spec(delete/2 :: (exchange(), list(binding())) -> 'ok'). +-spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). +-spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok'). + +-endif. diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index 199a0f89c8..1a9798998c 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -56,5 +56,5 @@ -type(password() :: binary()). -type(vhost() :: binary()). -type(ctag() :: binary()). --type(exchange_type() :: 'direct' | 'topic' | 'fanout'). +-type(exchange_type() :: atom()). -type(binding_key() :: binary()). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index bc5b58cad8..74a1800adb 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -39,11 +39,7 @@ prepare: cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ - --target i386 - rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ - --define '_libdir /usr/lib64' --define '_arch x86_64' \ - --define '_defaultdocdir /usr/share/doc' --target x86_64 + rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) clean: rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 54d0c8f38b..c318a96c22 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -11,6 +11,7 @@ Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate Source4: rabbitmq-asroot-script-wrapper URL: http://www.rabbitmq.com/ +BuildArch: noarch BuildRequires: erlang, python-simplejson Requires: erlang, logrotate BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root @@ -23,8 +24,9 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version} -%define _rabbit_libdir %{_libdir}/rabbitmq +# We want to install into /usr/lib, even on 64-bit platforms +%define _rabbit_libdir %{_exec_prefix}/lib/rabbitmq +%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version} %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` @@ -35,9 +37,7 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} -sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} cp %{S:4} %{_rabbit_asroot_wrapper} -sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper} make %{?_smp_mflags} %install @@ -65,12 +65,12 @@ mkdir -p %{buildroot}%{_sysconfdir}/rabbitmq rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL #Build the list of files -rm -f %{_builddir}/filelist.%{name}.rpm -echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm +rm -f %{_builddir}/%{name}.files +echo '%defattr(-,root,root, -)' >> %{_builddir}/%{name}.files (cd %{buildroot}; \ find . -type f ! -regex '\.%{_sysconfdir}.*' \ ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ - | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) + | sed -e 's/^\.//' >> %{_builddir}/%{name}.files) %pre @@ -103,7 +103,7 @@ if [ $1 = 0 ]; then # Leave rabbitmq user and group fi -%files -f ../filelist.%{name}.rpm +%files -f ../%{name}.files %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index d5633955b9..0ef7dd5e73 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -35,7 +35,7 @@ macports: dirs $(DEST)/Portfile for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \ cp $(COMMON_DIR)/$$f $(DEST)/files ; \ done - sed -i -e 's|@SU_RABBITMQ_SH_C@|sudo -E -u rabbitmq -H /bin/sh -c|' \ + sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh su -m rabbitmq -c|' \ $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files if [ -n "$(MACPORTS_USERHOST)" ] ; then \ diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index c9e818ac8b..50ce16370b 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -21,10 +21,13 @@ dist: rm -rf $(SOURCE_DIR)/docs mv $(SOURCE_DIR) $(TARGET_DIR) - pod2text --loose rabbitmq-service.pod $(TARGET_DIR)/readme-service.txt + mkdir -p $(TARGET_DIR) + xmlto -o . xhtml-nochunks ../../docs/rabbitmq-service.xml + elinks -dump -no-references -no-numbering rabbitmq-service.html \ + > $(TARGET_DIR)/readme-service.txt todos $(TARGET_DIR)/readme-service.txt zip -r $(TARGET_ZIP).zip $(TARGET_DIR) - rm -rf $(TARGET_DIR) + rm -rf $(TARGET_DIR) rabbitmq-service.html clean: clean_partial rm -f rabbitmq-server-windows-*.zip diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod deleted file mode 100644 index 8a2d2e5b22..0000000000 --- a/packaging/windows/rabbitmq-service.pod +++ /dev/null @@ -1,133 +0,0 @@ -=head1 NAME - -rabbitmq-service - manage RabbitMQ AMQP service - -=head1 SYNOPSIS - -rabbitmq-service.bat command - -=head1 DESCRIPTION - -RabbitMQ is an implementation of AMQP, the emerging standard for high -performance enterprise messaging. The RabbitMQ server is a robust and -scalable implementation of an AMQP broker. - -Running B<rabbitmq-service> allows the RabbitMQ broker to be run as a -service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker -service can be started and stopped using the Windows® services -applet. - -By default the service will run in the authentication context of the -local system account. It is therefore necessary to synchronise Erlang -cookies between the local system account (typically -C<C:\WINDOWS\.erlang.cookie> and the account that will be used to -run B<rabbitmqctl>. - -=head1 COMMANDS - -=head2 help - -Display usage information. - -=head2 install - -Install the service. The service will not be started. -Subsequent invocations will update the service parameters if -relevant environment variables were modified. - -=head2 remove - -Remove the service. If the service is running then it will -automatically be stopped before being removed. No files will be -deleted as a consequence and B<rabbitmq-server> will remain operable. - -=head2 start - -Start the service. The service must have been correctly installed -beforehand. - -=head2 stop - -Stop the service. The service must be running for this command to -have any effect. - -=head2 disable - -Disable the service. This is the equivalent of setting the startup -type to B<Disabled> using the service control panel. - -=head2 enable - -Enable the service. This is the equivalent of setting the startup -type to B<Automatic> using the service control panel. - -=head1 ENVIRONMENT - -=head2 RABBITMQ_SERVICENAME - -Defaults to RabbitMQ. -This is the location of log and database directories. - -=head2 RABBITMQ_BASE - -Defaults to the application data directory of the current user. -This is the location of log and database directories. - -=head2 RABBITMQ_NODENAME - -Defaults to "rabbit". This can be useful if you want to run more than -one node per machine - B<RABBITMQ_NODENAME> should be unique per -erlang-node-and-machine combination. See clustering on a single -machine guide at -L<http://www.rabbitmq.com/clustering.html#single-machine> for details. - -=head2 RABBITMQ_NODE_IP_ADDRESS - -Defaults to "0.0.0.0". This can be changed if you only want to bind -to one network interface. - -=head2 RABBITMQ_NODE_PORT - -Defaults to 5672. - -=head2 ERLANG_SERVICE_MANAGER_PATH - -Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin> -(or F<C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin> for 64-bit -environments). This is the installation location of the Erlang service -manager. - -=head2 CLUSTER_CONFIG_FILE - -If this file is present it is used by the server to -auto-configure a RabbitMQ cluster. See the clustering guide -at L<http://www.rabbitmq.com/clustering.html> for details. - -=head2 RABBITMQ_CONSOLE_LOG - -Set this varable to B<new> or B<reuse> to have the console -output from the server redirected to a file named B<SERVICENAME>.debug -in the application data directory of the user that installed the service. -Under Vista this will be F<C:\Documents and Settings\User\AppData\username\SERVICENAME>. -Under previous versions of Windows this will be -F<C:\Documents and Settings\username\Application Data\SERVICENAME>. -If B<RABBITMQ_CONSOLE_LOG> is set to B<new> then a new file will be -created each time the service starts. If B<RABBITMQ_CONSOLE_LOG> is -set to B<reuse> then the file will be overwritten each time the -service starts. The default behaviour when B<RABBITMQ_CONSOLE_LOG> is -not set or set to a value other than B<new> or B<reuse> is to discard -the server output. - -=head1 EXAMPLES - -Start a previously-installed RabbitMQ AMQP service: - - rabbitmq-service start - -=head1 AUTHOR - -The RabbitMQ Team <info@rabbitmq.com> - -=head1 REFERENCES - -RabbitMQ Web Site: http://www.rabbitmq.com diff --git a/src/pg_local.erl b/src/pg_local.erl index fa41fe46b3..1501331d6b 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -206,7 +206,7 @@ ensure_started() -> case whereis(?MODULE) of undefined -> C = {pg_local, {?MODULE, start_link, []}, permanent, - 1000, worker, [?MODULE]}, + 16#ffffffff, worker, [?MODULE]}, supervisor:start_child(kernel_safe_sup, C); PgLocalPid -> {ok, PgLocalPid} diff --git a/src/rabbit.erl b/src/rabbit.erl index 35d3ce4a5a..259ac0401a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,20 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). + +-rabbit_boot_step({rabbit_exchange_type_registry, + [{description, "exchange type registry"}, + {mfa, {rabbit_sup, start_child, + [rabbit_exchange_type_registry]}}, + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, - {mfa, {rabbit_sup, start_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_log]}}, + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, @@ -72,23 +91,18 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_amqqueue_sup, - [{description, "queue supervisor"}, - {mfa, {rabbit_amqqueue, start, []}}, - {requires, kernel_ready}, - {enables, core_initialized}]}). - -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, - {mfa, {rabbit_sup, start_child, [rabbit_router]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_router]}}, {requires, kernel_ready}, {enables, core_initialized}]}). -rabbit_boot_step({rabbit_node_monitor, [{description, "node monitor"}, - {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_node_monitor]}}, {requires, kernel_ready}, - {requires, rabbit_amqqueue_sup}, {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, @@ -104,18 +118,20 @@ {mfa, {rabbit_exchange, recover, []}}, {requires, empty_db_check}]}). --rabbit_boot_step({queue_recovery, - [{description, "queue recovery"}, - {mfa, {rabbit_amqqueue, recover, []}}, - {requires, exchange_recovery}]}). +-rabbit_boot_step({queue_sup_queue_recovery, + [{description, "queue supervisor and queue recovery"}, + {mfa, {rabbit_amqqueue, start, []}}, + {requires, empty_db_check}]}). -rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {requires, queue_recovery}]}). + [{mfa, {rabbit_sup, start_child, + [rabbit_persister]}}, + {requires, queue_sup_queue_recovery}]}). -rabbit_boot_step({guid_generator, [{description, "guid generator"}, - {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_guid]}}, {requires, persister}, {enables, routing_ready}]}). @@ -187,15 +203,12 @@ stop() -> ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> - spawn(fun () -> - SleepTime = 1000, - rabbit_log:info("Stop-and-halt request received; " - "halting in ~p milliseconds~n", - [SleepTime]), - timer:sleep(SleepTime), - init:stop() - end), - case catch stop() of _ -> ok end. + try + stop() + after + init:stop() + end, + ok. status() -> [{running_applications, application:which_applications()}] ++ diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 3b9eeec18a..7e96d9a3a8 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -57,10 +57,9 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), ok = case MemoryWatermark == 0 of - true -> - ok; - false -> - rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark]) + true -> ok; + false -> rabbit_sup:start_restartable_child(vm_memory_monitor, + [MemoryWatermark]) end, ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3f25d72e4f..e13dd9edb5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([start/0, declare/4, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, @@ -40,7 +40,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -63,7 +63,6 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). @@ -98,7 +97,7 @@ -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), msg()} | 'empty'). + {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/8 :: (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> @@ -107,6 +106,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -117,45 +117,47 @@ %%---------------------------------------------------------------------------- start() -> + DurableQueues = find_durable_queues(), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + _RealDurableQueues = recover_durable_queues(DurableQueues), ok. -recover() -> - ok = recover_durable_queues(), - ok. - -recover_durable_queues() -> +find_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end). + +recover_durable_queues(DurableQueues) -> + lists:foldl( + fun (RecoveredQ, Acc) -> Q = start_queue_process(RecoveredQ), %% We need to catch the case where a client connected to %% another node has deleted the queue (and possibly %% re-created it). case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end + fun () -> + case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, + read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) + true -> [Q | Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. + end, [], DurableQueues). declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -284,7 +286,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( @@ -329,39 +331,53 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {unblock, ChPid}). + gen_server2:pcast(QPid, 7, {unblock, ChPid}). + +flush_all(QPids, ChPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, + QPids). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> - ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - ok - end - end). + case + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [_] -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. + rabbit_exchange:delete_queue_bindings(QueueName) + end + end) of + Err = {error, _} -> Err; + PostHook -> + PostHook(), + ok + end. on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:fold( - fun (QueueName, Acc) -> - ok = rabbit_exchange:delete_transient_queue_bindings( - QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - Acc - end, - ok, - qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end). + [Hook() || + Hook <- rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end)], + ok. + +delete_queue(QueueName) -> + Post = rabbit_exchange:delete_transient_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + Post. pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e4791f9524..ba41f55030 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,8 +36,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -export([start_link/1, info_keys/0]). @@ -376,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> +persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> ok; persist_message(Txn, QName, Message) -> M = Message#basic_message{ @@ -384,29 +382,28 @@ persist_message(Txn, QName, Message) -> content = rabbit_binary_parser:clear_decoded_content( Message#basic_message.content)}, persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). + [{publish, M, {QName, M#basic_message.guid}}]). persist_delivery(_QName, _Message, true) -> ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, +persist_delivery(_QName, #basic_message{is_persistent = false}, _IsDelivered) -> ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, +persist_delivery(QName, #basic_message{guid = Guid}, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). + persist_work(none, QName, [{deliver, {QName, Guid}}]). persist_acks(Txn, QName, Messages) -> persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). + [{ack, {QName, Guid}} || #basic_message{ + guid = Guid, is_persistent = true} <- Messages]). -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> +persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> +persist_auto_ack(QName, #basic_message{guid = Guid}) -> %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + rabbit_persister:dirty_work([{ack, {QName, Guid}}]). persist_work(_Txn,_QName, []) -> ok; @@ -826,7 +823,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({flush, ChPid}, State) -> + ok = rabbit_channel:flushed(ChPid, self()), + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9ebb6e72e0..4ab7a2a0b1 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -36,6 +36,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). +-export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -48,7 +49,7 @@ -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> message()). + binary()) -> (message() | {'error', any()})). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -57,6 +58,8 @@ publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). -spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-spec(is_message_persistent/1 :: + (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). -endif. @@ -93,10 +96,17 @@ from_content(Content) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = build_content(Properties, BodyBin), - persistent_key = none}. + Content = build_content(Properties, BodyBin), + case is_message_persistent(Content) of + {invalid, Other} -> + {error, {invalid_delivery_mode, Other}}; + IsPersistent when is_boolean(IsPersistent) -> + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent} + end. properties(P = #'P_basic'{}) -> P; @@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, properties(Properties), BodyBin))). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> {invalid, Other} + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 585c59dc9b..9aeb4623f1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1,4 +1,4 @@ -%% The contents of this file are subject to the Mozilla Public Licenses +%% The contents of this file are subject to the Mozilla Public License %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License at %% http://www.mozilla.org/MPL/ @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/5, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2]). +-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([init/1, terminate/2, code_change/3, @@ -45,11 +45,8 @@ -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, - most_recently_declared_queue, consumer_mapping}). - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). + username, virtual_host, most_recently_declared_queue, + consumer_mapping, blocking}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -75,8 +72,9 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -110,7 +108,10 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). + gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). + +flushed(Pid, QPid) -> + gen_server2:cast(Pid, {flushed, QPid}). list() -> pg_local:get_members(rabbit_channels). @@ -152,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}, + consumer_mapping = dict:new(), + blocking = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -190,6 +192,9 @@ handle_cast({method, Method, Content}, State) -> {stop, {Reason, erlang:get_stacktrace()}, State} end; +handle_cast({flushed, QPid}, State) -> + {noreply, queue_blocked(QPid, State)}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -215,7 +220,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}. + {stop, Reason, State}; +handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -331,6 +338,20 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +queue_blocked(QPid, State = #ch{blocking = Blocking}) -> + case dict:find(QPid, Blocking) of + error -> State; + {ok, MRef} -> true = erlang:demonitor(MRef), + Blocking1 = dict:erase(QPid, Blocking), + ok = case dict:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok + end, + State#ch{blocking = Blocking1} + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -362,14 +383,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, + IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -540,25 +559,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid, - unacked_message_q = UAMQ }) -> - NewLimiterPid = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> - undefined; - {undefined, _} -> - LPid = rabbit_limiter:start_link(self(), - queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid; - {_, 0} -> - ok = rabbit_limiter:shutdown(LimiterPid), - ok = limit_queues(undefined, State), - undefined; - {_, _} -> - LimiterPid - end, - ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; + _, State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> undefined; + {undefined, _} -> start_limiter(State); + {_, _} -> LimiterPid + end, + LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of + ok -> LimiterPid1; + stopped -> unlimit_queues(State) + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, @@ -791,9 +802,31 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; -handle_method(#'channel.flow'{active = _}, _, State) -> - %% FIXME: implement - {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow'{active = true}, _, + State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of + ok -> LimiterPid; + stopped -> unlimit_queues(State) + end, + {reply, #'channel.flow_ok'{active = true}, + State#ch{limiter_pid = LimiterPid1}}; + +handle_method(#'channel.flow'{active = false}, _, + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> + LimiterPid1 = case LimiterPid of + undefined -> start_limiter(State); + Other -> Other + end, + ok = rabbit_limiter:block(LimiterPid1), + QPids = consumer_queues(Consumers), + Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + case Queues of + [] -> {reply, #'channel.flow_ok'{active = false}, State}; + _ -> {noreply, State#ch{limiter_pid = LimiterPid1, + blocking = dict:from_list(Queues)}} + end; handle_method(#'channel.flow_ok'{active = _}, _, State) -> %% TODO: We may want to correlate this to channel.flow messages we @@ -928,21 +961,27 @@ fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> - %% dict:append would be simpler and avoid the - %% lists:reverse in handle_message({recover, true}, - %% ...). However, it is significantly slower when - %% going beyond a few thousand elements. - dict:update(QPid, - fun (MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - D) + %% dict:append would avoid the lists:reverse in + %% handle_message({recover, true}, ...). However, it + %% is significantly slower when going beyond a few + %% thousand elements. + rabbit_misc:dict_cons(QPid, MsgId, D) end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). +start_limiter(State = #ch{unacked_message_q = UAMQ}) -> + LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), + ok = limit_queues(LPid, State), + LPid. + notify_queues(#ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). +unlimit_queues(State) -> + ok = limit_queues(undefined, State), + undefined. + limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). @@ -972,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(#content{properties = #'P_basic'{ - delivery_mode = Mode}}) -> - case Mode of - 1 -> false; - 2 -> true; - undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false +is_message_persistent(Content) -> + case rabbit_basic:is_message_persistent(Content) of + {invalid, Other} -> + rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false; + IsPersistent when is_boolean(IsPersistent) -> + IsPersistent end. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6aac442888..d1834b3b73 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -46,6 +46,7 @@ -spec(stop/0 :: () -> 'ok'). -spec(action/4 :: (atom(), erlang_node(), [string()], fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -130,86 +131,7 @@ stop() -> ok. usage() -> - io:format("Usage: rabbitmqctl [-q] [-n <node>] <command> [<arg> ...] - -Available commands: - - stop - stops the RabbitMQ application and halts the node - stop_app - stops the RabbitMQ application, leaving the node running - start_app - starts the RabbitMQ application on an already-running node - reset - resets node to default configuration, deleting all data - force_reset - cluster <ClusterNode> ... - status - rotate_logs [Suffix] - close_connection <ConnectionPid> <ExplanationString> - - add_user <UserName> <Password> - delete_user <UserName> - change_password <UserName> <NewPassword> - list_users - - add_vhost <VHostPath> - delete_vhost <VHostPath> - list_vhosts - - set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> - clear_permissions [-p <VHostPath>] <UserName> - list_permissions [-p <VHostPath>] - list_user_permissions <UserName> - - list_queues [-p <VHostPath>] [<QueueInfoItem> ...] - list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] - list_bindings [-p <VHostPath>] - list_connections [<ConnectionInfoItem> ...] - list_channels [<ChannelInfoItem> ...] - list_consumers [-p <VHostPath>] - -Quiet output mode is selected with the \"-q\" flag. Informational -messages are suppressed when quiet mode is in effect. - -<node> should be the name of the master node of the RabbitMQ -cluster. It defaults to the node named \"rabbit\" on the local -host. On a host named \"server.example.com\", the master node will -usually be rabbit@server (unless RABBITMQ_NODENAME has been set to -some non-default value at broker startup time). The output of hostname --s is usually the correct suffix to use after the \"@\" sign. - -The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results. The -default value is \"/\". - -<QueueInfoItem> must be a member of the list [name, durable, -auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid, -exclusive_consumer_tag, messages_ready, messages_unacknowledged, -messages_uncommitted, messages, acks_uncommitted, consumers, -transactions, memory]. The default is to display name and (number of) -messages. - -<ExchangeInfoItem> must be a member of the list [name, type, durable, -auto_delete, arguments]. The default is to display name and type. - -The output format for \"list_bindings\" is a list of rows containing -exchange name, queue name, routing key and arguments, in that order. - -<ConnectionInfoItem> must be a member of the list [pid, address, port, -peer_address, peer_port, state, channels, user, vhost, timeout, -frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, -send_pend]. The default is to display user, peer_address, peer_port -and state. - -<ChannelInfoItem> must be a member of the list [pid, connection, -number, user, vhost, transactional, consumer_count, -messages_unacknowledged, acks_uncommitted, prefetch_count]. The -default is to display pid, user, transactional, consumer_count, -messages_unacknowledged. - -The output format for \"list_consumers\" is a list of rows containing, -in order, the queue name, channel process id, consumer tag, and a -boolean indicating whether acknowledgements are expected from the -consumer. - -"), + io:format("~s", [rabbit_ctl_usage:usage()]), halt(1). action(stop, Node, [], Inform) -> diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 078cf620f4..f19e8d025a 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -38,9 +38,9 @@ -ifdef(use_specs). --spec(create_basic_plt/1 :: (string()) -> 'ok'). --spec(add_to_plt/2 :: (string(), string()) -> 'ok'). --spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(create_basic_plt/1 :: (file_path()) -> 'ok'). +-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok'). -spec(halt_with_code/1 :: (atom()) -> no_return()). -endif. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 832acd16c4..1cfba00eb2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -30,7 +30,6 @@ %% -module(rabbit_exchange). --include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -40,7 +39,7 @@ -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). +-export([check_type/1, assert_type/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -49,7 +48,6 @@ -import(mnesia). -import(sets). -import(lists). --import(qlc). -import(regexp). %%---------------------------------------------------------------------------- @@ -82,10 +80,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -100,17 +96,37 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - ok = rabbit_misc:table_foreach( - fun(Exchange) -> ok = mnesia:write(rabbit_exchange, - Exchange, write) - end, rabbit_durable_exchange), - ok = rabbit_misc:table_foreach( - fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write) - end, rabbit_durable_route). + Exs = rabbit_misc:table_fold( + fun(Exchange, Acc) -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + [Exchange | Acc] + end, [], rabbit_durable_exchange), + Bs = rabbit_misc:table_fold( + fun(Route = #route{binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route), + recover_with_bindings(Bs, Exs), + ok. + +recover_with_bindings(Bs, Exs) -> + recover_with_bindings( + lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#exchange.name, Exs), []). + +recover_with_bindings([B = #binding{exchange_name = Name} | Rest], + Xs = [#exchange{name = Name} | _], + Bindings) -> + recover_with_bindings(Rest, Xs, [B | Bindings]); +recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> + (type_to_module(Type)):recover(X, Bindings), + recover_with_bindings(Bs, Xs, []); +recover_with_bindings([], [], []) -> + ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -118,31 +134,53 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of - [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), - if Durable -> - ok = mnesia:write(rabbit_durable_exchange, - Exchange, write); - true -> ok - end, - Exchange; - [ExistingX] -> ExistingX - end - end). + %% We want to upset things if it isn't ok; this is different from + %% the other hooks invocations, where we tend to ignore the return + %% value. + TypeModule = type_to_module(Type), + ok = TypeModule:validate(Exchange), + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = case Durable of + true -> + mnesia:write(rabbit_durable_exchange, + Exchange, write); + false -> + ok + end, + {new, Exchange}; + [ExistingX] -> + {existing, ExistingX} + end + end) of + {new, X} -> TypeModule:create(X), + X; + {existing, X} -> X; + Err -> Err + end. -check_type(<<"fanout">>) -> - fanout; -check_type(<<"direct">>) -> - direct; -check_type(<<"topic">>) -> - topic; -check_type(<<"headers">>) -> - headers; -check_type(T) -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]). +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + case rabbit_exchange_type_registry:lookup_module(T) of + {ok, Module} -> Module; + {error, not_found} -> rabbit_misc:protocol_error( + command_invalid, + "invalid exchange type '~s'", [T]) + end. + +%% Used with binaries sent over the wire; the type may not exist. +check_type(TypeBin) -> + case rabbit_exchange_type_registry:binary_to_type(TypeBin) of + {error, not_found} -> + rabbit_misc:protocol_error( + command_invalid, "unknown exchange type '~s'", [TypeBin]); + T -> + _Module = type_to_module(T), + T + end. assert_type(#exchange{ type = ActualType }, RequiredType) when ActualType == RequiredType -> @@ -157,7 +195,7 @@ lookup(Name) -> lookup_or_die(Name) -> case lookup(Name) of - {ok, X} -> X; + {ok, X} -> X; {error, not_found} -> rabbit_misc:not_found(Name) end. @@ -193,9 +231,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X, Delivery) -> publish(X, [], Delivery). -publish(X, Seen, Delivery = #delivery{ - message = #basic_message{routing_key = RK, content = C}}) -> - case rabbit_router:deliver(route(X, RK, C), Delivery) of +publish(X = #exchange{type = Type}, Seen, Delivery) -> + case (type_to_module(Type)):publish(X, Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -205,95 +242,24 @@ publish(X, Seen, Delivery = #delivery{ AName -> NewSeen = [XName | Seen], case lists:member(AName, NewSeen) of - true -> - R; - false -> - case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end + true -> R; + false -> case lookup(AName) of + {ok, AX} -> + publish(AX, NewSeen, Delivery); + {error, not_found} -> + rabbit_log:warning( + "alternate exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), + rabbit_misc:rs(AName)]), + R + end end end; R -> R end. -%% return the list of qpids to which a message with a given routing -%% key, sent to a particular exchange, should be delivered. -%% -%% The function ensures that a qpid appears in the return list exactly -%% as many times as a message should be delivered to it. With the -%% current exchange types that is at most once. -route(X = #exchange{type = topic}, RoutingKey, _Content) -> - match_bindings(X, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end); - -route(X = #exchange{type = headers}, _RoutingKey, Content) -> - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, - match_bindings(X, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end); - -route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - match_routing_key(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey, _Content) -> - match_routing_key(X, RoutingKey). - -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - -%% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(#exchange{name = Name}, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- - mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - Match(Binding)] - end). - -match_routing_key(#exchange{name = Name}, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - sets:fold( - fun(Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], sets:from_list(Queues)). - %% TODO: Should all of the route and binding management not be %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? @@ -302,13 +268,13 @@ delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), - ok = delete_forward_routes(Route) + ok = delete_forward_routes(Route), + Route#route.binding end || Route <- mnesia:match_object( rabbit_route, #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - write)], - ok. + write)]. delete_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_forward_routes/1). @@ -317,21 +283,55 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - Exchanges = exchanges_for_queue(QueueName), - [begin - ok = FwdDeleteFun(reverse_route(Route)), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write) - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - ok = maybe_auto_delete(X) - end || ExchangeName <- Exchanges], - ok. + DeletedBindings = + [begin + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, + ReverseRoute, write), + Route#route.binding + end || ReverseRoute + <- mnesia:match_object( + rabbit_reverse_route, + reverse_route(#route{binding = #binding{ + queue_name = QueueName, + _ = '_'}}), + write)], + Cleanup = cleanup_deleted_queue_bindings( + lists:keysort(#binding.exchange_name, DeletedBindings), []), + fun () -> + lists:foreach( + fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> Module:delete(X, Bs); + no_delete -> Module:remove_bindings(X, Bs) + end + end, Cleanup) + end. + +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% cleanup_deleted_queue_bindings1) works properly. +cleanup_deleted_queue_bindings([], Acc) -> + Acc; +cleanup_deleted_queue_bindings( + [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc). + +cleanup_deleted_queue_bindings( + ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], + Bindings, Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc); +cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) -> + %% either Deleted is [], or its head has a non-matching ExchangeName + NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc], + cleanup_deleted_queue_bindings(Deleted, NewAcc). + +cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + {maybe_auto_delete(X), Bindings}. + delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -340,15 +340,6 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). -exchanges_for_queue(QueueName) -> - MatchHead = reverse_route( - #route{binding = #binding{exchange_name = '$1', - queue_name = QueueName, - _ = '_'}}), - sets:to_list( - sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). - contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -385,37 +376,61 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + case mnesia:read(rabbit_route, B) of + [] -> + sync_binding(B, Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} + end + end + end) of + {new, Exchange = #exchange{ type = Type }, Binding} -> + (type_to_module(Type)):add_binding(Exchange, Binding); + {existing, _, _} -> + ok; + Err = {error, _} -> + Err + end. delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + _ -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B} + end + end) of + Err = {error, _} -> + Err; + {{Action, X = #exchange{ type = Type }}, B} -> + Module = type_to_module(Type), + case Action of + auto_delete -> Module:delete(X, [B]); + no_delete -> Module:remove_bindings(X, [B]) + end + end. binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = sort_arguments(Arguments)}) + Fun(X, Q, #binding{ + exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = rabbit_misc:sort_field_table(Arguments)}) end). sync_binding(Binding, Durable, Fun) -> @@ -440,8 +455,8 @@ list_bindings(VHostPath) -> rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. + _ = '_'}, + _ = '_'})]. route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); @@ -456,136 +471,60 @@ reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}; + queue_name = Queue, + key = Key, + args = Args}; reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}. - -default_headers_match_kind() -> all. - -parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). - -%% Horrendous matching algorithm. Depends for its merge-like -%% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and {add,delete}_binding/4 do. -%% -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). - -headers_match([], _Data, AllMatch, _AnyMatch, all) -> - AllMatch; -headers_match([], _Data, _AllMatch, AnyMatch, any) -> - AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, - AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); -headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> - headers_match([], [], false, AnyMatch, MatchKind); -headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK > DK -> - headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], - _AllMatch, AnyMatch, MatchKind) when PK < DK -> - headers_match(PRest, Data, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK == DK -> - {AllMatch1, AnyMatch1} = - if - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} - end, - headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). - -split_topic_key(Key) -> - {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), - KeySplit. - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). - -delete(ExchangeName, _IfUnused = true) -> - call_with_exchange(ExchangeName, fun conditional_delete/1); -delete(ExchangeName, _IfUnused = false) -> - call_with_exchange(ExchangeName, fun unconditional_delete/1). - -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; + queue_name = Queue, + key = Key, + args = Args}. + +delete(ExchangeName, IfUnused) -> + Fun = case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, + case call_with_exchange(ExchangeName, Fun) of + {deleted, X = #exchange{type = Type}, Bs} -> + (type_to_module(Type)):delete(X, Bs), + ok; + Error = {error, _InUseOrNotFound} -> + Error + end. + +maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> + {no_delete, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - conditional_delete(Exchange), - ok. + case conditional_delete(Exchange) of + {error, in_use} -> {no_delete, Exchange}; + {deleted, Exchange, []} -> {auto_deleted, Exchange} + end. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of + case contains(rabbit_route, Match) orelse + contains(rabbit_durable_route, Match) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. -unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_exchange_bindings(ExchangeName), +unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> + Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}). + ok = mnesia:delete({rabbit_exchange, ExchangeName}), + {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl new file mode 100644 index 0000000000..a8c071e681 --- /dev/null +++ b/src/rabbit_exchange_type.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {description, 0}, + {publish, 2}, + + %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} + {validate, 1}, + + %% called after declaration when previously absent + {create, 1}, + + %% called when recovering + {recover, 2}, + + %% called after exchange deletion. + {delete, 2}, + + %% called after a binding has been added + {add_binding, 2}, + + %% called after bindings have been deleted. + {remove_bindings, 2} + + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl new file mode 100644 index 0000000000..9b71e0e1d1 --- /dev/null +++ b/src/rabbit_exchange_type_direct.erl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_direct). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type direct"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"direct">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"direct">>}, + {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), + Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl new file mode 100644 index 0000000000..311654ab21 --- /dev/null +++ b/src/rabbit_exchange_type_fanout.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_fanout). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type fanout"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"fanout">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"fanout">>}, + {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl new file mode 100644 index 0000000000..285dab1a03 --- /dev/null +++ b/src/rabbit_exchange_type_headers.erl @@ -0,0 +1,137 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_headers). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type headers"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"headers">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-ifdef(use_specs). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-endif. + +description() -> + [{name, <<"headers">>}, + {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, + Delivery = #delivery{message = #basic_message{content = Content}}) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> rabbit_misc:sort_field_table(H) + end, + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end), + Delivery). + +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort +%% (rabbit_misc:sort_field_table) that route/3 and +%% rabbit_exchange:{add,delete}_binding/4 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl new file mode 100644 index 0000000000..175d15ad83 --- /dev/null +++ b/src/rabbit_exchange_type_registry.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_registry). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/2, binary_to_type/1, lookup_module/1]). + +-define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}). +-spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}). +-spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}). + +-endif. + +%%--------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +register(TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, TypeName, ModuleName}). + +%% This is used with user-supplied arguments (e.g., on exchange +%% declare), so we restrict it to existing atoms only. This means it +%% can throw a badarg, indicating that the type cannot have been +%% registered. +binary_to_type(TypeBin) when is_binary(TypeBin) -> + case catch list_to_existing_atom(binary_to_list(TypeBin)) of + {'EXIT', {badarg, _}} -> {error, not_found}; + TypeAtom -> TypeAtom + end. + +lookup_module(T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, T) of + [{_, Module}] -> + {ok, Module}; + [] -> + {error, not_found} + end. + +%%--------------------------------------------------------------------------- + +internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> + list_to_atom(binary_to_list(TypeBin)). + +internal_register(TypeName, ModuleName) + when is_binary(TypeName), is_atom(ModuleName) -> + ok = sanity_check_module(ModuleName), + true = ets:insert(?ETS_NAME, + {internal_binary_to_type(TypeName), ModuleName}), + ok. + +sanity_check_module(Module) -> + case catch lists:member(rabbit_exchange_type, + lists:flatten( + [Bs || {Attr, Bs} <- + Module:module_info(attributes), + Attr =:= behavior orelse + Attr =:= behaviour])) of + {'EXIT', {undef, _}} -> {error, not_module}; + false -> {error, not_exchange_type}; + true -> ok + end. + +%%--------------------------------------------------------------------------- + +init([]) -> + ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), + {ok, none}. + +handle_call({register, TypeName, ModuleName}, _From, State) -> + ok = internal_register(TypeName, ModuleName), + {reply, ok, State}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl new file mode 100644 index 0000000000..8a3dceeaeb --- /dev/null +++ b/src/rabbit_exchange_type_topic.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_topic). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type topic"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"topic">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-export([topic_matches/2]). + +-ifdef(use_specs). +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-endif. + +description() -> + [{name, <<"topic">>}, + {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end), + Delivery). + +split_topic_key(Key) -> + {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), + KeySplit. + +topic_matches(PatternKey, RoutingKey) -> + P = split_topic_key(PatternKey), + R = split_topic_key(RoutingKey), + topic_matches1(P, R). + +topic_matches1(["#"], _R) -> + true; +topic_matches1(["#" | PTail], R) -> + last_topic_match(PTail, [], lists:reverse(R)); +topic_matches1([], []) -> + true; +topic_matches1(["*" | PatRest], [_ | ValRest]) -> + topic_matches1(PatRest, ValRest); +topic_matches1([PatElement | PatRest], [ValElement | ValRest]) + when PatElement == ValElement -> + topic_matches1(PatRest, ValRest); +topic_matches1(_, _) -> + false. + +last_topic_match(P, R, []) -> + topic_matches1(P, R); +last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> + topic_matches1(P, R) or + last_topic_match(P, [BacktrackNext | R], BacktrackList). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c9f8183fc9..878af02976 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2]). -export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1]). +-export([get_limit/1, block/1, unblock/1]). %%---------------------------------------------------------------------------- @@ -47,12 +47,14 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). +-spec(block/1 :: (maybe_pid()) -> 'ok'). +-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -endif. @@ -60,6 +62,7 @@ -record(lim, {prefetch_count = 0, ch_pid, + blocked = false, queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -77,13 +80,14 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> - unlink(LimiterPid), + true = unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, {limit, PrefetchCount})). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -113,6 +117,17 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). +block(undefined) -> + ok; +block(LimiterPid) -> + gen_server2:call(LimiterPid, block, infinity). + +unblock(undefined) -> + ok; +unblock(LimiterPid) -> + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, unblock, infinity)). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -120,29 +135,45 @@ get_limit(Pid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +handle_call({can_send, _QPid, _AckRequired}, _From, + State = #lim{blocked = true}) -> + {reply, false, State}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}. + {reply, PrefetchCount, State}; + +handle_call({limit, PrefetchCount}, _From, State) -> + case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + case maybe_notify(State, State#lim{blocked = false}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end. handle_cast(shutdown, State) -> {stop, normal, State}; -handle_cast({limit, PrefetchCount}, State) -> - {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {noreply, State1}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -164,14 +195,21 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> notify_queues(NewState); - false -> NewState + case (limit_reached(OldState) orelse is_blocked(OldState)) andalso + not (limit_reached(NewState) orelse is_blocked(NewState)) of + true -> NewState1 = notify_queues(NewState), + {case NewState1#lim.prefetch_count of + 0 -> stop; + _ -> cont + end, NewState1}; + false -> {cont, NewState} end. limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. +is_blocked(#lim{blocked = Blocked}) -> Blocked. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), @@ -209,3 +247,9 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. + +unlink_on_stopped(LimiterPid, stopped) -> + ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), + stopped; +unlink_on_stopped(_LimiterPid, Result) -> + Result. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0654f58ae7..028b0d73ea 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -49,15 +49,17 @@ -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). --export([table_foreach/2]). +-export([table_fold/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). +-export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]). -import(mnesia). -import(lists). @@ -96,8 +98,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> ok_or_error()). --spec(report_cover/1 :: (string()) -> 'ok'). +-spec(enable_cover/1 :: (file_path()) -> ok_or_error()). +-spec(report_cover/1 :: (file_path()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -114,23 +116,31 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). +-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). --spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). --spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (string(), string()) -> ok_or_error()). +-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()). +-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). --spec(ceil/1 :: (number()) -> number()). +-spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). +-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). +-spec(version_compare/3 :: (string(), string(), + ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(recursive_delete/1 :: ([file_path()]) -> + 'ok' | {'error', {file_path(), any()}}). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -endif. @@ -305,7 +315,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. @@ -357,20 +367,20 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). -%% For each entry in a table, execute a function in a transaction. -%% This is often far more efficient than wrapping a tx around the lot. +%% Fold over each entry in a table, executing the cons function in a +%% transaction. This is often far more efficient than wrapping a tx +%% around the lot. %% %% We ignore entries that have been modified or removed. -table_foreach(F, TableName) -> - lists:foreach( - fun (E) -> execute_mnesia_transaction( +table_fold(F, Acc0, TableName) -> + lists:foldl( + fun (E, Acc) -> execute_mnesia_transaction( fun () -> case mnesia:match_object(TableName, E, read) of - [] -> ok; - _ -> F(E) + [] -> Acc; + _ -> F(E, Acc) end end) - end, dirty_read_all(TableName)), - ok. + end, Acc0, dirty_read_all(TableName)). dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). @@ -504,6 +514,10 @@ queue_fold(Fun, Init, Q) -> {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. +%% Sorts a list of AMQP table fields as per the AMQP spec +sort_field_table(Arguments) -> + lists:keysort(1, Arguments). + %% This provides a string representation of a pid that is the same %% regardless of what node we are running on. The representation also %% permits easy identification of the pid's node. @@ -595,3 +609,46 @@ version_compare(A, B) -> ANum < BNum -> lt; ANum > BNum -> gt end. + +recursive_delete(Files) -> + lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). + +recursive_delete1(Path) -> + case filelib:is_dir(Path) of + false -> case file:delete(Path) of + ok -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Path, Err}} + end; + true -> case file:list_dir(Path) of + {ok, FileNames} -> + case lists:foldl( + fun (FileName, ok) -> + recursive_delete1( + filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames) of + ok -> + case file:del_dir(Path) of + ok -> ok; + {error, Err} -> {error, {Path, Err}} + end; + {error, _Err} = Error -> + Error + end; + {error, Err} -> + {error, {Path, Err}} + end + end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +unlink_and_capture_exit(Pid) -> + unlink(Pid), + receive {'EXIT', Pid, _} -> ok + after 0 -> ok + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6ec3cf74b3..55a6761d2d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -48,7 +48,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). --spec(dir/0 :: () -> string()). +-spec(dir/0 :: () -> file_path()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). @@ -424,9 +424,8 @@ reset(Force) -> cannot_delete_schema) end, ok = delete_cluster_nodes_config(), - %% remove persistet messages and any other garbage we find - lists:foreach(fun file:delete/1, - filelib:wildcard(dir() ++ "/*")), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 8c898498e6..336f74bf9a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -42,6 +42,7 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -51,7 +52,7 @@ start() -> RpcTimeout = case init:get_argument(maxwait) of {ok,[[N1]]} -> 1000 * list_to_integer(N1); - _ -> 30000 + _ -> ?MAX_WAIT end, case init:get_plain_arguments() of [] -> @@ -86,16 +87,8 @@ stop() -> ok. usage() -> - io:format("Usage: rabbitmq-multi <command> - -Available commands: - - start_all <NodeCount> - start a local cluster of RabbitMQ nodes. - status - print status of all running nodes - stop_all - stops all local RabbitMQ nodes. - rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes. -"), - halt(3). + io:format("~s", [rabbit_multi_usage:usage()]), + halt(1). action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index cf04f05b7e..7978573d90 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -117,15 +117,25 @@ start() -> transient, infinity, supervisor, [tcp_client_sup]}), ok. +getaddr(Host) -> + %% inet_parse:address takes care of ip string, like "0.0.0.0" + %% inet:getaddr returns immediately for ip tuple {0,0,0,0}, + %% and runs 'inet_gethost' port process for dns lookups. + %% On Windows inet:getaddr runs dns resolver for ip string, which may fail. + case inet_parse:address(Host) of + {ok, IPAddress1} -> IPAddress1; + {error, _} -> + case inet:getaddr(Host, inet) of + {ok, IPAddress2} -> IPAddress2; + {error, Reason} -> + error_logger:error_msg("invalid host ~p - ~p~n", + [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}) + end + end. + check_tcp_listener_address(NamePrefix, Host, Port) -> - IPAddress = - case inet:getaddr(Host, inet) of - {ok, IPAddress1} -> IPAddress1; - {error, Reason} -> - error_logger:error_msg("invalid host ~p - ~p~n", - [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}) - end, + IPAddress = getaddr(Host), if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), @@ -157,7 +167,7 @@ start_listener(Host, Port, Label, OnConnect) -> ok. stop_tcp_listener(Host, Port) -> - {ok, IPAddress} = inet:getaddr(Host, inet), + IPAddress = getaddr(Host), Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name), diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 019d2a269d..8aa5ad8d32 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -70,11 +70,11 @@ -ifdef(use_specs). --type(qmsg() :: {amqqueue(), pkey()}). +-type(pmsg() :: {queue_name(), pkey()}). -type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). + {publish, message(), pmsg()} | + {deliver, pmsg()} | + {ack, pmsg()}). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). @@ -406,7 +406,10 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), + Work = ets:foldl( + fun ({{QName, PKey}, Delivered}, Acc) -> + rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc) + end, dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( rabbit_misc:upmap( @@ -425,13 +428,6 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, %% contains the mutated messages and queues tables Snapshot. -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> @@ -474,12 +470,8 @@ internal_integrate_messages(Items, Snapshot) -> internal_integrate1({extend_transaction, Key, MessageList}, Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; + Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, + Transactions)}; internal_integrate1({rollback_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions}) -> Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1a4830e11c..5cf519b795 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -39,6 +39,8 @@ -export([init/1, mainloop/3]). +-export([server_properties/0]). + -export([analyze_frame/2]). -import(gen_tcp). @@ -133,6 +135,7 @@ -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). +-spec(server_properties/0 :: () -> amqp_table()). -endif. @@ -198,6 +201,16 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +server_properties() -> + {ok, Product} = application:get_key(rabbit, id), + {ok, Version} = application:get_key(rabbit, vsn), + [{list_to_binary(K), longstr, list_to_binary(V)} || + {K, V} <- [{"product", Product}, + {"version", Version}, + {"platform", "Erlang/OTP"}, + {"copyright", ?COPYRIGHT_MESSAGE}, + {"information", ?INFORMATION_MESSAGE}]]. + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> @@ -510,21 +523,12 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, case check_version({ProtocolMajor, ProtocolMinor}, {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of true -> - {ok, Product} = application:get_key(id), - {ok, Version} = application:get_key(vsn), ok = send_on_channel0( Sock, #'connection.start'{ version_major = ?PROTOCOL_VERSION_MAJOR, version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = - [{list_to_binary(K), longstr, list_to_binary(V)} || - {K, V} <- - [{"product", Product}, - {"version", Version}, - {"platform", "Erlang/OTP"}, - {"copyright", ?COPYRIGHT_MESSAGE}, - {"information", ?INFORMATION_MESSAGE}]], + server_properties = server_properties(), mechanisms = <<"PLAIN AMQPLAIN">>, locales = <<"en_US">> }), {State#v1{connection = Connection#connection{ diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl new file mode 100644 index 0000000000..06d59249bb --- /dev/null +++ b/src/rabbit_restartable_sup.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_restartable_sup). + +-behaviour(supervisor). + +-export([start_link/2]). + +-export([init/1]). + +-include("rabbit.hrl"). + +start_link(Name, {_M, _F, _A} = Fun) -> + supervisor:start_link({local, Name}, ?MODULE, [Fun]). + +init([{Mod, _F, _A} = Fun]) -> + {ok, {{one_for_one, 10, 10}, + [{Mod, Fun, transient, ?MAX_WAIT, worker, [Mod]}]}}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ee2b82c5bd..a449e19eb4 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -30,12 +30,15 @@ %% -module(rabbit_router). +-include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -behaviour(gen_server2). -export([start_link/0, - deliver/2]). + deliver/2, + match_bindings/2, + match_routing_key/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -73,13 +76,9 @@ deliver(QPids, Delivery) -> %% which then in turn delivers it to its queues. deliver_per_node( dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), + lists:foldl(fun (QPid, D) -> + rabbit_misc:dict_cons(node(QPid), QPid, D) + end, dict:new(), QPids)), Delivery). deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> @@ -129,6 +128,46 @@ deliver_per_node(NodeQPids, Delivery) -> -endif. +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(Name, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + rabbit_route, + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + Match(Binding)] + end). + +match_routing_key(Name, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], sets:from_list(Queues)). + %%-------------------------------------------------------------------- init([]) -> diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index a1b8948155..2c5e51125e 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,10 +33,13 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2]). +-export([start_link/0, start_child/1, start_child/2, start_child/3, + start_restartable_child/1, start_restartable_child/2]). -export([init/1]). +-include("rabbit.hrl"). + -define(SERVER, ?MODULE). start_link() -> @@ -46,10 +49,25 @@ start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> + start_child(Mod, Mod, Args). + +start_child(ChildId, Mod, Args) -> {ok, _} = supervisor:start_child(?SERVER, - {Mod, {Mod, start_link, Args}, - transient, 100, worker, [Mod]}), + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]}), + ok. + +start_restartable_child(Mod) -> + start_restartable_child(Mod, []). + +start_restartable_child(Mod, Args) -> + Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), + {ok, _} = supervisor:start_child( + ?SERVER, + {Name, {rabbit_restartable_sup, start_link, + [Name, {Mod, start_link, Args}]}, + transient, infinity, supervisor, [rabbit_restartable_sup]}), ok. init([]) -> - {ok, {{one_for_one, 10, 10}, []}}. + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 51d95c35a3..82f2d19918 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -330,7 +330,8 @@ test_topic_match(P, R) -> test_topic_match(P, R, true). test_topic_match(P, R, Expected) -> - case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of + case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), + list_to_binary(R)) of Expected -> passed; _ -> diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 0fe1542616..493925efd5 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -63,4 +63,4 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name, OnStartup, OnShutdown, Label]}, - transient, 100, worker, [tcp_listener]}]}}. + transient, 16#ffffffff, worker, [tcp_listener]}]}}. diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 0000000000..97e075459f --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,155 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, submit_async/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/1 :: + (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +submit_async(Fun) -> + gen_server2:cast(?SERVER, {run_async, Fun}). + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, + State #state { pending = queue:in({next_free, From}, Pending) }, + hibernate}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }, + hibernate} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, {next_free, From}}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { pending = Pending1 } + end, hibernate}; + +handle_cast({run_async, Fun}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, + case queue:out(Avail) of + {empty, _Avail} -> + State #state { pending = queue:in({run_async, Fun}, Pending)}; + {{value, WId}, Avail1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { available = Avail1 } + end, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 0000000000..4ded63a8db --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 0000000000..d3a4811926 --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,104 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, submit_async/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/2 :: + (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +submit_async(Pid, Fun) -> + gen_server2:cast(Pid, {submit_async, Fun}). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({submit_async, Fun}, WId) -> + run(Fun), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |
