summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.hgignore1
-rw-r--r--Makefile61
-rw-r--r--codegen.py102
-rw-r--r--docs/html-to-website-xml.xsl4
-rw-r--r--docs/rabbitmqctl.1.xml40
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--include/rabbit.hrl106
-rw-r--r--include/rabbit_backing_queue_spec.hrl24
-rw-r--r--include/rabbit_exchange_type_spec.hrl27
-rw-r--r--include/rabbit_framing_spec.hrl60
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec12
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf172
-rw-r--r--packaging/debs/Debian/debian/changelog12
-rw-r--r--packaging/debs/Debian/debian/postrm.in19
-rw-r--r--packaging/macports/Makefile13
-rw-r--r--packaging/macports/Portfile.in9
-rwxr-xr-xpackaging/macports/make-port-diff.sh27
-rwxr-xr-xscripts/rabbitmq-multi3
-rw-r--r--scripts/rabbitmq-multi.bat6
-rwxr-xr-xscripts/rabbitmq-server3
-rw-r--r--scripts/rabbitmq-server.bat6
-rw-r--r--scripts/rabbitmq-service.bat6
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--scripts/rabbitmqctl.bat6
-rw-r--r--src/delegate.erl58
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl14
-rw-r--r--src/gen_server2.erl4
-rw-r--r--src/pg_local.erl8
-rw-r--r--src/rabbit.erl30
-rw-r--r--src/rabbit_access_control.erl43
-rw-r--r--src/rabbit_alarm.erl11
-rw-r--r--src/rabbit_amqqueue.erl195
-rw-r--r--src/rabbit_amqqueue_process.erl218
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_basic.erl51
-rw-r--r--src/rabbit_binary_generator.erl29
-rw-r--r--src/rabbit_binary_parser.erl11
-rw-r--r--src/rabbit_channel.erl411
-rw-r--r--src/rabbit_control.erl12
-rw-r--r--src/rabbit_dialyzer.erl10
-rw-r--r--src/rabbit_error_logger.erl3
-rw-r--r--src/rabbit_error_logger_file_h.erl3
-rw-r--r--src/rabbit_exchange.erl210
-rw-r--r--src/rabbit_exchange_type.erl12
-rw-r--r--src/rabbit_exchange_type_direct.erl10
-rw-r--r--src/rabbit_exchange_type_fanout.erl12
-rw-r--r--src/rabbit_exchange_type_headers.erl15
-rw-r--r--src/rabbit_exchange_type_registry.erl19
-rw-r--r--src/rabbit_exchange_type_topic.erl14
-rw-r--r--src/rabbit_framing_channel.erl34
-rw-r--r--src/rabbit_guid.erl12
-rw-r--r--src/rabbit_invariable_queue.erl91
-rw-r--r--src/rabbit_load.erl5
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_memory_monitor.erl5
-rw-r--r--src/rabbit_misc.erl134
-rw-r--r--src/rabbit_mnesia.erl126
-rw-r--r--src/rabbit_multi.erl4
-rw-r--r--src/rabbit_net.erl39
-rw-r--r--src/rabbit_networking.erl38
-rw-r--r--src/rabbit_persister.erl26
-rw-r--r--src/rabbit_plugin_activator.erl1
-rw-r--r--src/rabbit_queue_collector.erl106
-rw-r--r--src/rabbit_reader.erl98
-rw-r--r--src/rabbit_router.erl23
-rw-r--r--src/rabbit_sasl_report_file_h.erl3
-rw-r--r--src/rabbit_tests.erl243
-rw-r--r--src/rabbit_types.erl145
-rw-r--r--src/rabbit_writer.erl38
-rw-r--r--src/supervisor2.erl16
-rw-r--r--src/tcp_acceptor.erl7
-rw-r--r--src/vm_memory_monitor.erl6
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl6
-rw-r--r--src/worker_pool_worker.erl3
76 files changed, 2155 insertions, 1195 deletions
diff --git a/.hgignore b/.hgignore
index caaa3acef0..7b796b6640 100644
--- a/.hgignore
+++ b/.hgignore
@@ -10,6 +10,7 @@ syntax: regexp
^cover/
^dist/
^include/rabbit_framing\.hrl$
+^include/rabbit_framing_spec\.hrl$
^src/rabbit_framing\.erl$
^src/.*\_usage.erl$
^rabbit\.plt$
diff --git a/Makefile b/Makefile
index 3d39ccb071..a66d0640e3 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
-WEB_URL=http://stage.rabbitmq.com/
+WEB_URL=http://www.rabbitmq.com/
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml
@@ -40,11 +40,11 @@ BASIC_PLT=basic.plt
RABBIT_PLT=rabbit.plt
ifndef USE_SPECS
-# our type specs rely on features / bug fixes in dialyzer that are
-# only available in R13B01 upwards (R13B01 is eshell 5.7.2)
+# our type specs rely on features and bug fixes in dialyzer that are
+# only available in R13B04 upwards (R13B04 is erts 5.7.5)
#
# NB: the test assumes that version number will only contain single digits
-USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.1" ]; then echo "true"; else echo "false"; fi)
+USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.4" ]; then echo "true"; else echo "false"; fi)
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -70,6 +70,24 @@ define usage_dep
$(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl
endef
+ifneq "$(SBIN_DIR)" ""
+ifneq "$(TARGET_DIR)" ""
+SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
+endif
+endif
+
+# Versions prior to this are not supported
+NEED_MAKE := 3.80
+ifneq "$(NEED_MAKE)" "$(firstword $(sort $(NEED_MAKE) $(MAKE_VERSION)))"
+$(error Versions of make prior to $(NEED_MAKE) are not supported)
+endif
+
+# .DEFAULT_GOAL introduced in 3.81
+DEFAULT_GOAL_MAKE := 3.81
+ifneq "$(DEFAULT_GOAL_MAKE)" "$(firstword $(sort $(DEFAULT_GOAL_MAKE) $(MAKE_VERSION)))"
+.DEFAULT_GOAL=all
+endif
+
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -81,11 +99,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
+$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
-$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
+$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
@@ -184,7 +202,7 @@ srcdist: distclean
>> $(TARGET_SRC_DIR)/INSTALL
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
- >> $(TARGET_SRC_DIR)/BUILD
+ >> $(TARGET_SRC_DIR)/README
sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
@@ -205,9 +223,10 @@ distclean: clean
# xmlto can not read from standard input, so we mess with a tmp file.
%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
- xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
- xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \
- gzip -f $(DOCS_DIR)/`basename $< .xml`
+ xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \
+ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
+ xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \
+ gzip -f $(DOCS_DIR)/`basename $< .xml`
rm -f $<.tmp
# Use tmp files rather than a pipeline so that we get meaningful errors
@@ -234,13 +253,7 @@ $(SOURCE_DIR)/%_usage.erl:
docs_all: $(MANPAGES) $(WEB_MANPAGES)
-install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
install: all docs_all install_dirs
- @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false)
- @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false)
- @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false)
-
- mkdir -p $(TARGET_DIR)
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
@@ -256,10 +269,16 @@ install: all docs_all install_dirs
done
install_dirs:
- mkdir -p $(SBIN_DIR)
+ @ OK=true && \
+ { [ -n "$(TARGET_DIR)" ] || { echo "Please set TARGET_DIR."; OK=false; }; } && \
+ { [ -n "$(SBIN_DIR)" ] || { echo "Please set SBIN_DIR."; OK=false; }; } && \
+ { [ -n "$(MAN_DIR)" ] || { echo "Please set MAN_DIR."; OK=false; }; } && $$OK
+
mkdir -p $(TARGET_DIR)/sbin
+ mkdir -p $(SBIN_DIR)
+ mkdir -p $(MAN_DIR)
-$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML))))
+$(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML))))
# Note that all targets which depend on clean must have clean in their
# name. Also any target that doesn't depend on clean should not have
diff --git a/codegen.py b/codegen.py
index a24a32ff37..6042e78fcd 100644
--- a/codegen.py
+++ b/codegen.py
@@ -95,6 +95,27 @@ class PackedMethodBitField:
def full(self):
return self.count() == 8
+def multiLineFormat(things, prologue, separator, lineSeparator, epilogue, thingsPerLine = 4):
+ r = [prologue]
+ i = 0
+ for t in things:
+ if i != 0:
+ if i % thingsPerLine == 0:
+ r += [lineSeparator]
+ else:
+ r += [separator]
+ r += [t]
+ i += 1
+ r += [epilogue]
+ return "".join(r)
+
+def prettyType(typeName, subTypes, typesPerLine = 4):
+ """Pretty print a type signature made up of many alternative subtypes"""
+ sTs = multiLineFormat(subTypes,
+ "( ", " | ", "\n | ", " )",
+ thingsPerLine = typesPerLine)
+ return "-type(%s ::\n %s)." % (typeName, sTs)
+
def printFileHeader():
print """%% Autogenerated code. Do not edit.
%%
@@ -317,6 +338,83 @@ def genErl(spec):
-export([lookup_amqp_exception/1]).
-export([amqp_exception/1]).
+"""
+ print "%% Various types"
+ print "-ifdef(use_specs)."
+
+ print """-export_type([amqp_table/0, amqp_property_type/0, amqp_method_record/0,
+ amqp_method_name/0, amqp_method/0, amqp_class_id/0,
+ amqp_value/0, amqp_array/0, amqp_exception/0, amqp_property_record/0]).
+
+-type(amqp_field_type() ::
+ 'longstr' | 'signedint' | 'decimal' | 'timestamp' |
+ 'table' | 'byte' | 'double' | 'float' | 'long' |
+ 'short' | 'bool' | 'binary' | 'void').
+-type(amqp_property_type() ::
+ 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
+ 'longlongint' | 'timestamp' | 'bit' | 'table').
+
+-type(amqp_table() :: [{binary(), amqp_field_type(), amqp_value()}]).
+-type(amqp_array() :: [{amqp_field_type(), amqp_value()}]).
+-type(amqp_value() :: binary() | % longstr
+ integer() | % signedint
+ {non_neg_integer(), non_neg_integer()} | % decimal
+ amqp_table() |
+ amqp_array() |
+ byte() | % byte
+ float() | % double
+ integer() | % long
+ integer() | % short
+ boolean() | % bool
+ binary() | % binary
+ 'undefined' | % void
+ non_neg_integer() % timestamp
+ ).
+"""
+
+ print prettyType("amqp_method_name()",
+ [m.erlangName() for m in methods])
+ print prettyType("amqp_method()",
+ ["{%s, %s}" % (m.klass.index, m.index) for m in methods],
+ 6)
+ print prettyType("amqp_method_record()",
+ ["#%s{}" % (m.erlangName()) for m in methods])
+ fieldNames = set()
+ for m in methods:
+ fieldNames.update(m.arguments)
+ fieldNames = [erlangize(f.name) for f in fieldNames]
+ print prettyType("amqp_method_field_name()",
+ fieldNames)
+ print prettyType("amqp_property_record()",
+ ["#'P_%s'{}" % erlangize(c.name) for c in spec.allClasses()])
+ print prettyType("amqp_exception()",
+ ["'%s'" % erlangConstantName(c).lower() for (c, v, cls) in spec.constants])
+ print prettyType("amqp_exception_code()",
+ ["%i" % v for (c, v, cls) in spec.constants])
+ classIds = set()
+ for m in spec.allMethods():
+ classIds.add(m.klass.index)
+ print prettyType("amqp_class_id()",
+ ["%i" % ci for ci in classIds])
+ print "-endif. % use_specs"
+
+ print """
+%% Method signatures
+-ifdef(use_specs).
+-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()).
+-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()).
+-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()).
+-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()).
+-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()).
+-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]).
+-spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()).
+-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()).
+-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()).
+-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
+-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}).
+-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()).
+-endif. % use_specs
+
bitvalue(true) -> 1;
bitvalue(false) -> 0;
bitvalue(undefined) -> 0.
@@ -397,6 +495,7 @@ def genHrl(spec):
for c in spec.allClasses():
print "-record('P_%s', {%s})." % (erlangize(c.name), fieldNameList(c.fields))
+
def generateErl(specPath):
genErl(AmqpSpec(specPath))
@@ -404,5 +503,6 @@ def generateHrl(specPath):
genHrl(AmqpSpec(specPath))
if __name__ == "__main__":
- do_main(generateHrl, generateErl)
+ do_main_dict({"header": generateHrl,
+ "body": generateErl})
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index f2117e2679..662dbea00a 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -58,13 +58,13 @@
<!-- Specific instructions to revert the DocBook HTML to be more like our ad-hoc XML schema -->
<xsl:template match="div[@class='refsect1'] | div[@class='refnamediv'] | div[@class='refsynopsisdiv']">
- <doc:section name="{@title}">
+ <doc:section name="{h2}">
<xsl:apply-templates select="node()"/>
</doc:section>
</xsl:template>
<xsl:template match="div[@class='refsect2']">
- <doc:subsection name="{@title}">
+ <doc:subsection name="{h3}">
<xsl:apply-templates select="node()"/>
</doc:subsection>
</xsl:template>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 5e2668c1a6..e53a97c2c9 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -270,8 +270,8 @@
<title>Cluster management</title>
<variablelist>
- <varlistentry>
- <term><cmdsynopsis><command>cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg></cmdsynopsis></term>
+ <varlistentry id="cluster">
+ <term><cmdsynopsis><command>cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -281,7 +281,8 @@
</variablelist>
<para>
Instruct the node to become member of a cluster with the
- specified nodes.
+ specified nodes. To cluster with currently offline nodes,
+ use <link linkend="force_cluster"><command>force_cluster</command></link>.
</para>
<para>
Cluster nodes can be of two types: disk or ram. Disk nodes
@@ -334,6 +335,29 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry id="force_cluster">
+ <term><cmdsynopsis><command>force_cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>clusternode</term>
+ <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instruct the node to become member of a cluster with the
+ specified nodes. This will succeed even if the specified nodes
+ are offline. For a more detailed description, see
+ <link linkend="cluster"><command>cluster</command>.</link>
+ </para>
+ <para>
+ Note that this variant of the cluster command just
+ ignores the current status of the specified nodes.
+ Clustering may still fail for a variety of other
+ reasons.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
@@ -603,10 +627,12 @@
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl list_permissions -p /myvhost</screen>
<para role="example">
- This command instructs the RabbitMQ broker to list all the
- users which have been granted access to the virtual host
- called <command>/myvhost</command>, and the permissions they
- have for operations on resources in that virtual host.
+ This command instructs the RabbitMQ broker to list all
+ the users which have been granted access to the virtual
+ host called <command>/myvhost</command>, and the
+ permissions they have for operations on resources in
+ that virtual host. Note that an empty string means no
+ permissions granted.
</para>
</listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index bdf407eb93..ce94cafe62 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -11,8 +11,8 @@
rabbit_sup,
rabbit_tcp_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
-%% we also depend on ssl but it shouldn't be in here as we don't
-%% actually want to start it
+%% we also depend on crypto, public_key and ssl but they shouldn't be
+%% in here as we don't actually want to start it
{mod, {rabbit, []}},
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
{ssl_listeners, []},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 4e8ed11418..54a4021d40 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,8 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
+ arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -67,113 +68,10 @@
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
-
-record(amqp_error, {name, explanation = "", method = none}).
%%----------------------------------------------------------------------------
--ifdef(use_specs).
-
--include("rabbit_framing_spec.hrl").
-
--type(maybe(T) :: T | 'none').
--type(erlang_node() :: atom()).
--type(ssl_socket() :: #ssl_socket{}).
--type(socket() :: port() | ssl_socket()).
--type(thunk(T) :: fun(() -> T)).
--type(info_key() :: atom()).
--type(info() :: {info_key(), any()}).
--type(regexp() :: binary()).
--type(file_path() :: string()).
-
-%% this is really an abstract type, but dialyzer does not support them
--type(guid() :: binary()).
--type(txn() :: guid()).
--type(pkey() :: guid()).
--type(r(Kind) ::
- #resource{virtual_host :: vhost(),
- kind :: Kind,
- name :: resource_name()}).
--type(queue_name() :: r('queue')).
--type(exchange_name() :: r('exchange')).
--type(user() ::
- #user{username :: username(),
- password :: password()}).
--type(permission() ::
- #permission{configure :: regexp(),
- write :: regexp(),
- read :: regexp()}).
--type(amqqueue() ::
- #amqqueue{name :: queue_name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table(),
- pid :: maybe(pid())}).
--type(exchange() ::
- #exchange{name :: exchange_name(),
- type :: exchange_type(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table()}).
--type(binding() ::
- #binding{exchange_name :: exchange_name(),
- queue_name :: queue_name(),
- key :: binding_key()}).
-%% TODO: make this more precise by tying specific class_ids to
-%% specific properties
--type(undecoded_content() ::
- #content{class_id :: amqp_class_id(),
- properties :: 'none',
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]} |
- #content{class_id :: amqp_class_id(),
- properties :: amqp_properties(),
- properties_bin :: 'none',
- payload_fragments_rev :: [binary()]}).
--type(unencoded_content() :: undecoded_content()).
--type(decoded_content() ::
- #content{class_id :: amqp_class_id(),
- properties :: amqp_properties(),
- properties_bin :: maybe(binary()),
- payload_fragments_rev :: [binary()]}).
--type(encoded_content() ::
- #content{class_id :: amqp_class_id(),
- properties :: maybe(amqp_properties()),
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]}).
--type(content() :: undecoded_content() | decoded_content()).
--type(basic_message() ::
- #basic_message{exchange_name :: exchange_name(),
- routing_key :: routing_key(),
- content :: content(),
- guid :: guid(),
- is_persistent :: boolean()}).
--type(message() :: basic_message()).
--type(delivery() ::
- #delivery{mandatory :: boolean(),
- immediate :: boolean(),
- txn :: maybe(txn()),
- sender :: pid(),
- message :: message()}).
-%% this really should be an abstract type
--type(msg_id() :: non_neg_integer()).
--type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
--type(listener() ::
- #listener{node :: erlang_node(),
- protocol :: atom(),
- host :: string() | atom(),
- port :: non_neg_integer()}).
--type(not_found() :: {'error', 'not_found'}).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
--type(amqp_error() ::
- #amqp_error{name :: atom(),
- explanation :: string(),
- method :: atom()}).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-define(ERTS_MINIMUM, "5.6.3").
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1b536dfad1..05dc1464cd 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -31,26 +31,26 @@
-type(fetch_result() ::
%% Message, IsDelivered, AckTag, Remaining_Len
- ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})).
+ ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(ack_required() :: boolean()).
--spec(start/1 :: ([queue_name()]) -> 'ok').
--spec(init/3 :: (queue_name(), is_durable(), attempt_recovery()) -> state()).
+-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
+-spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/2 :: (basic_message(), state()) -> state()).
+-spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
-spec(publish_delivered/3 ::
- (ack_required(), basic_message(), state()) -> {ack(), state()}).
+ (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()).
--spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()).
--spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}).
--spec(tx_commit/3 :: (txn(), fun (() -> any()), state()) -> {[ack()], state()}).
+-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()).
+-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
+-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}).
-spec(requeue/2 :: ([ack()], state()) -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 9864f1eb64..f05bcb847f 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -31,12 +31,19 @@
-ifdef(use_specs).
-spec(description/0 :: () -> [{atom(), any()}]).
--spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(validate/1 :: (exchange()) -> 'ok').
--spec(create/1 :: (exchange()) -> 'ok').
--spec(recover/2 :: (exchange(), list(binding())) -> 'ok').
--spec(delete/2 :: (exchange(), list(binding())) -> 'ok').
--spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
--spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok').
+-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> {rabbit_router:routing_result(), [pid()]}).
+-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
+-spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
+-spec(recover/2 :: (rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
+-spec(delete/2 :: (rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
+-spec(add_binding/2 :: (rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok').
+-spec(remove_bindings/2 :: (rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
+-spec(assert_args_equivalence/2 :: (rabbit_types:exchange(),
+ rabbit_framing:amqp_table()) -> 'ok').
-endif.
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
deleted file mode 100644
index 1a9798998c..0000000000
--- a/include/rabbit_framing_spec.hrl
+++ /dev/null
@@ -1,60 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
-%% TODO: much of this should be generated
-
--type(amqp_field_type() ::
- 'longstr' | 'signedint' | 'decimal' | 'timestamp' |
- 'table' | 'byte' | 'double' | 'float' | 'long' |
- 'short' | 'bool' | 'binary' | 'void').
--type(amqp_property_type() ::
- 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
- 'longlongint' | 'timestamp' | 'bit' | 'table').
-%% we could make this more precise but ultimately are limited by
-%% dialyzer's lack of support for recursive types
--type(amqp_table() :: [{binary(), amqp_field_type(), any()}]).
-%% TODO: make this more precise
--type(amqp_class_id() :: non_neg_integer()).
-%% TODO: make this more precise
--type(amqp_properties() :: tuple()).
-%% TODO: make this more precise
--type(amqp_method() :: tuple()).
-%% TODO: make this more precise
--type(amqp_method_name() :: atom()).
--type(channel_number() :: non_neg_integer()).
--type(resource_name() :: binary()).
--type(routing_key() :: binary()).
--type(username() :: binary()).
--type(password() :: binary()).
--type(vhost() :: binary()).
--type(ctag() :: binary()).
--type(exchange_type() :: atom()).
--type(binding_key() :: binary()).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 6926261f79..86675e1e08 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -107,6 +107,12 @@ if [ $1 = 0 ]; then
# Leave rabbitmq user and group
fi
+# Clean out plugin activation state, both on uninstall and upgrade
+rm -rf %{_rabbit_erllibdir}/priv
+for ext in rel script boot ; do
+ rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext
+done
+
%files -f ../%{name}.files
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
@@ -122,6 +128,12 @@ fi
rm -rf %{buildroot}
%changelog
+* Wed Jul 14 2010 Emile Joubert <emile@rabbitmq.com> 1.8.1-1
+- New Upstream Release
+
+* Tue Jun 15 2010 Matthew Sackman <matthew@rabbitmq.com> 1.8.0-1
+- New Upstream Release
+
* Mon Feb 15 2010 Matthew Sackman <matthew@lshift.net> 1.7.2-1
- New Upstream Release
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 97c58ea2f8..db0ed70b61 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -35,21 +35,22 @@
##
## OCF instance parameters
-## OCF_RESKEY_multi
-## OCF_RESKEY_ctl
-## OCF_RESKEY_nodename
-## OCF_RESKEY_ip
-## OCF_RESKEY_port
-## OCF_RESKEY_cluster_config_file
-## OCF_RESKEY_config_file
-## OCF_RESKEY_log_base
-## OCF_RESKEY_mnesia_base
-## OCF_RESKEY_server_start_args
+## OCF_RESKEY_multi
+## OCF_RESKEY_ctl
+## OCF_RESKEY_nodename
+## OCF_RESKEY_ip
+## OCF_RESKEY_port
+## OCF_RESKEY_cluster_config_file
+## OCF_RESKEY_config_file
+## OCF_RESKEY_log_base
+## OCF_RESKEY_mnesia_base
+## OCF_RESKEY_server_start_args
#######################################################################
# Initialization:
-. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs
+: ${OCF_FUNCTIONS_DIR=${OCF_ROOT}/resource.d/heartbeat}
+. ${OCF_FUNCTIONS_DIR}/.ocf-shellfuncs
#######################################################################
@@ -63,7 +64,7 @@ OCF_RESKEY_log_base_default="/var/log/rabbitmq"
: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
meta_data() {
- cat <<END
+ cat <<END
<?xml version="1.0"?>
<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
<resource-agent name="rabbitmq-server">
@@ -113,7 +114,7 @@ The IP address for rabbitmq-server to listen on
The IP Port for rabbitmq-server to listen on
</longdesc>
<shortdesc lang="en">IP Port</shortdesc>
-<content type="string" default="" />
+<content type="integer" default="" />
</parameter>
<parameter name="cluster_config_file" unique="0" required="0">
@@ -161,7 +162,8 @@ Additional arguments provided to the server on startup
<actions>
<action name="start" timeout="600" />
<action name="stop" timeout="120" />
-<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" />
+<action name="status" timeout="20" interval="10" />
+<action name="monitor" timeout="20" interval="10" />
<action name="validate-all" timeout="30" />
<action name="meta-data" timeout="5" />
</actions>
@@ -170,8 +172,8 @@ END
}
rabbit_usage() {
- cat <<END
-usage: $0 {start|stop|monitor|validate-all|meta-data}
+ cat <<END
+usage: $0 {start|stop|status|monitor|validate-all|meta-data}
Expects to have a fully populated OCF RA-compliant environment set.
END
@@ -202,35 +204,35 @@ export_vars() {
rabbit_validate_partial() {
if [ ! -x $RABBITMQ_MULTI ]; then
- ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -x $RABBITMQ_CTL ]; then
- ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
+ exit $OCF_ERR_INSTALLED;
fi
}
rabbit_validate_full() {
if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then
- ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then
- ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
+ exit $OCF_ERR_INSTALLED;
fi
rabbit_validate_partial
@@ -243,25 +245,26 @@ rabbit_status() {
$RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
rc=$?
case "$rc" in
- 0)
- return $OCF_SUCCESS
- ;;
- 2)
- return $OCF_NOT_RUNNING
- ;;
- *)
- ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
- return $OCF_ERR_GENERIC
+ 0)
+ ocf_log debug "RabbitMQ server is running normally"
+ return $OCF_SUCCESS
+ ;;
+ 2)
+ ocf_log debug "RabbitMQ server is not running"
+ return $OCF_NOT_RUNNING
+ ;;
+ *)
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ exit $OCF_ERR_GENERIC
esac
}
rabbit_start() {
local rc
- rabbit_validate_full
- rc=$?
- if [ "$rc" != $OCF_SUCCESS ]; then
- return $rc
+ if rabbit_status; then
+ ocf_log info "Resource already running."
+ return $OCF_SUCCESS
fi
export_vars
@@ -270,24 +273,23 @@ rabbit_start() {
rc=$?
if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
- return $rc
+ ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
+ return $rc
fi
# Spin waiting for the server to come up.
# Let the CRM/LRM time us out if required
start_wait=1
while [ $start_wait = 1 ]; do
- rabbit_status
- rc=$?
- if [ "$rc" = $OCF_SUCCESS ]; then
- start_wait=0
-
- elif [ "$rc" != $OCF_NOT_RUNNING ]; then
- ocf_log info "rabbitmq-server start failed: $rc"
- return $OCF_ERR_GENERIC
- fi
- sleep 2
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_SUCCESS ]; then
+ start_wait=0
+ elif [ "$rc" != $OCF_NOT_RUNNING ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ exit $OCF_ERR_GENERIC
+ fi
+ sleep 1
done
return $OCF_SUCCESS
@@ -295,28 +297,34 @@ rabbit_start() {
rabbit_stop() {
local rc
+
+ if ! rabbit_status; then
+ ocf_log info "Resource not running."
+ return $OCF_SUCCESS
+ fi
+
$RABBITMQ_MULTI stop_all &
rc=$?
if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
- return $rc
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ return $rc
fi
# Spin waiting for the server to shut down.
# Let the CRM/LRM time us out if required
stop_wait=1
while [ $stop_wait = 1 ]; do
- rabbit_status
- rc=$?
- if [ "$rc" = $OCF_NOT_RUNNING ]; then
- stop_wait=0
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_NOT_RUNNING ]; then
+ stop_wait=0
break
- elif [ "$rc" != $OCF_SUCCESS ]; then
- ocf_log info "rabbitmq-server stop failed: $rc"
- return $OCF_ERR_GENERIC
- fi
- sleep 2
+ elif [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server stop failed: $rc"
+ exit $OCF_ERR_GENERIC
+ fi
+ sleep 1
done
return $OCF_SUCCESS
@@ -329,34 +337,38 @@ rabbit_monitor() {
case $__OCF_ACTION in
meta-data)
- meta_data
- exit $OCF_SUCCESS
- ;;
+ meta_data
+ exit $OCF_SUCCESS
+ ;;
usage|help)
- rabbit_usage
- exit $OCF_SUCCESS
- ;;
+ rabbit_usage
+ exit $OCF_SUCCESS
+ ;;
esac
-rabbit_validate_partial || exit
+if ocf_is_probe; then
+ rabbit_validate_partial
+else
+ rabbit_validate_full
+fi
case $__OCF_ACTION in
start)
- rabbit_start
+ rabbit_start
;;
stop)
- rabbit_stop
+ rabbit_stop
;;
- monitor)
- rabbit_monitor
+ status|monitor)
+ rabbit_monitor
;;
validate-all)
exit $OCF_SUCCESS
- ;;
+ ;;
*)
- rabbit_usage
- exit $OCF_ERR_UNIMPLEMENTED
- ;;
+ rabbit_usage
+ exit $OCF_ERR_UNIMPLEMENTED
+ ;;
esac
-exit $? \ No newline at end of file
+exit $?
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 63b50749e1..0dccf93879 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,15 @@
+rabbitmq-server (1.8.1-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Emile Joubert <emile@rabbitmq.com> Wed, 14 Jul 2010 15:05:24 +0100
+
+rabbitmq-server (1.8.0-1) intrepid; urgency=low
+
+ * New Upstream Release
+
+ -- Matthew Sackman <matthew@rabbitmq.com> Tue, 15 Jun 2010 12:48:48 +0100
+
rabbitmq-server (1.7.2-1) intrepid; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in
index bfcf1f530e..5290de9b17 100644
--- a/packaging/debs/Debian/debian/postrm.in
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -18,6 +18,13 @@ set -e
# for details, see http://www.debian.org/doc/debian-policy/ or
# the debian-policy package
+remove_plugin_traces() {
+ # Remove traces of plugins
+ rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins
+ for ext in rel script boot ; do
+ rm -f @RABBIT_LIB@/ebin/rabbit.$ext
+ done
+}
case "$1" in
purge)
@@ -34,11 +41,7 @@ case "$1" in
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
- # Remove traces of plugins
- rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins
- for ext in rel script boot ; do
- rm -f @RABBIT_LIB@/ebin/rabbit.$ext
- done
+ remove_plugin_traces
if getent passwd rabbitmq >/dev/null; then
# Stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
@@ -50,7 +53,11 @@ case "$1" in
fi
;;
- remove|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear)
+ remove|upgrade)
+ remove_plugin_traces
+ ;;
+
+ failed-upgrade|abort-install|abort-upgrade|disappear)
;;
*)
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index 0ef7dd5e73..3a22eef08a 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -31,15 +31,22 @@ $(DEST)/Portfile: Portfile.in
-f checksums.sed <$^ >$@
rm checksums.sed
+# The purpose of the intricate substitution below is to set up similar
+# environment vars to the ones that su will on Linux. On OS X, we
+# have to use the -m option to su in order to be able to set the shell
+# (which for the rabbitmq user would otherwise be /dev/null). But the
+# -m option means that *all* environment vars get preserved. Erlang
+# needs vars such as HOME to be set. So we have to set them
+# explicitly.
macports: dirs $(DEST)/Portfile
for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \
cp $(COMMON_DIR)/$$f $(DEST)/files ; \
done
- sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh su -m rabbitmq -c|' \
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh HOME=/var/lib/rabbitmq USER=rabbitmq LOGNAME=rabbitmq PATH="$$(eval `PATH=MACPORTS_PREFIX/bin /usr/libexec/path_helper -s`; echo $$PATH)" su -m rabbitmq -c|' \
$(DEST)/files/rabbitmq-script-wrapper
cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
if [ -n "$(MACPORTS_USERHOST)" ] ; then \
- tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \
+ tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \
d="/tmp/mkportindex.$$$$" ; \
mkdir $$d \
&& cd $$d \
@@ -52,4 +59,4 @@ macports: dirs $(DEST)/Portfile
fi
clean:
- rm -rf $(DEST) checksums.sed
+ rm -rf $(MACPORTS_DIR) checksums.sed
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index 153727be9a..be0d24d75f 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -4,9 +4,8 @@
PortSystem 1.0
name rabbitmq-server
version @VERSION@
-revision 1
categories net
-maintainers rabbitmq.com:tonyg
+maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer
platforms darwin
description The RabbitMQ AMQP Server
long_description \
@@ -23,8 +22,8 @@ checksums \
sha1 @sha1@ \
rmd160 @rmd160@
-depends_build port:erlang port:xmlto port:libxslt
-depends_run port:erlang
+depends_lib port:erlang
+depends_build port:xmlto port:libxslt
platform darwin 7 {
depends_build-append port:py25-simplejson
@@ -98,6 +97,8 @@ post-destroot {
xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \
${wrappersbin}/rabbitmq-activate-plugins
+ reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:" \
+ ${wrappersbin}/rabbitmq-multi
reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
${wrappersbin}/rabbitmq-multi
reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
diff --git a/packaging/macports/make-port-diff.sh b/packaging/macports/make-port-diff.sh
new file mode 100755
index 0000000000..3eb1b9f589
--- /dev/null
+++ b/packaging/macports/make-port-diff.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# This script grabs the latest rabbitmq-server bits from the main
+# macports subversion repo, and from the rabbitmq.com macports repo,
+# and produces a diff from the former to the latter for submission
+# through the macports trac.
+
+set -e
+
+dir=/tmp/$(basename $0).$$
+mkdir -p $dir/macports $dir/rabbitmq
+
+# Get the files from the macports subversion repo
+cd $dir/macports
+svn checkout http://svn.macports.org/repository/macports/trunk/dports/net/rabbitmq-server/ 2>&1 >/dev/null
+
+# Clear out the svn $id tag
+sed -i -e 's|^# \$.*$|# $Id$|' rabbitmq-server/Portfile
+
+# Get the files from the rabbitmq.com macports repo
+cd ../rabbitmq
+curl -s http://www.rabbitmq.com/releases/macports/net/rabbitmq-server.tgz | tar xzf -
+
+cd ..
+diff -Naur --exclude=.svn macports rabbitmq
+cd /
+rm -rf $dir
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 8341d35c8c..5905069204 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,7 +29,8 @@
##
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a4b7f2e99b..a4f8c8b448 100644
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index ccdfc40160..2261b56ef5 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,7 +30,8 @@
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 57fe1328ce..a290f9356c 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index a4021fd6a1..bd117b83f4 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -48,8 +48,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index cfb775eb67..92e5312bb2 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,7 +30,8 @@
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
. `dirname $0`/rabbitmq-env
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 5557245165..563b9e58e9 100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -38,8 +38,12 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/src/delegate.erl b/src/delegate.erl
index 12eb814f8f..3f57953bf7 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -44,9 +44,10 @@
-ifdef(use_specs).
--spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
--spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok(pid())).
+-spec(invoke_no_result/2 ::
+ (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
-spec(process_count/0 :: () -> non_neg_integer()).
@@ -63,7 +64,7 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
case Res of
{ok, Result, _} ->
Result;
@@ -73,7 +74,7 @@ invoke(Pid, Fun) when is_pid(Pid) ->
invoke(Pids, Fun) when is_list(Pids) ->
lists:foldl(
- fun({Status, Result, Pid}, {Good, Bad}) ->
+ fun ({Status, Result, Pid}, {Good, Bad}) ->
case Status of
ok -> {[{Pid, Result}|Good], Bad};
error -> {Good, [{Pid, Result}|Bad]}
@@ -83,7 +84,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
@@ -99,42 +100,47 @@ internal_cast(Node, Thunk) when is_atom(Node) ->
gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
- orddict:to_list(
- lists:foldl(
- fun (Pid, D) ->
- orddict:update(node(Pid),
- fun (Pids1) -> [Pid | Pids1] end,
- [Pid], D)
- end,
- orddict:new(), Pids)).
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
-invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
- safe_invoke(Pids, Fun);
invoke_per_node(NodePids, Fun) ->
lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
- %% This is not actually async! However, in practice Fun will
- %% always be something that does a gen_server:cast or similar, so
- %% I don't think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- safe_invoke(Pids, Fun);
invoke_no_result_per_node(NodePids, Fun) ->
delegate_per_node(NodePids, Fun, fun internal_cast/2),
ok.
-delegate_per_node(NodePids, Fun, DelegateFun) ->
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
Self = self(),
%% Note that this is unsafe if the Fun requires reentrancy to the
%% local_server. I.e. if self() == local_server(Node) then we'll
%% block forever.
[gen_server2:cast(
local_server(Node),
- {thunk, fun() ->
+ {thunk, fun () ->
Self ! {result,
DelegateFun(
- Node, fun() -> safe_invoke(Pids, Fun) end)}
+ Node, fun () -> safe_invoke(Pids, Fun) end)}
end}) || {Node, Pids} <- NodePids],
[receive {result, Result} -> Result end || _ <- NodePids].
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 1c1d62a95d..39ef3f85b8 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -43,7 +43,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any()) | 'ignore').
-endif.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 0f648dcd2b..e209ee6be4 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -182,18 +182,18 @@
-ifdef(use_specs).
-type(ref() :: any()).
--type(error() :: {'error', any()}).
--type(ok_or_error() :: ('ok' | error())).
--type(val_or_error(T) :: ({'ok', T} | error())).
+-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
+-type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())).
-type(position() :: ('bof' | 'eof' | non_neg_integer() |
- {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})).
+ {('bof' |'eof'), non_neg_integer()} |
+ {'cur', integer()})).
-type(offset() :: non_neg_integer()).
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
- (string(), [any()],
- [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) ->
- val_or_error(ref())).
+ (string(), [any()],
+ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
+ -> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
val_or_error([char()] | binary()) | 'eof').
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 5b899cdbc7..49ae63c1d5 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -186,7 +186,7 @@
-ifdef(use_specs).
-spec(handle_common_termination/6 ::
- (any(), any(), any(), atom(), any(), any()) -> no_return()).
+ (any(), any(), any(), atom(), any(), any()) -> no_return()).
-spec(hibernate/7 ::
(pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()).
@@ -639,7 +639,7 @@ do_multi_call(Nodes, Name, Req, Timeout) ->
Caller = self(),
Receiver =
spawn(
- fun() ->
+ fun () ->
%% Middleman process. Should be unsensitive to regular
%% exit signals. The sychronization is needed in case
%% the receiver would exit before the caller started
diff --git a/src/pg_local.erl b/src/pg_local.erl
index 1501331d6b..f5ded123d7 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -36,8 +36,8 @@
-export([join/2, leave/2, get_members/1]).
-export([sync/0]). %% intended for testing only; not part of official API
--export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
- terminate/2]).
+-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2]).
%%----------------------------------------------------------------------------
@@ -45,8 +45,8 @@
-type(name() :: term()).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}).
--spec(start/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), term())).
+-spec(start/0 :: () -> rabbit_types:ok_or_error2(pid(), term())).
-spec(join/2 :: (name(), pid()) -> 'ok').
-spec(leave/2 :: (name(), pid()) -> 'ok').
-spec(get_members/1 :: (name()) -> [pid()]).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 67f8df947b..18045b94fc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -33,7 +33,8 @@
-behaviour(application).
--export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0,
+ rotate_logs/1]).
-export([start/2, stop/1]).
@@ -183,18 +184,19 @@
-ifdef(use_specs).
--type(log_location() :: 'tty' | 'undefined' | string()).
-type(file_suffix() :: binary()).
+%% this really should be an abstract type
+-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
--spec(rotate_logs/1 :: (file_suffix()) -> 'ok' | {'error', any()}).
--spec(status/0 :: () ->
- [{running_applications, [{atom(), string(), string()}]} |
- {nodes, [erlang_node()]} |
- {running_nodes, [erlang_node()]}]).
+-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
+-spec(status/0 ::
+ () -> [{running_applications, [{atom(), string(), string()}]} |
+ {nodes, [{rabbit_mnesia:node_type(), [node()]}]} |
+ {running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
-endif.
@@ -299,6 +301,18 @@ run_boot_step({StepName, Attributes}) ->
ok
end.
+module_attributes(Module) ->
+ case catch Module:module_info(attributes) of
+ {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
+ io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
+ [Module]),
+ [];
+ {'EXIT', Reason} ->
+ exit(Reason);
+ V ->
+ V
+ end.
+
boot_steps() ->
AllApps = [App || {App, _, _} <- application:loaded_applications()],
Modules = lists:usort(
@@ -310,7 +324,7 @@ boot_steps() ->
lists:flatmap(fun (Module) ->
[{StepName, Attributes}
|| {rabbit_boot_step, [{StepName, Attributes}]}
- <- Module:module_info(attributes)]
+ <- module_attributes(Module)]
end, Modules),
sort_boot_steps(UnsortedSteps).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index a445f44197..30bae25e5a 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -45,28 +45,38 @@
-ifdef(use_specs).
+-export_type([username/0, password/0]).
+
-type(permission_atom() :: 'configure' | 'read' | 'write').
+-type(username() :: binary()).
+-type(password() :: binary()).
+-type(regexp() :: binary()).
--spec(check_login/2 :: (binary(), binary()) -> user()).
--spec(user_pass_login/2 :: (username(), password()) -> user()).
--spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
+-spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user()).
+-spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()).
+-spec(check_vhost_access/2 ::
+ (rabbit_types:user(), rabbit_types:vhost()) -> 'ok').
-spec(check_resource_access/3 ::
- (username(), r(atom()), permission_atom()) -> 'ok').
+ (username(), rabbit_types:r(atom()), permission_atom()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
-spec(list_users/0 :: () -> [username()]).
--spec(lookup_user/1 :: (username()) -> {'ok', user()} | not_found()).
--spec(add_vhost/1 :: (vhost()) -> 'ok').
--spec(delete_vhost/1 :: (vhost()) -> 'ok').
--spec(list_vhosts/0 :: () -> [vhost()]).
--spec(set_permissions/5 ::
- (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok').
--spec(clear_permissions/2 :: (username(), vhost()) -> 'ok').
+-spec(lookup_user/1 ::
+ (username()) -> rabbit_types:ok(rabbit_types:user())
+ | rabbit_types:error('not_found')).
+-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
+-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
+ regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
-spec(list_vhost_permissions/1 ::
- (vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
+ (rabbit_types:vhost())
+ -> [{username(), regexp(), regexp(), regexp()}]).
-spec(list_user_permissions/1 ::
- (username()) -> [{vhost(), regexp(), regexp(), regexp()}]).
+ (username())
+ -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
-endif.
@@ -162,9 +172,14 @@ check_resource_access(Username,
[] ->
false;
[#user_permission{permission = P}] ->
+ PermRegexp = case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
case regexp:match(
binary_to_list(Name),
- binary_to_list(element(permission_index(Permission), P))) of
+ binary_to_list(PermRegexp)) of
{match, _, _} -> true;
nomatch -> false
end
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 7e96d9a3a8..53c713e66b 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -47,7 +47,7 @@
-type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
-endif.
@@ -67,9 +67,9 @@ stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
register(Pid, HighMemMFA) ->
- ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
- infinity).
+ gen_event:call(alarm_handler, ?MODULE,
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
@@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA},
false -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, ok, State#alarms{alertees = NewAlertees}};
+ {ok, State#alarms.vm_memory_high_watermark,
+ State#alarms{alertees = NewAlertees}};
handle_call(_Request, State) ->
{ok, not_understood, State}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7b88c45d26..f1b527681c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,18 +31,18 @@
-module(rabbit_amqqueue).
--export([start/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/5, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
+-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+ check_exclusive_access/2, with_exclusive_access_or_die/3,
+ stat/1, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
--export([claim_queue/2]).
--export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -59,64 +59,94 @@
-ifdef(use_specs).
--type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
--type(qlen() :: {'ok', non_neg_integer()}).
--type(qfun(A) :: fun ((amqqueue()) -> A)).
+-export_type([name/0, qmsg/0]).
+
+-type(name() :: rabbit_types:r('queue')).
+
+-type(qlen() :: rabbit_types:ok(non_neg_integer())).
+-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A)).
+-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
+-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
- amqqueue()).
--spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
--spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
--spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
--spec(list/1 :: (vhost()) -> [amqqueue()]).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (amqqueue()) -> [info()]).
--spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
--spec(info_all/1 :: (vhost()) -> [[info()]]).
--spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
+-spec(declare/5 ::
+ (name(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
+ -> {'new' | 'existing', rabbit_types:amqqueue()}).
+-spec(lookup/1 ::
+ (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
+ rabbit_types:error('not_found')).
+-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
+-spec(with_or_die/2 :: (name(), qfun(A)) -> A).
+-spec(assert_equivalence/5 ::
+ (rabbit_types:amqqueue(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid))
+ -> ok).
+-spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok').
+-spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A).
+-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]).
+-spec(info/2 ::
+ (rabbit_types:amqqueue(), [rabbit_types:info_key()])
+ -> [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 :: (rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
+-spec(consumers/1 ::
+ (rabbit_types:amqqueue())
+ -> [{pid(), rabbit_types:ctag(), boolean()}]).
-spec(consumers_all/1 ::
- (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
--spec(stat/1 :: (amqqueue()) -> qstats()).
--spec(stat_all/0 :: () -> [qstats()]).
+ (rabbit_types:vhost())
+ -> [{name(), pid(), rabbit_types:ctag(), boolean()}]).
+-spec(stat/1 ::
+ (rabbit_types:amqqueue())
+ -> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(delete/3 ::
- (amqqueue(), 'false', 'false') -> qlen();
- (amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'};
- (amqqueue(), 'false', 'true' ) -> qlen() | {'error', 'not_empty'};
- (amqqueue(), 'true' , 'true' ) -> qlen() |
- {'error', 'in_use'} |
- {'error', 'not_empty'}).
--spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), delivery()) -> boolean()).
+ (rabbit_types:amqqueue(), 'false', 'false')
+ -> qlen();
+ (rabbit_types:amqqueue(), 'true' , 'false')
+ -> qlen() | rabbit_types:error('in_use');
+ (rabbit_types:amqqueue(), 'false', 'true' )
+ -> qlen() | rabbit_types:error('not_empty');
+ (rabbit_types:amqqueue(), 'true' , 'true' )
+ -> qlen() |
+ rabbit_types:error('in_use') |
+ rabbit_types:error('not_empty')).
+-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
+-spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
--spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
+-spec(ack/4 ::
+ (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
+ -> 'ok').
+-spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
--spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
+-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
- boolean(), any()) ->
- 'ok' | {'error', 'queue_owned_by_another_connection' |
- 'exclusive_consume_unavailable'}).
--spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
+-spec(basic_consume/7 ::
+ (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
+ rabbit_types:ctag(), boolean(), any())
+ -> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
+-spec(basic_cancel/4 ::
+ (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found').
--spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
--spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
+-spec(internal_declare/2 ::
+ (rabbit_types:amqqueue(), boolean())
+ -> rabbit_types:amqqueue() | 'not_found').
+-spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found')).
+-spec(maybe_run_queue_via_backing_queue/2 ::
+ (pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
--spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok').
+-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(on_node_down/1 :: (erlang_node()) -> 'ok').
--spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
+-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()).
-endif.
@@ -148,11 +178,12 @@ recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
-declare(QueueName, Durable, AutoDelete, Args) ->
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
@@ -197,7 +228,8 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
+ fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
@@ -214,6 +246,31 @@ with(Name, F) ->
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
+ Durable, AutoDelete, _Args, Owner) ->
+ check_exclusive_access(Q, Owner, strict);
+assert_equivalence(#amqqueue{name = QueueName},
+ _Durable, _AutoDelete, _Args, _Owner) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]).
+
+check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
+
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]).
+
+with_exclusive_access_or_die(Name, ReaderPid, F) ->
+ with_or_die(Name,
+ fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -248,9 +305,6 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
-stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
-
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -269,7 +323,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
@@ -298,15 +352,12 @@ limit_all(QPids, ChPid, LimiterPid) ->
gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
end).
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- delegate_call(QPid, {claim_queue, ReaderPid}, infinity).
-
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ delegate_call(QPid, {basic_consume, NoAck, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
@@ -324,19 +375,21 @@ flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+internal_delete1(QueueName) ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
+ rabbit_exchange:delete_queue_bindings(QueueName).
+
internal_delete(QueueName) ->
case
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] ->
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- %% we want to execute some things, as
- %% decided by rabbit_exchange, after the
- %% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName)
+ [_] -> internal_delete1(QueueName)
end
end) of
Err = {error, _} -> Err;
@@ -394,15 +447,13 @@ safe_delegate_call_ok(H, F, Pids) ->
end.
delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end).
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+ delegate:invoke(Pid,
+ fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_pcast(Pid, Pri, Msg) ->
delegate:invoke_no_result(Pid,
- fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+ fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f12e1b70f8..2fb60e9675 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 100).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -50,7 +50,6 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
backing_queue,
@@ -104,7 +103,6 @@ init(Q) ->
{ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
- owner = none,
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
@@ -134,6 +132,23 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+declare(Recover, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found -> {stop, normal, not_found, State};
+ Q -> gen_server2:reply(From, {new, Q}),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = BQ:init(QName, IsDurable, Recover),
+ noreply(State#q{backing_queue_state = BQS});
+ Q1 -> {stop, normal, {existing, Q1}, State}
+ end.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -433,10 +448,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -488,10 +499,10 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(owner_pid, #q{owner = none}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
- ReaderPid;
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+ ExclusiveOwner;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -520,25 +531,24 @@ i(Item, _) ->
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
- %% TODO: If we're exclusively owned && our owner isn't alive &&
- %% Recover then we should BQ:init and then {stop, normal,
- %% not_found, State}, relying on terminate to delete the queue.
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q ->
- gen_server2:reply(From, Q),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
- ok = rabbit_memory_monitor:register(
- self(),
- {rabbit_amqqueue, set_ram_duration_target, [self()]}),
- noreply(State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)});
- Q1 ->
- {stop, normal, Q1, State}
+ State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ declare(Recover, From, State);
+
+handle_call({init, Recover}, From,
+ State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined} = State,
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
@@ -613,51 +623,44 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder}) ->
- case check_queue_owner(Owner, ReaderPid) of
- mismatch ->
- reply({error, queue_owned_by_another_connection}, State);
+ _From, State = #q{exclusive_consumer = ExistingHolder}) ->
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
+ in_use ->
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
- case ConsumerCount of
- 0 -> ok = rabbit_limiter:register(LimiterPid, self());
- _ -> ok
- end,
- ExclusiveConsumer = case ExclusiveConsume of
- true -> {ChPid, ConsumerTag};
- false -> ExistingHolder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
- end
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
+ ok = case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
+ end,
+ ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
+ State1 = State#q{has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_message_queue(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -689,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end
end;
-handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- backing_queue = BQ,
+handle_call(stat, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+ reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -713,27 +715,17 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({claim_queue, ReaderPid}, _From,
- State = #q{owner = Owner, exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- MonitorRef = erlang:monitor(process, ReaderPid),
- reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
- end;
- {ReaderPid, _MonitorRef} ->
- reply(ok, State);
- _ ->
- reply(locked, State)
+handle_call({requeue, AckTags, ChPid}, From, State) ->
+ gen_server2:reply(From, ok),
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
+ [ChPid]),
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(requeue_and_run(AckTags, State))
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
@@ -763,18 +755,6 @@ handle_cast({ack, Txn, AckTags, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, AckTags, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
- [ChPid]),
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end;
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
@@ -825,19 +805,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner. In
+ %% the case of clean shutdown we delete the queue synchronously in
+ %% the reader - although not required by the spec this seems to
+ %% match what people expect (see bug 21824). However we need this
+ %% monitor-and-async- delete in case the connection goes away
+ %% unexpectedly.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, NewState} -> noreply(NewState);
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2dba00ad62..432d62900b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4ab7a2a0b1..03a19961fd 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -42,24 +42,41 @@
-ifdef(use_specs).
--type(properties_input() :: (amqp_properties() | [{atom(), any()}])).
--type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())).
-
--spec(publish/1 :: (delivery()) -> publish_result()).
--spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
- delivery()).
--spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> (message() | {'error', any()})).
--spec(properties/1 :: (properties_input()) -> amqp_properties()).
--spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> publish_result()).
--spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(),
- maybe(txn()), properties_input(), binary()) ->
- publish_result()).
--spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
--spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
+-type(properties_input() ::
+ (rabbit_framing:amqp_property_record() | [{atom(), any()}])).
+-type(publish_result() ::
+ ({ok, rabbit_router:routing_result(), [pid()]}
+ | rabbit_types:error('not_found'))).
+
+-spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()).
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ rabbit_types:message())
+ -> rabbit_types:delivery()).
+-spec(message/4 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ properties_input(), binary())
+ -> (rabbit_types:message() | rabbit_types:error(any()))).
+-spec(properties/1 ::
+ (properties_input()) -> rabbit_framing:amqp_property_record()).
+-spec(publish/4 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ properties_input(), binary())
+ -> publish_result()).
+-spec(publish/7 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ properties_input(), binary())
+ -> publish_result()).
+-spec(build_content/2 ::
+ (rabbit_framing:amqp_property_record(), binary())
+ -> rabbit_types:content()).
+-spec(from_content/1 ::
+ (rabbit_types:content())
+ -> {rabbit_framing:amqp_property_record(), binary()}).
-spec(is_message_persistent/1 ::
- (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})).
+ (rabbit_types:decoded_content())
+ -> (boolean() | {'invalid', non_neg_integer()})).
-endif.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index b188c42628..2f48a5d4fe 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -58,15 +58,21 @@
-type(frame() :: [binary()]).
-spec(build_simple_method_frame/2 ::
- (channel_number(), amqp_method()) -> frame()).
+ (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record())
+ -> frame()).
-spec(build_simple_content_frames/3 ::
- (channel_number(), content(), non_neg_integer()) -> [frame()]).
+ (rabbit_channel:channel_number(), rabbit_types:content(),
+ non_neg_integer())
+ -> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
--spec(generate_table/1 :: (amqp_table()) -> binary()).
--spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()).
+-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
+-spec(encode_properties/2 ::
+ ([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
--spec(ensure_content_encoded/1 :: (content()) -> encoded_content()).
--spec(clear_encoded_content/1 :: (content()) -> unencoded_content()).
+-spec(ensure_content_encoded/1 ::
+ (rabbit_types:content()) -> rabbit_types:encoded_content()).
+-spec(clear_encoded_content/1 ::
+ (rabbit_types:content()) -> rabbit_types:unencoded_content()).
-spec(map_exception/2 :: (non_neg_integer(), amqp_error()) ->
{bool(), non_neg_integer(), amqp_method()}).
@@ -121,10 +127,11 @@ build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
[Frag | Frags], BodyPayloadMax, ChannelInt) ->
Size = size(Frag),
{NewFragSizeRem, NewFragAcc, NewFrags} =
- case Size =< FragSizeRem of
- true -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
- false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag,
- {0, [Head | FragAcc], [Tail | Frags]}
+ if Size == 0 -> {FragSizeRem, FragAcc, Frags};
+ Size =< FragSizeRem -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ true -> <<Head:FragSizeRem/binary, Tail/binary>> =
+ Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
end,
build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
NewFrags, BodyPayloadMax, ChannelInt).
@@ -341,4 +348,4 @@ amqp_exception_explanation(Text, Expl) ->
CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
true -> CompleteTextBin
- end. \ No newline at end of file
+ end.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index e022a1fafe..69e34440b8 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -42,10 +42,13 @@
-ifdef(use_specs).
--spec(parse_table/1 :: (binary()) -> amqp_table()).
--spec(parse_properties/2 :: ([amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/1 :: (content()) -> decoded_content()).
--spec(clear_decoded_content/1 :: (content()) -> undecoded_content()).
+-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()).
+-spec(parse_properties/2 ::
+ ([rabbit_framing:amqp_property_type()], binary()) -> [any()]).
+-spec(ensure_content_decoded/1 ::
+ (rabbit_types:content()) -> rabbit_types:decoded_content()).
+-spec(clear_decoded_content/1 ::
+ (rabbit_types:content()) -> rabbit_types:undecoded_content()).
-endif.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a48db9c8b3..c4db3ace73 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,20 +35,25 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([init/1, terminate/2, code_change/3,
- handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
+-export([flow_timeout/2]).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking}).
+ consumer_mapping, blocking, queue_collector_pid, flow}).
+
+-record(flow, {server, client, pending}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
-define(INFO_KEYS,
[pid,
@@ -66,30 +71,40 @@
-ifdef(use_specs).
--spec(start_link/5 ::
- (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
--spec(do/2 :: (pid(), amqp_method()) -> 'ok').
--spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
+-export_type([channel_number/0]).
+
+-type(ref() :: any()).
+-type(channel_number() :: non_neg_integer()).
+
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), rabbit_access_control:username(),
+ rabbit_types:vhost(), pid()) -> pid()).
+-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
--spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
+-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(deliver/4 ::
+ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
+ -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (pid()) -> [info()]).
--spec(info/2 :: (pid(), [info_key()]) -> [info()]).
--spec(info_all/0 :: () -> [[info()]]).
--spec(info_all/1 :: ([info_key()]) -> [[info()]]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
+-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
+-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
-endif.
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
{ok, Pid} = gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost], []),
+ Username, VHost, CollectorPid], []),
Pid.
do(Pid, Method) ->
@@ -113,6 +128,9 @@ conserve_memory(Pid, Conserve) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+flow_timeout(Pid, Ref) ->
+ gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -135,7 +153,7 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
@@ -153,7 +171,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new()},
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ flow = #flow{server = true, client = true,
+ pending = none}},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -180,11 +201,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, normal, State#ch{state = terminating}}
catch
exit:Reason = #amqp_error{} ->
- ok = rollback_and_notify(State),
MethodName = rabbit_misc:method_record_type(Method),
- State#ch.reader_pid ! {channel_exit, State#ch.channel,
- Reason#amqp_error{method = MethodName}},
- {stop, normal, State#ch{state = terminating}};
+ {stop, normal, terminating(Reason#amqp_error{method = MethodName},
+ State)};
exit:normal ->
{stop, normal, State};
_:Reason ->
@@ -208,11 +227,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, Conserve}, State) ->
- ok = clear_permission_cache(),
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- noreply(State).
+handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
+ noreply(State);
+handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
+ noreply(State#ch{state = running});
+handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
+ flow_control(not Conserve, State);
+handle_cast({conserve_memory, _Conserve}, State) ->
+ noreply(State);
+
+handle_cast({flow_timeout, Ref},
+ State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
+ {stop, normal, terminating(
+ rabbit_misc:amqp_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not Flow], none), State)};
+handle_cast({flow_timeout, _Ref}, State) ->
+ {noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -253,20 +286,20 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue =
- (Q#amqqueue.name)#resource.name},
+terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
+ ok = rollback_and_notify(State),
+ Reader ! {channel_exit, Channel, Reason},
+ State#ch{state = terminating}.
+
+return_queue_declare_ok(#resource{name = ActualName},
+ NoWait, MessageCount, ConsumerCount, State) ->
+ NewState = State#ch{most_recently_declared_queue = ActualName},
case NoWait of
true -> {noreply, NewState};
- false ->
- {ok, ActualName, MessageCount, ConsumerCount} =
- rabbit_misc:with_exit_handler(
- fun () -> {ok, Q#amqqueue.name, 0, 0} end,
- fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
+ false -> Reply = #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount},
+ {reply, Reply, NewState}
end.
check_resource_access(Username, Resource, Perm) ->
@@ -300,7 +333,7 @@ check_read_permitted(Resource, #ch{ username = Username}) ->
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
most_recently_declared_queue = MRDQ }) ->
rabbit_misc:r(VHostPath, queue, MRDQ);
@@ -310,7 +343,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = MRDQ }) ->
MRDQ;
@@ -352,8 +385,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
+ true -> {noreply, State};
+ false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
+ end;
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -370,13 +405,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{exchange = ExchangeNameBin,
+handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "basic.publish received after channel.flow_ok{active=false}", []);
+handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ mandatory = Mandatory,
+ immediate = Immediate},
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -394,16 +433,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
case RoutingRes of
- routed ->
- ok;
- unroutable ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
- not_delivered ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
+ routed -> ok;
+ unroutable -> ok = basic_return(Message, WriterPid, no_route);
+ not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
{noreply, case TxnKey of
none -> State;
@@ -413,13 +445,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
- next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
- if DeliveryTag >= NextDeliveryTag ->
- rabbit_misc:protocol_error(
- command_invalid, "unknown delivery tag ~w", [DeliveryTag]);
- true -> ok
- end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
@@ -436,11 +462,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
@@ -458,7 +485,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -480,14 +507,14 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
Other -> Other
end,
- %% In order to ensure that the consume_ok gets sent before
- %% any messages are sent to the consumer, we get the queue
- %% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ %% We get the queue process to send the consume_ok on our
+ %% behalf. This is for symmetry with basic.cancel - see
+ %% the comment in that method for why.
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -497,14 +524,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
dict:store(ActualConsumerTag,
QueueName,
ConsumerMapping)}};
- {error, queue_owned_by_another_connection} ->
- %% The spec is silent on which exception to use
- %% here. This seems reasonable?
- %% FIXME: check this
-
- rabbit_misc:protocol_error(
- resource_locked, "~s owned by another connection",
- [rabbit_misc:rs(QueueName)]);
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -571,9 +590,8 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
-handle_method(#'basic.recover'{requeue = true},
- _, State = #ch{ transaction_id = none,
- unacked_message_q = UAMQ }) ->
+handle_method(#'basic.recover_async'{requeue = true},
+ _, State = #ch{ unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
@@ -583,12 +601,12 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
- _, State = #ch{ transaction_id = none,
- writer_pid = WriterPid,
+handle_method(#'basic.recover_async'{requeue = false},
+ _, State = #ch{ writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
ok = rabbit_misc:queue_fold(
fun ({_DeliveryTag, none, _Msg}, ok) ->
@@ -608,12 +626,17 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
- rabbit_misc:protocol_error(
- not_allowed, "attempt to recover a transactional channel",[]);
+handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
+ {noreply, State2 = #ch{writer_pid = WriterPid}} =
+ handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content,
+ State),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
+ {noreply, State2};
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -644,18 +667,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete,
Args)
end,
- ok = rabbit_exchange:assert_type(X, CheckedType),
+ ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
+ AutoDelete, Args),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
- X = rabbit_exchange:lookup_or_die(ExchangeName),
- ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
+ _ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
@@ -674,73 +696,78 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
- Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
- end,
- Q
- end,
- Q = case rabbit_amqqueue:with(
- rabbit_misc:r(VHostPath, queue, QueueNameBin),
- Finish) of
- {error, not_found} ->
- ActualNameBin =
- case QueueNameBin of
+ nowait = NoWait,
+ arguments = Args} = Declare,
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
- Other
- end,
- return_queue_declare_ok(State, NoWait, Q);
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ rabbit_amqqueue:stat(Q)
+ end) of
+ {ok, MessageCount, ConsumerCount} ->
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+ {error, not_found} ->
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, Q = #amqqueue{}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case Owner of
+ none -> ok;
+ _ -> rabbit_queue_collector:register(CollectorPid, Q)
+ end,
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, none, State)
+ end
+ end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
- nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
- return_queue_declare_ok(State, NoWait, Q);
+ {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
+ State);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
- nowait = NoWait
- },
- _, State) ->
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -750,8 +777,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
- #'queue.delete_ok'{
- message_count = PurgedMessageCount})
+ #'queue.delete_ok'{message_count = PurgedMessageCount})
end;
handle_method(#'queue.bind'{queue = QueueNameBin,
@@ -759,7 +785,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -767,17 +793,17 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State) ->
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
+ {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -790,14 +816,14 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State) ->
{reply, #'tx.commit_ok'{}, internal_commit(State)};
handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
@@ -810,7 +836,6 @@ handle_method(#'channel.flow'{active = true}, _,
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
-
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -828,11 +853,25 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = _}, _, State) ->
- %% TODO: We may want to correlate this to channel.flow messages we
- %% have sent, and complain if we get an unsolicited
- %% channel.flow_ok, or the client refuses our flow request.
- {noreply, State};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Active, client = Flow,
+ pending = {_Ref, TRef}} = F})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Flow, client = Flow,
+ pending = {_Ref, TRef}}})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, issue_flow(Flow, State)};
+handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unsolicited channel.flow_ok", []);
+handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -840,8 +879,26 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
+ when Flow =:= not Active ->
+ ok = clear_permission_cache(),
+ noreply(issue_flow(Active, State));
+flow_control(Active, State = #ch{flow = F}) ->
+ noreply(State#ch{flow = F#flow{server = Active}}).
+
+issue_flow(Active, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = Active}),
+ Ref = make_ref(),
+ {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
+ [self(), Ref]),
+ State#ch{flow = #flow{server = Active, client = not Active,
+ pending = {Ref, TRef}}}.
+
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -852,7 +909,12 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ fun (_X, Q) ->
+ try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ catch exit:Reason -> {error, Reason}
+ end
+ end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
@@ -866,17 +928,17 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)]);
- {error, durability_settings_incompatible} ->
- rabbit_misc:protocol_error(
- not_allowed, "durability settings of ~s incompatible with ~s",
- [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
+ {error, #amqp_error{} = Error} ->
+ rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
end.
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
- WriterPid, ReplyCode, ReplyText) ->
+ WriterPid, Reason) ->
+ {_Close, ReplyCode, ReplyText} =
+ rabbit_framing:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
@@ -904,7 +966,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- {ToAcc, PrefixAcc}
+ rabbit_misc:protocol_error(
+ not_found, "unknown delivery tag ~w", [DeliveryTag])
end.
add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index d1834b3b73..6e6ad06cb3 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -44,7 +44,7 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
--spec(action/4 :: (atom(), erlang_node(), [string()],
+-spec(action/4 :: (atom(), node(), [string()],
fun ((string(), [any()]) -> 'ok')) -> 'ok').
-spec(usage/0 :: () -> no_return()).
@@ -59,8 +59,8 @@ start() ->
parse_args(FullCommand, #params{quiet = false,
node = rabbit_misc:makenode(NodeStr)}),
Inform = case Quiet of
- true -> fun(_Format, _Args1) -> ok end;
- false -> fun(Format, Args1) ->
+ true -> fun (_Format, _Args1) -> ok end;
+ false -> fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
end
end,
@@ -160,6 +160,12 @@ action(cluster, Node, ClusterNodeSs, Inform) ->
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
+action(force_cluster, Node, ClusterNodeSs, Inform) ->
+ ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
+ Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
+ [Node, ClusterNodes]),
+ rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+
action(status, Node, [], Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index f19e8d025a..0ec6beb676 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -30,17 +30,17 @@
%%
-module(rabbit_dialyzer).
--include("rabbit.hrl").
--export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]).
+-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2,
+ halt_with_code/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(create_basic_plt/1 :: (file_path()) -> 'ok').
--spec(add_to_plt/2 :: (file_path(), string()) -> 'ok').
--spec(dialyze_files/2 :: (file_path(), string()) -> 'ok').
+-spec(create_basic_plt/1 :: (file:filename()) -> 'ok').
+-spec(add_to_plt/2 :: (file:filename(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (file:filename(), string()) -> 'ok').
-spec(halt_with_code/1 :: (atom()) -> no_return()).
-endif.
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index e9baf2c480..42861f8603 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -39,7 +39,8 @@
-export([boot/0]).
--export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
+ handle_info/2]).
boot() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 45b66712b8..875d680f86 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -33,7 +33,8 @@
-behaviour(gen_event).
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
%% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h
%% module because the original's init/1 does not match properly
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 8f41392f83..d91ebe9ba9 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -33,13 +33,14 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
- list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
- publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
+ info/1, info/2, info_all/1, info_all/2, publish/2]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2]).
+-export([assert_equivalence/5]).
+-export([assert_args_equivalence/2]).
+-export([check_type/1]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -54,55 +55,87 @@
-ifdef(use_specs).
--type(bind_res() :: 'ok' | {'error',
- 'queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found'}).
+-export_type([name/0, type/0, binding_key/0]).
+
+-type(name() :: rabbit_types:r('exchange')).
+-type(type() :: atom()).
+-type(binding_key() :: binary()).
+
+-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found')).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
+
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
- amqp_table()) -> exchange()).
+-spec(declare/5 ::
+ (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
+ -> rabbit_types:exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
--spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
--spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
--spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
--spec(list/1 :: (vhost()) -> [exchange()]).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (exchange()) -> [info()]).
--spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
--spec(info_all/1 :: (vhost()) -> [[info()]]).
--spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
- bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
- bind_res() | {'error', 'binding_not_found'}).
--spec(list_bindings/1 :: (vhost()) ->
- [{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
--spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
--spec(delete/2 :: (exchange_name(), boolean()) ->
- 'ok' | not_found() | {'error', 'in_use'}).
--spec(list_queue_bindings/1 :: (queue_name()) ->
- [{exchange_name(), routing_key(), amqp_table()}]).
--spec(list_exchange_bindings/1 :: (exchange_name()) ->
- [{queue_name(), routing_key(), amqp_table()}]).
+-spec(assert_equivalence/5 ::
+ (rabbit_types:exchange(), atom(), boolean(), boolean(),
+ rabbit_framing:amqp_table())
+ -> 'ok').
+-spec(assert_args_equivalence/2 ::
+ (rabbit_types:exchange(), rabbit_framing:amqp_table()) -> 'ok').
+-spec(lookup/1 ::
+ (name()) -> rabbit_types:ok(rabbit_types:exchange()) |
+ rabbit_types:error('not_found')).
+-spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange()).
+-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]).
+-spec(info/2 ::
+ (rabbit_types:exchange(), [rabbit_types:info_key()])
+ -> [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
+-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> {rabbit_router:routing_result(), [pid()]}).
+-spec(add_binding/5 ::
+ (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table(), inner_fun())
+ -> bind_res()).
+-spec(delete_binding/5 ::
+ (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table(), inner_fun())
+ -> bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list_bindings/1 ::
+ (rabbit_types:vhost())
+ -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(delete_queue_bindings/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> none())).
+-spec(delete_transient_queue_bindings/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> none())).
+-spec(delete/2 ::
+ (name(), boolean())-> 'ok' |
+ rabbit_types:error('not_found') |
+ rabbit_types:error('in_use')).
+-spec(list_queue_bindings/1 ::
+ (rabbit_amqqueue:name())
+ -> [{name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(list_exchange_bindings/1 ::
+ (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, auto_delete, arguments]).
recover() ->
Exs = rabbit_misc:table_fold(
- fun(Exchange, Acc) ->
+ fun (Exchange, Acc) ->
ok = mnesia:write(rabbit_exchange, Exchange, write),
[Exchange | Acc]
end, [], rabbit_durable_exchange),
Bs = rabbit_misc:table_fold(
- fun(Route = #route{binding = B}, Acc) ->
+ fun (Route = #route{binding = B}, Acc) ->
{_, ReverseRoute} = route_with_reverse(Route),
ok = mnesia:write(rabbit_route,
Route, write),
@@ -182,13 +215,36 @@ check_type(TypeBin) ->
T
end.
-assert_type(#exchange{ type = ActualType }, RequiredType)
- when ActualType == RequiredType ->
- ok;
-assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
+assert_equivalence(X = #exchange{ durable = Durable,
+ auto_delete = AutoDelete,
+ type = Type},
+ Type, Durable, AutoDelete,
+ RequiredArgs) ->
+ ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
+assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
+ _Args) ->
rabbit_misc:protocol_error(
- not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name), ActualType, RequiredType]).
+ not_allowed,
+ "cannot redeclare ~s with different type, durable or autodelete value",
+ [rabbit_misc:rs(Name)]).
+
+alternate_exchange_value(Args) ->
+ lists:keysearch(<<"alternate-exchange">>, 1, Args).
+
+assert_args_equivalence(#exchange{ name = Name,
+ arguments = Args },
+ RequiredArgs) ->
+ %% The spec says "Arguments are compared for semantic
+ %% equivalence". The only arg we care about is
+ %% "alternate-exchange".
+ Ae1 = alternate_exchange_value(RequiredArgs),
+ Ae2 = alternate_exchange_value(Args),
+ if Ae1==Ae2 -> ok;
+ true -> rabbit_misc:protocol_error(
+ not_allowed,
+ "cannot redeclare ~s with inequivalent args",
+ [rabbit_misc:rs(Name)])
+ end.
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -305,7 +361,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
Module = type_to_module(Type),
case IsDeleted of
auto_deleted -> Module:delete(X, Bs);
- no_delete -> Module:remove_bindings(X, Bs)
+ not_deleted -> Module:remove_bindings(X, Bs)
end
end, Cleanup)
end.
@@ -349,7 +405,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
+ fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
@@ -357,7 +413,7 @@ call_with_exchange(Exchange, Fun) ->
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
+ fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
mnesia:read({rabbit_queue, Queue})} of
{[X], [Q]} -> Fun(X, Q);
{[ ], [_]} -> {error, exchange_not_found};
@@ -366,50 +422,66 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(X, Q) of
+ ok ->
case mnesia:read({rabbit_route, B}) of
[] ->
- sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
+ ok = sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
{new, X, B};
[_R] ->
{existing, X, B}
- end
+ end;
+ {error, _} = E ->
+ E
end
end) of
{new, Exchange = #exchange{ type = Type }, Binding} ->
(type_to_module(Type)):add_binding(Exchange, Binding);
{existing, _, _} ->
ok;
- Err = {error, _} ->
+ {error, _} = Err ->
Err
end.
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- {maybe_auto_delete(X), B}
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ case InnerFun(X, Q) of
+ ok ->
+ ok =
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B};
+ {error, _} = E ->
+ E
+ end
end
end) of
- Err = {error, _} ->
+ {error, _} = Err ->
Err;
- {{Action, X = #exchange{ type = Type }}, B} ->
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
Module = type_to_module(Type),
- case Action of
- auto_delete -> Module:delete(X, [B]);
- no_delete -> Module:remove_bindings(X, [B])
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, [B]);
+ not_deleted -> Module:remove_bindings(X, [B])
end
end.
@@ -493,10 +565,10 @@ delete(ExchangeName, IfUnused) ->
end.
maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {no_delete, Exchange};
+ {not_deleted, Exchange};
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {no_delete, Exchange};
+ {error, in_use} -> {not_deleted, Exchange};
{deleted, Exchange, []} -> {auto_deleted, Exchange}
end.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index a8c071e681..85760edce4 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -54,7 +54,11 @@ behaviour_info(callbacks) ->
{add_binding, 2},
%% called after bindings have been deleted.
- {remove_bindings, 2}
+ {remove_bindings, 2},
+
+ %% called when comparing exchanges for equivalence - should return ok or
+ %% exit with #amqp_error{}
+ {assert_args_equivalence, 2}
];
behaviour_info(_Other) ->
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 9b71e0e1d1..4f6eb85199 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 311654ab21..94798c78fe 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
+ remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 285dab1a03..44607398cb 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -36,8 +36,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
+ remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -48,7 +48,8 @@
{enables, kernel_ready}]}).
-ifdef(use_specs).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-spec(headers_match/2 :: (rabbit_framing:amqp_table(),
+ rabbit_framing:amqp_table()) -> boolean()).
-endif.
description() ->
@@ -135,3 +136,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
index 175d15ad83..7906fbee72 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_exchange_type_registry.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -35,8 +35,8 @@
-export([start_link/0]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-export([register/2, binary_to_type/1, lookup_module/1]).
@@ -45,10 +45,13 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}).
+-spec(start_link/0 ::
+ () -> 'ignore' | rabbit_types:ok_or_error2(pid(), term())).
-spec(register/2 :: (binary(), atom()) -> 'ok').
--spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}).
--spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}).
+-spec(binary_to_type/1 ::
+ (binary()) -> atom() | rabbit_types:error('not_found')).
+-spec(lookup_module/1 ::
+ (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
-endif.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 8a3dceeaeb..a374cfee7f 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
+ remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -49,7 +49,9 @@
-export([topic_matches/2]).
-ifdef(use_specs).
+
-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+
-endif.
description() ->
@@ -99,3 +101,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index b7c6aa96fa..bc1a2a0835 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -76,29 +76,27 @@ mainloop(ChannelPid) ->
{method, MethodName, FieldsBin} = read_frame(ChannelPid),
Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
case rabbit_framing:method_has_content(MethodName) of
- true -> rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, MethodName));
+ true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+ rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid, ClassId));
false -> rabbit_channel:do(ChannelPid, Method)
end,
?MODULE:mainloop(ChannelPid).
-collect_content(ChannelPid, MethodName) ->
- {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+collect_content(ChannelPid, ClassId) ->
case read_frame(ChannelPid) of
- {content_header, HeaderClassId, 0, BodySize, PropertiesBin} ->
- if HeaderClassId == ClassId ->
- Payload = collect_content_payload(ChannelPid, BodySize, []),
- #content{class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- payload_fragments_rev = Payload};
- true ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId])
- end;
+ {content_header, ClassId, 0, BodySize, PropertiesBin} ->
+ Payload = collect_content_payload(ChannelPid, BodySize, []),
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ payload_fragments_rev = Payload};
+ {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]);
_ ->
rabbit_misc:protocol_error(
command_invalid,
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 1ae8f7dac4..af1c629f41 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -31,15 +31,13 @@
-module(rabbit_guid).
--include("rabbit.hrl").
-
-behaviour(gen_server).
-export([start_link/0]).
-export([guid/0, string_guid/1, binstring_guid/1]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-define(SERVER, ?MODULE).
-define(SERIAL_FILENAME, "rabbit_serial").
@@ -50,7 +48,11 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-export_type([guid/0]).
+
+-type(guid() :: binary()).
+
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(guid/0 :: () -> guid()).
-spec(string_guid/1 :: (any()) -> string()).
-spec(binstring_guid/1 :: (any()) -> binary()).
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index b4fd91560f..8214b976c4 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -43,16 +43,16 @@
-include("rabbit.hrl").
--record(iv_state, { queue, qname, len, pending_ack }).
+-record(iv_state, { queue, qname, durable, len, pending_ack }).
-record(tx, { pending_messages, pending_acks, is_persistent }).
-ifdef(use_specs).
--type(ack() :: guid() | 'blank_ack').
+-type(ack() :: rabbit_guid:guid() | 'blank_ack').
-type(state() :: #iv_state { queue :: queue(),
- qname :: queue_name(),
+ qname :: rabbit_amqqueue:name(),
len :: non_neg_integer(),
- pending_ack :: dict()
+ pending_ack :: dict:dictionary()
}).
-include("rabbit_backing_queue_spec.hrl").
@@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) ->
true -> rabbit_persister:queue_content(QName);
false -> []
end),
- #iv_state { queue = Q, qname = QName, len = queue:len(Q),
+ #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = queue:len(Q),
pending_ack = dict:new() }.
terminate(State) ->
State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, dict:fetch_keys(PA), PA),
+delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
{_PLen, State1} = purge(State),
terminate(State1).
-purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
+purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
@@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) ->
Acc;
({Msg = #basic_message { guid = Guid }, IsDelivered},
{AckTagsN, PAN}) ->
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
end, {[], dict:new()}, Q),
- ok = persist_acks(none, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) ->
- ok = persist_message(none, QName, Msg),
+publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- State = #iv_state { qname = QName, len = 0,
- pending_ack = PA }) ->
- ok = persist_message(none, QName, Msg),
- ok = persist_delivery(QName, Msg, false),
+ State = #iv_state { qname = QName, durable = IsDurable,
+ len = 0, pending_ack = PA }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len,
+fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable,
pending_ack = PA }) ->
{{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
queue:out(Q),
Len1 = Len - 1,
- ok = persist_delivery(QName, Msg, IsDelivered),
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = dict:store(Guid, Msg, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
- false -> ok = persist_acks(none, QName, [Guid], PA1),
+ false -> ok = persist_acks(QName, IsDurable, none,
+ [Guid], PA1),
{blank_ack, PA}
end,
{{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
- ok = persist_acks(none, QName, AckTags, PA),
+ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName }) ->
+tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
- ok = persist_message(Txn, QName, Msg),
+ ok = persist_message(QName, IsDurable, Txn, Msg),
State.
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) ->
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(Txn, QName, AckTags, PA),
+ ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
State.
tx_rollback(Txn, State = #iv_state { qname = QName }) ->
@@ -228,32 +239,32 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
-persist_message(_Txn, _QName, #basic_message { is_persistent = false }) ->
- ok;
-persist_message(Txn, QName, Msg) ->
+persist_message(QName, true, Txn, Msg = #basic_message {
+ is_persistent = true }) ->
Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties,
- %% rebuild from properties_bin on restore
+ %% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]).
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ ok.
-persist_delivery(_QName, #basic_message { is_persistent = false },
- _IsDelivered) ->
- ok;
-persist_delivery(_QName, _Message, true) ->
- ok;
-persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]).
+persist_delivery(QName, true, false, #basic_message { is_persistent = true,
+ guid = Guid }) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]);
+persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
+ ok.
-persist_acks(Txn, QName, AckTags, PA) ->
+persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
{ok, Msg} = dict:find(Guid, PA),
Msg #basic_message.is_persistent
- end]).
+ end]);
+persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
+ ok.
persist_work(_Txn,_QName, []) ->
ok;
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
index 4f467162e4..e0457b1e43 100644
--- a/src/rabbit_load.erl
+++ b/src/rabbit_load.erl
@@ -40,11 +40,10 @@
-ifdef(use_specs).
--type(erlang_node() :: atom()).
--type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}).
+-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}).
-spec(local_load/0 :: () -> load()).
-spec(remote_loads/0 :: () -> [load()]).
--spec(pick/0 :: () -> erlang_node()).
+-spec(pick/0 :: () -> node()).
-endif.
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index cc80e360ae..85bcbca04a 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -50,7 +50,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(debug/1 :: (string()) -> 'ok').
-spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 91e97ffe49..bdf3807531 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -86,11 +86,12 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(deregister/1 :: (pid()) -> 'ok').
--spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()).
+-spec(report_ram_duration/2 ::
+ (pid(), float() | 'infinity') -> number() | 'infinity').
-spec(stop/0 :: () -> 'ok').
-endif.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 723b818b41..fcc9fc7e54 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -32,11 +32,12 @@
-module(rabbit_misc).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
+
-include_lib("kernel/include/file.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4,
- protocol_error/3, protocol_error/4]).
+ protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
@@ -71,61 +72,84 @@
-ifdef(use_specs).
--include_lib("kernel/include/inet.hrl").
+-export_type([resource_name/0]).
--type(ok_or_error() :: 'ok' | {'error', any()}).
+-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
+-type(thunk(T) :: fun(() -> T)).
+-type(resource_name() :: binary()).
--spec(method_record_type/1 :: (tuple()) -> atom()).
+-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
+ -> rabbit_framing:amqp_method_name()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
--spec(die/1 :: (atom()) -> no_return()).
--spec(frame_error/2 :: (atom(), binary()) -> no_return()).
--spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()).
--spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()).
--spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()).
--spec(not_found/1 :: (r(atom())) -> no_return()).
--spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
+-spec(die/1 :: (rabbit_framing:amqp_exception()) -> no_return()).
+-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
+ -> no_return()).
+-spec(amqp_error/4 ::
+ (rabbit_framing:amqp_exception(), string(), [any()],
+ rabbit_framing:amqp_method_name())
+ -> rabbit_types:amqp_error()).
+-spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()])
+ -> no_return()).
+-spec(protocol_error/4 ::
+ (rabbit_framing:amqp_exception(), string(), [any()],
+ rabbit_framing:amqp_method_name())
+ -> no_return()).
+-spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()).
+-spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()).
+-spec(get_config/1 ::
+ (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
--spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost() | r(atom()), K, resource_name()) ->
- r(K) when is_subtype(K, atom())).
--spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
- kind :: K,
- name :: '_'}
- when is_subtype(K, atom())).
--spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
- undefined | r(K) when is_subtype(K, atom())).
--spec(rs/1 :: (r(atom())) -> string()).
+-spec(dirty_read/1 ::
+ ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
+-spec(r/2 :: (rabbit_types:vhost(), K)
+ -> rabbit_types:r3(rabbit_types:vhost(), K, '_')
+ when is_subtype(K, atom())).
+-spec(r/3 ::
+ (rabbit_types:vhost() | rabbit_types:r(atom()), K, resource_name())
+ -> rabbit_types:r3(rabbit_types:vhost(), K, resource_name())
+ when is_subtype(K, atom())).
+-spec(r_arg/4 ::
+ (rabbit_types:vhost() | rabbit_types:r(atom()), K,
+ rabbit_framing:amqp_table(), binary())
+ -> undefined | rabbit_types:r(K)
+ when is_subtype(K, atom())).
+-spec(rs/1 :: (rabbit_types:r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
--spec(report_cover/1 :: (file_path()) -> 'ok').
+-spec(enable_cover/1 :: (file:filename()) -> ok_or_error()).
+-spec(report_cover/1 :: (file:filename()) -> 'ok').
-spec(throw_on_error/2 ::
- (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
+ (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(with_user/2 :: (username(), thunk(A)) -> A).
--spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
--spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
+-spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A).
+-spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A).
+-spec(with_user_and_vhost/3 ::
+ (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A))
+ -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()).
--spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}).
+-spec(makenode/1 :: ({string(), string()} | string()) -> node()).
+-spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
-spec(cookie_hash/0 :: () -> string()).
--spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
+-spec(tcp_name/3 ::
+ (atom(), inet:ip_address(), rabbit_networking:ip_port())
+ -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
--spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
- 'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()).
--spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}).
--spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()).
--spec(append_file/2 :: (file_path(), string()) -> ok_or_error()).
+-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
+ -> 'ok' | 'aborted').
+-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
+-spec(read_term_file/1 ::
+ (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
@@ -133,15 +157,18 @@
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
--spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
+-spec(sort_field_table/1 ::
+ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
--spec(version_compare/3 :: (string(), string(),
- ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()).
--spec(recursive_delete/1 :: ([file_path()]) ->
- 'ok' | {'error', {file_path(), any()}}).
--spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
+-spec(version_compare/3 ::
+ (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
+ -> boolean()).
+-spec(recursive_delete/1 ::
+ ([file:filename()])
+ -> rabbit_types:ok_or_error({file:filename(), any()})).
+-spec(dict_cons/3 :: (any(), any(), dict:dictionary()) -> dict:dictionary()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-endif.
@@ -173,7 +200,10 @@ protocol_error(Name, ExplanationFormat, Params) ->
protocol_error(Name, ExplanationFormat, Params, none).
protocol_error(Name, ExplanationFormat, Params, Method) ->
- exit(amqp_error(Name, ExplanationFormat, Params, Method)).
+ protocol_error(amqp_error(Name, ExplanationFormat, Params, Method)).
+
+protocol_error(#amqp_error{} = Error) ->
+ exit(Error).
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
@@ -242,12 +272,12 @@ report_cover([Root]) when is_atom(Root) ->
report_cover(Root) ->
Dir = filename:join(Root, "cover"),
ok = filelib:ensure_dir(filename:join(Dir,"junk")),
- lists:foreach(fun(F) -> file:delete(F) end,
+ lists:foreach(fun (F) -> file:delete(F) end,
filelib:wildcard(filename:join(Dir, "*.html"))),
{ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
{CT, NCT} =
lists:foldl(
- fun(M,{CovTot, NotCovTot}) ->
+ fun (M,{CovTot, NotCovTot}) ->
{ok, {M, {Cov, NotCov}}} = cover:analyze(M, module),
ok = report_coverage_percentage(SummaryFile,
Cov, NotCov, M),
@@ -367,7 +397,7 @@ upmap(F, L) ->
Parent = self(),
Ref = make_ref(),
[receive {Ref, Result} -> Result end
- || _ <- [spawn(fun() -> Parent ! {Ref, F(X)} end) || X <- L]].
+ || _ <- [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L]].
map_in_order(F, L) ->
lists:reverse(
@@ -537,18 +567,24 @@ pid_to_string(Pid) when is_pid(Pid) ->
%% inverse of above
string_to_pid(Str) ->
+ Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, IdStr, SerStr]} ->
- %% turn the triple into a pid - see pid_to_string
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
+ %% the NodeStr atom might be quoted, so we have to parse
+ %% it rather than doing a simple list_to_atom
+ NodeAtom = case erl_scan:string(NodeStr) of
+ {ok, [{atom, _, X}], _} -> X;
+ {error, _, _} -> throw(Err)
+ end,
+ <<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
Id = list_to_integer(IdStr),
Ser = list_to_integer(SerStr),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
nomatch ->
- throw({error, {invalid_pid_syntax, Str}})
+ throw(Err)
end.
version_compare(A, B, lte) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 55a6761d2d..e2b6927f8c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -29,11 +29,12 @@
%% Contributor(s): ______________________________________.
%%
+
-module(rabbit_mnesia).
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, reset/0, force_reset/0, is_clustered/0,
- empty_ram_only_tables/0]).
+ cluster/1, force_cluster/1, reset/0, force_reset/0,
+ is_clustered/0, empty_ram_only_tables/0]).
-export([table_names/0]).
@@ -47,12 +48,18 @@
-ifdef(use_specs).
--spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
--spec(dir/0 :: () -> file_path()).
+-export_type([node_type/0]).
+
+-type(node_type() :: disc_only | disc | ram | unknown).
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(dir/0 :: () -> file:filename()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([erlang_node()]) -> 'ok').
+-spec(cluster/1 :: ([node()]) -> 'ok').
+-spec(force_cluster/1 :: ([node()]) -> 'ok').
+-spec(cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(is_clustered/0 :: () -> boolean()).
@@ -64,13 +71,26 @@
%%----------------------------------------------------------------------------
status() ->
- [{nodes, mnesia:system_info(db_nodes)},
+ [{nodes, case mnesia:system_info(is_running) of
+ yes -> [{Key, Nodes} ||
+ {Key, CopyType} <- [{disc_only, disc_only_copies},
+ {disc, disc_copies},
+ {ram, ram_copies}],
+ begin
+ Nodes = mnesia:table_info(schema, CopyType),
+ Nodes =/= []
+ end];
+ no -> case mnesia:system_info(db_nodes) of
+ [] -> [];
+ Nodes -> [{unknown, Nodes}]
+ end
+ end},
{running_nodes, mnesia:system_info(running_db_nodes)}].
init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config()),
+ ok = init_db(read_cluster_nodes_config(), true),
ok = wait_for_tables(),
ok.
@@ -78,16 +98,22 @@ is_db_empty() ->
lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
table_names()).
+cluster(ClusterNodes) ->
+ cluster(ClusterNodes, false).
+force_cluster(ClusterNodes) ->
+ cluster(ClusterNodes, true).
+
%% Alter which disk nodes this node is clustered with. This can be a
%% subset of all the disk nodes in the cluster but can (and should)
%% include the node itself if it is to be a disk rather than a ram
-%% node.
-cluster(ClusterNodes) ->
+%% node. If Force is false, only connections to online nodes are
+%% allowed.
+cluster(ClusterNodes, Force) ->
ok = ensure_mnesia_not_running(),
ok = ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
- ok = init_db(ClusterNodes),
+ ok = init_db(ClusterNodes, Force),
ok = wait_for_tables(),
ok = create_cluster_nodes_config(ClusterNodes)
after
@@ -259,38 +285,56 @@ delete_cluster_nodes_config() ->
%% Take a cluster node config and create the right kind of node - a
%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes.
-init_db(ClusterNodes) ->
- case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of
- {ok, []} ->
- case mnesia:system_info(use_dir) of
- true ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- %% NB: we cannot use rabbit_log here since
- %% it may not have been started yet
- error_logger:warning_msg(
- "schema integrity check failed: ~p~n"
- "moving database to backup location "
- "and recreating schema from scratch~n",
- [Reason]),
- ok = move_db(),
+%% specified cluster nodes. If Force is false, don't allow
+%% connections to offline nodes.
+init_db(ClusterNodes, Force) ->
+ UClusterNodes = lists:usort(ClusterNodes),
+ ProperClusterNodes = UClusterNodes -- [node()],
+ case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+ {ok, Nodes} ->
+ case Force of
+ false ->
+ FailedClusterNodes = ProperClusterNodes -- Nodes,
+ case FailedClusterNodes of
+ [] -> ok;
+ _ ->
+ throw({error, {failed_to_cluster_with,
+ FailedClusterNodes,
+ "Mnesia could not connect to some nodes."}})
+ end;
+ _ -> ok
+ end,
+ case Nodes of
+ [] ->
+ case mnesia:system_info(use_dir) of
+ true ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% NB: we cannot use rabbit_log here since
+ %% it may not have been started yet
+ error_logger:warning_msg(
+ "schema integrity check failed: ~p~n"
+ "moving database to backup location "
+ "and recreating schema from scratch~n",
+ [Reason]),
+ ok = move_db(),
+ ok = create_schema()
+ end;
+ false ->
ok = create_schema()
end;
- false ->
- ok = create_schema()
- end;
- {ok, [_|_]} ->
- IsDiskNode = ClusterNodes == [] orelse
- lists:member(node(), ClusterNodes),
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, disc_copies),
- ok = create_local_table_copies(case IsDiskNode of
- true -> disc;
- false -> ram
- end);
+ [_|_] ->
+ IsDiskNode = ClusterNodes == [] orelse
+ lists:member(node(), ClusterNodes),
+ ok = wait_for_replicated_tables(),
+ ok = create_local_table_copy(schema, disc_copies),
+ ok = create_local_table_copies(case IsDiskNode of
+ true -> disc;
+ false -> ram
+ end)
+ end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
@@ -346,7 +390,7 @@ table_has_copy_type(TabDef, DiscType) ->
create_local_table_copies(Type) ->
lists:foreach(
- fun({Tab, TabDef}) ->
+ fun ({Tab, TabDef}) ->
HasDiscCopies = table_has_copy_type(TabDef, disc_copies),
HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies),
LocalTab = proplists:get_bool(local_content, TabDef),
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 336f74bf9a..5db1d77a32 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -111,7 +111,7 @@ action(start_all, [NodeCount], RpcTimeout) ->
action(status, [], RpcTimeout) ->
io:format("Status of all running nodes...~n", []),
call_all_nodes(
- fun({Node, Pid}) ->
+ fun ({Node, Pid}) ->
RabbitRunning =
case is_rabbit_running(Node, RpcTimeout) of
false -> not_running;
@@ -123,7 +123,7 @@ action(status, [], RpcTimeout) ->
action(stop_all, [], RpcTimeout) ->
io:format("Stopping all nodes...~n", []),
- call_all_nodes(fun({Node, Pid}) ->
+ call_all_nodes(fun ({Node, Pid}) ->
io:format("Stopping node ~p~n", [Node]),
rpc:call(Node, rabbit, stop_and_halt, []),
case kill_wait(Pid, RpcTimeout, false) of
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 406977b42a..6baa4b8864 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -31,31 +31,42 @@
-module(rabbit_net).
-include("rabbit.hrl").
--include_lib("kernel/include/inet.hrl").
-export([async_recv/3, close/1, controlling_process/2,
getstat/2, peername/1, port_command/2,
send/2, sockname/1]).
+
%%---------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([socket/0]).
+
-type(stat_option() ::
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
--type(error() :: {'error', any()}).
-
--spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}).
--spec(close/1 :: (socket()) -> 'ok' | error()).
--spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()).
+-type(error() :: rabbit_types:error(any())).
+-type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()).
+
+-spec(async_recv/3 ::
+ (socket(), integer(), timeout()) -> rabbit_types:ok(any())).
+-spec(close/1 :: (socket()) -> rabbit_types:ok_or_error(any())).
+-spec(controlling_process/2 ::
+ (socket(), pid()) -> rabbit_types:ok_or_error(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
--spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()).
--spec(peername/1 :: (socket()) ->
- {'ok', {ip_address(), non_neg_integer()}} | error()).
--spec(sockname/1 :: (socket()) ->
- {'ok', {ip_address(), non_neg_integer()}} | error()).
--spec(getstat/2 :: (socket(), [stat_option()]) ->
- {'ok', [{stat_option(), integer()}]} | error()).
+-spec(send/2 ::
+ (socket(), binary() | iolist()) -> rabbit_types:ok_or_error(any())).
+-spec(peername/1 ::
+ (socket())
+ -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
+ error()).
+-spec(sockname/1 ::
+ (socket())
+ -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
+ error()).
+-spec(getstat/2 ::
+ (socket(), [stat_option()])
+ -> rabbit_types:ok([{stat_option(), integer()}]) | error()).
-endif.
@@ -66,7 +77,7 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
Pid = self(),
Ref = make_ref(),
- spawn(fun() -> Pid ! {inet_async, Sock, Ref,
+ spawn(fun () -> Pid ! {inet_async, Sock, Ref,
ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
end),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index c3d0b7b786..3a3357ba9d 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -63,25 +63,29 @@
-ifdef(use_specs).
--type(host() :: ip_address() | string() | atom()).
--type(connection() :: pid()).
+-export_type([ip_port/0, hostname/0]).
-spec(start/0 :: () -> 'ok').
--spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
--spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok').
--spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
--spec(active_listeners/0 :: () -> [listener()]).
--spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
--spec(connections/0 :: () -> [connection()]).
--spec(connection_info_keys/0 :: () -> [info_key()]).
--spec(connection_info/1 :: (connection()) -> [info()]).
--spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
--spec(connection_info_all/0 :: () -> [[info()]]).
--spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]).
+-spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
+-spec(start_ssl_listener/3 :: (hostname(), ip_port(), [rabbit_types:info()])
+ -> 'ok').
+-spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
+-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
+-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
+-spec(connections/0 :: () -> [rabbit_types:connection()]).
+-spec(connection_info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(connection_info/1 ::
+ (rabbit_types:connection()) -> [rabbit_types:info()]).
+-spec(connection_info/2 ::
+ (rabbit_types:connection(), [rabbit_types:info_key()])
+ -> [rabbit_types:info()]).
+-spec(connection_info_all/0 :: () -> [[rabbit_types:info()]]).
+-spec(connection_info_all/1 ::
+ ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
--spec(on_node_down/1 :: (erlang_node()) -> 'ok').
--spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) ->
- {ip_address(), atom()}).
+-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(check_tcp_listener_address/3 ::
+ (atom(), hostname(), ip_port()) -> {inet:ip_address(), atom()}).
-endif.
@@ -102,7 +106,7 @@ boot_ssl() ->
{ok, []} ->
ok;
{ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
+ ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOpts} = application:get_env(ssl_options),
[start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 3cd42e4753..a427b13548 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -65,21 +65,29 @@
-ifdef(use_specs).
--type(pmsg() :: {queue_name(), pkey()}).
+-type(pkey() :: rabbit_guid:guid()).
+-type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
+
-type(work_item() ::
- {publish, message(), pmsg()} |
+ {publish, rabbit_types:message(), pmsg()} |
{deliver, pmsg()} |
{ack, pmsg()}).
--spec(start_link/1 :: ([queue_name()]) ->
- {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 ::
+ ([rabbit_amqqueue:name()])
+ -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
+-spec(extend_transaction/2 ::
+ ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()])
+ -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
--spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(commit_transaction/1 ::
+ ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
+-spec(rollback_transaction/1 ::
+ ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
+-spec(queue_content/1 ::
+ (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]).
-endif.
@@ -236,7 +244,7 @@ log_work(CreateWorkUnit, MessageList,
snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
- fun(M = {publish, Message, QK = {_QName, PKey}}) ->
+ fun (M = {publish, Message, QK = {_QName, PKey}}) ->
case ets:lookup(Messages, PKey) of
[_] -> {tied, QK};
[] -> ets:insert(Messages, {PKey, Message}),
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 274981efed..ef3c5cc250 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -108,6 +108,7 @@ start() ->
WApp == stdlib;
WApp == kernel;
WApp == sasl;
+ WApp == crypto;
WApp == os_mon -> false;
_ -> true
end]),
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
new file mode 100644
index 0000000000..ea3768d4b4
--- /dev/null
+++ b/src/rabbit_queue_collector.erl
@@ -0,0 +1,106 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_queue_collector).
+
+-behaviour(gen_server).
+
+-export([start_link/0, register/2, delete_all/1, shutdown/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {queues}).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok(pid())).
+-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_all/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+register(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register, Q}, infinity).
+
+delete_all(CollectorPid) ->
+ gen_server:call(CollectorPid, delete_all, infinity).
+
+shutdown(CollectorPid) ->
+ gen_server:call(CollectorPid, shutdown, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{queues = dict:new()}}.
+
+%%--------------------------------------------------------------------------
+
+handle_call({register, Q}, _From,
+ State = #state{queues = Queues}) ->
+ MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+ {reply, ok,
+ State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, _From, State = #state{queues = Queues}) ->
+ [rabbit_misc:with_exit_handler(
+ fun () -> ok end,
+ fun () ->
+ erlang:demonitor(MonitorRef),
+ rabbit_amqqueue:delete(Q, false, false)
+ end)
+ || {MonitorRef, Q} <- dict:to_list(Queues)],
+ {reply, ok, State};
+
+handle_call(shutdown, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
+ State = #state{queues = Queues}) ->
+ {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 12ffb331d2..5a9e74c658 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -52,10 +52,13 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
+-define(SILENT_CLOSE_DELAY, 3).
+-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-record(v1, {sock, connection, callback, recv_ref, connection_state,
+ queue_collector}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -100,6 +103,8 @@
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closing*
%% receive frame -> ignore, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
@@ -116,6 +121,8 @@
%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closed*
%% receive connection.close_ok -> self() ! terminate_connection,
%% *closed*
%% receive frame -> ignore, *closed*
@@ -131,11 +138,11 @@
-ifdef(use_specs).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (pid()) -> [info()]).
--spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
+-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(server_properties/0 :: () -> amqp_table()).
+-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
-endif.
@@ -233,6 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
+ {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -244,7 +252,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
- connection_state = pre_init},
+ connection_state = pre_init,
+ queue_collector = Collector},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -262,7 +271,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue)
+ teardown_profiling(ProfilingValue),
+ rabbit_queue_collector:shutdown(Collector),
+ rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -425,11 +436,17 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State = #v1{connection_state = closing}) ->
+maybe_close(State = #v1{connection_state = closing,
+ queue_collector = Collector}) ->
case all_channels() of
- [] -> ok = send_on_channel0(
- State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ [] ->
+ %% Spec says "Exclusive queues may only be accessed by the current
+ %% connection, and are deleted when that connection closes."
+ %% This does not strictly imply synchrony, but in practice it seems
+ %% to be what people assume.
+ rabbit_queue_collector:delete_all(Collector),
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ close_connection(State);
_ -> State
end;
maybe_close(State) ->
@@ -473,10 +490,18 @@ handle_frame(Type, Channel, Payload, State) ->
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
+ %% channel.close and channel.close_ok. In the
+ %% event of a channel.close, we should send back a
%% channel.close_ok.
case AnalyzedFrame of
{method, 'channel.close_ok', _} ->
erase({channel, Channel});
+ {method, 'channel.close', _} ->
+ %% We're already closing this channel, so
+ %% there's no cleanup to do (notify
+ %% queues, etc.)
+ ok = rabbit_writer:send_command(State#v1.sock,
+ #'channel.close_ok'{});
_ -> ok
end,
State;
@@ -575,7 +600,11 @@ handle_method0(MethodName, FieldsBin, State) ->
end,
case State#v1.connection_state of
running -> send_exception(State, 0, CompleteReason);
- Other -> throw({channel0_error, Other, CompleteReason})
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, Other, CompleteReason})
end
end.
@@ -589,27 +618,33 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
ok = send_on_channel0(
Sock,
#'connection.tune'{channel_max = 0,
- %% set to zero once QPid fix their negotiation
- frame_max = 131072,
+ frame_max = ?FRAME_MAX,
heartbeat = 0}),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
client_properties = ClientProperties}};
-handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
- frame_max = FrameMax,
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
sock = Sock}) ->
- %% if we have a channel_max limit that the client wishes to
- %% exceed, die as per spec. Not currently a problem, so we ignore
- %% the client's channel_max parameter.
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}};
+ if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w < ~w min size",
+ [FrameMax, ?FRAME_MIN_SIZE]);
+ (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w > ~w max size",
+ [FrameMax, ?FRAME_MAX]);
+ true ->
+ rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ timeout_sec = ClientHeartbeat,
+ frame_max = FrameMax}}
+ end;
+
handle_method0(#'connection.open'{virtual_host = VHostPath,
insist = Insist},
State = #v1{connection_state = opening,
@@ -643,6 +678,14 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
+handle_method0(#'connection.close'{},
+ State = #v1{connection_state = CS,
+ sock = Sock})
+ when CS =:= closing; CS =:= closed ->
+ %% We're already closed or closing, so we don't need to cleanup
+ %% anything.
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}),
+ State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
@@ -722,15 +765,16 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+send_to_new_channel(Channel, AnalyzedFrame,
+ State = #v1{queue_collector = Collector}) ->
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 03979d6c60..d50b9f3126 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -41,7 +41,13 @@
-ifdef(use_specs).
--spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
+-export_type([routing_key/0, routing_result/0]).
+
+-type(routing_key() :: binary()).
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+
+-spec(deliver/2 ::
+ ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
-endif.
@@ -57,14 +63,17 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
delegate:invoke_no_result(
- QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
+ end),
+ {Routed, Handled} =
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}).
@@ -87,13 +96,13 @@ match_routing_key(Name, RoutingKey) ->
lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
- sets:fold(
- fun(Key, Acc) ->
+ lists:foldl(
+ fun (Key, Acc) ->
case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
- end, [], sets:from_list(Queues)).
+ end, [], lists:usort(Queues)).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 434cdae050..eb2037c21b 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -33,7 +33,8 @@
-behaviour(gen_event).
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
%% rabbit_sasl_report_file_h is a wrapper around the sasl_report_file_h
%% module because the original's init/1 does not match properly
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 76ebd982f4..ff7c07e37e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -54,10 +55,12 @@ all_tests() ->
passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
+ passed = test_content_framing(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
+ passed = test_memory_pressure(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -351,6 +354,41 @@ test_field_values() ->
>>),
passed.
+%% Test that content frames don't exceed frame-max
+test_content_framing(FrameMax, BodyBin) ->
+ [Header | Frames] =
+ rabbit_binary_generator:build_simple_content_frames(
+ 1,
+ rabbit_binary_generator:ensure_content_encoded(
+ rabbit_basic:build_content(#'P_basic'{}, BodyBin)),
+ FrameMax),
+ %% header is formatted correctly and the size is the total of the
+ %% fragments
+ <<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
+ BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
+ BodySize = size(BodyBin),
+ true = lists:all(
+ fun (ContentFrame) ->
+ FrameBinary = list_to_binary(ContentFrame),
+ %% assert
+ <<_TypeAndChannel:3/binary,
+ Size:32/unsigned, _Payload:Size/binary, 16#CE>> =
+ FrameBinary,
+ size(FrameBinary) =< FrameMax
+ end, Frames),
+ passed.
+
+test_content_framing() ->
+ %% no content
+ passed = test_content_framing(4096, <<>>),
+ %% easily fit in one frame
+ passed = test_content_framing(4096, <<"Easy">>),
+ %% exactly one frame (empty frame = 8 bytes)
+ passed = test_content_framing(11, <<"One">>),
+ %% more than one frame
+ passed = test_content_framing(11, <<"More than one frame">>),
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).
@@ -559,19 +597,19 @@ test_cluster_management() ->
ok = control_action(reset, []),
lists:foreach(fun (Arg) ->
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok
end,
ClusteringSequence),
lists:foreach(fun (Arg) ->
ok = control_action(reset, []),
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok
end,
ClusteringSequence),
ok = control_action(reset, []),
lists:foreach(fun (Arg) ->
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
ok
@@ -579,7 +617,7 @@ test_cluster_management() ->
ClusteringSequence),
lists:foreach(fun (Arg) ->
ok = control_action(reset, []),
- ok = control_action(cluster, Arg),
+ ok = control_action(force_cluster, Arg),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
ok
@@ -590,13 +628,13 @@ test_cluster_management() ->
ok = control_action(reset, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% join a non-existing cluster as a ram node
ok = control_action(reset, []),
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
SecondaryNode = rabbit_misc:makenode("hare"),
case net_adm:ping(SecondaryNode) of
@@ -621,18 +659,26 @@ test_cluster_management2(SecondaryNode) ->
%% join cluster as a ram node
ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS, "invalid1@invalid"]),
+ ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
%% change cluster config while remaining in same cluster
- ok = control_action(cluster, ["invalid2@invalid", SecondaryNodeS]),
+ ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
%% join non-existing cluster as a ram node
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
+ %% join empty cluster as a ram node
+ ok = control_action(cluster, []),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
%% turn ram node into disk node
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
@@ -640,8 +686,8 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
%% convert a disk node into a ram node
- ok = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(force_cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn a disk node into a ram node
ok = control_action(reset, []),
@@ -746,17 +792,17 @@ test_user_management() ->
passed.
test_server_status() ->
-
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
- [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
+ [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
+ {new, Queue = #amqqueue{}} <-
+ [rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, []) ||
- Name <- [<<"foo">>, <<"bar">>]],
+ false, false, [], none)]],
- ok = rabbit_amqqueue:claim_queue(Q, self()),
- ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
%% list queues
@@ -823,7 +869,7 @@ test_hooks() ->
{[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
%% Invoking Pids
- Remote = fun() ->
+ Remote = fun () ->
receive
{rabbitmq_hook,[remote_test,test,[],Target]} ->
Target ! invoked
@@ -840,11 +886,133 @@ test_hooks() ->
end,
passed.
+test_memory_pressure_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ ok = case Method of
+ #'channel.flow'{} -> ok;
+ #'basic.qos_ok'{} -> ok;
+ #'channel.open_ok'{} -> ok
+ end,
+ Pid ! Method,
+ test_memory_pressure_receiver(Pid);
+ sync ->
+ Pid ! sync,
+ test_memory_pressure_receiver(Pid)
+ end.
+
+test_memory_pressure_receive_flow(Active) ->
+ receive #'channel.flow'{active = Active} -> ok
+ after 1000 -> throw(failed_to_receive_channel_flow)
+ end,
+ receive #'channel.flow'{} ->
+ throw(pipelining_sync_commands_detected)
+ after 0 ->
+ ok
+ end.
+
+test_memory_pressure_sync(Ch, Writer) ->
+ ok = rabbit_channel:do(Ch, #'basic.qos'{}),
+ Writer ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'basic.qos_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_basic_qos_ok)
+ end.
+
+test_memory_pressure_spawn() ->
+ Me = self(),
+ Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch, #'channel.open'{}),
+ MRef = erlang:monitor(process, Ch),
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ {Writer, Ch, MRef}.
+
+expect_normal_channel_termination(MRef, Ch) ->
+ receive {'DOWN', MRef, process, Ch, normal} -> ok
+ after 1000 -> throw(channel_failed_to_exit)
+ end.
+
+test_memory_pressure() ->
+ {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
+ [ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
+ Conserve <- [false, false, true, false, true, true, false]],
+ ok = test_memory_pressure_sync(Ch0, Writer0),
+ receive {'DOWN', MRef0, process, Ch0, Info0} ->
+ throw({channel_died_early, Info0})
+ after 0 -> ok
+ end,
+
+ %% we should have just 1 active=false waiting for us
+ ok = test_memory_pressure_receive_flow(false),
+
+ %% if we reply with flow_ok, we should immediately get an
+ %% active=true back
+ ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_receive_flow(true),
+
+ %% if we publish at this point, the channel should die
+ Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
+ ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
+ expect_normal_channel_termination(MRef0, Ch0),
+
+ {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch1, true),
+ ok = test_memory_pressure_receive_flow(false),
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_sync(Ch1, Writer1),
+ ok = rabbit_channel:conserve_memory(Ch1, false),
+ ok = test_memory_pressure_receive_flow(true),
+ %% send back the wrong flow_ok. Channel should die.
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ expect_normal_channel_termination(MRef1, Ch1),
+
+ {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
+ %% just out of the blue, send a flow_ok. Life should end.
+ ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
+ expect_normal_channel_termination(MRef2, Ch2),
+
+ {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch3, true),
+ receive {'DOWN', MRef3, process, Ch3, _} ->
+ ok
+ after 12000 ->
+ throw(channel_failed_to_exit)
+ end,
+
+ alarm_handler:set_alarm({vm_memory_high_watermark, []}),
+ Me = self(),
+ Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch4, #'channel.open'{}),
+ MRef4 = erlang:monitor(process, Ch4),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
+ after 0 -> ok
+ end,
+ alarm_handler:clear_alarm(vm_memory_high_watermark),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ rabbit_channel:shutdown(Ch4),
+ expect_normal_channel_termination(MRef4, Ch4),
+
+ passed.
+
test_delegates_async(SecondaryNode) ->
Self = self(),
- Sender = fun(Pid) -> Pid ! {invoked, Self} end,
+ Sender = fun (Pid) -> Pid ! {invoked, Self} end,
- Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end),
+ Responder = make_responder(fun ({invoked, Pid}) -> Pid ! response end),
ok = delegate:invoke_no_result(spawn(Responder), Sender),
ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
@@ -857,10 +1025,11 @@ test_delegates_async(SecondaryNode) ->
passed.
-make_responder(FMsg) ->
- fun() ->
+make_responder(FMsg) -> make_responder(FMsg, timeout).
+make_responder(FMsg, Throw) ->
+ fun () ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(timeout)
+ after 1000 -> throw(Throw)
end
end.
@@ -887,24 +1056,28 @@ must_exit(Fun) ->
end.
test_delegates_sync(SecondaryNode) ->
- Sender = fun(Pid) -> gen_server:call(Pid, invoked) end,
- BadSender = fun(_Pid) -> exit(exception) end,
+ Sender = fun (Pid) -> gen_server:call(Pid, invoked) end,
+ BadSender = fun (_Pid) -> exit(exception) end,
- Responder = make_responder(fun({'$gen_call', From, invoked}) ->
+ Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
gen_server:reply(From, response)
end),
+ BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end, bad_responder_died),
+
response = delegate:invoke(spawn(Responder), Sender),
response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
- must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end),
- must_exit(fun() ->
- delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end),
+ must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
+ must_exit(fun () ->
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
- LocalBadPids = spawn_responders(node(), Responder, 2),
- RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), BadResponder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
{GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
true = lists:all(fun ({_, response}) -> true end, GoodRes),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
new file mode 100644
index 0000000000..2e492b80bd
--- /dev/null
+++ b/src/rabbit_types.erl
@@ -0,0 +1,145 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_types).
+
+-include("rabbit.hrl").
+
+-ifdef(use_specs).
+
+-export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0,
+ delivery/0, content/0, decoded_content/0, undecoded_content/0,
+ unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
+ amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
+ binding/0, amqqueue/0, exchange/0, connection/0, user/0,
+ error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
+
+-type(maybe(T) :: T | 'none').
+-type(vhost() :: binary()).
+-type(ctag() :: binary()).
+
+%% TODO: make this more precise by tying specific class_ids to
+%% specific properties
+-type(undecoded_content() ::
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: 'none',
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]} |
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: 'none',
+ payload_fragments_rev :: [binary()]}).
+-type(unencoded_content() :: undecoded_content()).
+-type(decoded_content() ::
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: maybe(binary()),
+ payload_fragments_rev :: [binary()]}).
+-type(encoded_content() ::
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: maybe(rabbit_framing:amqp_property_record()),
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]}).
+-type(content() :: undecoded_content() | decoded_content()).
+-type(basic_message() ::
+ #basic_message{exchange_name :: rabbit_exchange:name(),
+ routing_key :: rabbit_router:routing_key(),
+ content :: content(),
+ guid :: rabbit_guid:guid(),
+ is_persistent :: boolean()}).
+-type(message() :: basic_message()).
+-type(delivery() ::
+ #delivery{mandatory :: boolean(),
+ immediate :: boolean(),
+ txn :: maybe(txn()),
+ sender :: pid(),
+ message :: message()}).
+
+%% this is really an abstract type, but dialyzer does not support them
+-type(txn() :: rabbit_guid:guid()).
+
+-type(info_key() :: atom()).
+-type(info() :: {info_key(), any()}).
+
+-type(amqp_error() ::
+ #amqp_error{name :: rabbit_framing:amqp_exception(),
+ explanation :: string(),
+ method :: rabbit_framing:amqp_method_name()}).
+
+-type(r(Kind) ::
+ r2(vhost(), Kind)).
+-type(r2(VirtualHost, Kind) ::
+ r3(VirtualHost, Kind, rabbit_misc:resource_name())).
+-type(r3(VirtualHost, Kind, Name) ::
+ #resource{virtual_host :: VirtualHost,
+ kind :: Kind,
+ name :: Name}).
+
+-type(ssl_socket() :: #ssl_socket{}).
+
+-type(listener() ::
+ #listener{node :: node(),
+ protocol :: atom(),
+ host :: rabbit_networking:hostname(),
+ port :: rabbit_networking:ip_port()}).
+
+-type(binding() ::
+ #binding{exchange_name :: rabbit_exchange:name(),
+ queue_name :: rabbit_amqqueue:name(),
+ key :: rabbit_exchange:binding_key()}).
+
+-type(amqqueue() ::
+ #amqqueue{name :: rabbit_amqqueue:name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: rabbit_types:maybe(pid()),
+ arguments :: rabbit_framing:amqp_table(),
+ pid :: rabbit_types:maybe(pid())}).
+
+-type(exchange() ::
+ #exchange{name :: rabbit_exchange:name(),
+ type :: rabbit_exchange:type(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ arguments :: rabbit_framing:amqp_table()}).
+
+-type(connection() :: pid()).
+
+-type(user() ::
+ #user{username :: rabbit_access_control:username(),
+ password :: rabbit_access_control:password()}).
+
+-type(ok(A) :: {'ok', A}).
+-type(error(A) :: {'error', A}).
+-type(ok_or_error(A) :: 'ok' | error(A)).
+-type(ok_or_error2(A, B) :: ok(A) | error(B)).
+
+-endif. % use_specs
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 54c60f5be0..8060203897 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -48,20 +48,37 @@
-ifdef(use_specs).
--spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok').
--spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
+-spec(start/3 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer())
+ -> pid()).
+-spec(start_link/3 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer())
+ -> pid()).
+-spec(send_command/2 ::
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(send_command/3 ::
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok').
+-spec(send_command_and_signal_back/3 ::
+ (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok').
-spec(send_command_and_signal_back/4 ::
- (pid(), amqp_method(), content(), pid()) -> 'ok').
+ (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid())
+ -> 'ok').
-spec(send_command_and_notify/5 ::
- (pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:content())
+ -> 'ok').
-spec(internal_send_command/3 ::
- (socket(), channel_number(), amqp_method()) -> 'ok').
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record())
+ -> 'ok').
-spec(internal_send_command/5 ::
- (socket(), channel_number(), amqp_method(),
- content(), non_neg_integer()) -> 'ok').
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record(), rabbit_types:content(),
+ non_neg_integer())
+ -> 'ok').
-endif.
@@ -149,6 +166,7 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
shutdown(W) ->
W ! shutdown,
+ rabbit_misc:unlink_and_capture_exit(W),
ok.
%---------------------------------------------------------------------------
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 5575351269..03dc0f990f 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -11,20 +11,20 @@
%% All modifications are (C) 2010 LShift Ltd.
%%
%% %CopyrightBegin%
-%%
+%%
%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
-%%
+%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% %CopyrightEnd%
%%
-module(supervisor2).
@@ -301,13 +301,13 @@ handle_call({terminate_child, Name}, _From, State) ->
handle_call(which_children, _From, State) when ?is_simple(State) ->
[#child{child_type = CT, modules = Mods}] = State#state.children,
- Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end,
+ Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
?DICT:to_list(State#state.dynamics)),
{reply, Reply, State};
handle_call(which_children, _From, State) ->
Resp =
- lists:map(fun(#child{pid = Pid, name = Name,
+ lists:map(fun (#child{pid = Pid, name = Name,
child_type = ChildType, modules = Mods}) ->
{Name, Pid, ChildType, Mods}
end,
@@ -415,7 +415,7 @@ update_childspec1([], Children, KeepOld) ->
lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
- case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name ->
+ case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
Ch#child{pid = OldCh#child.pid};
(Ch) ->
Ch
@@ -828,7 +828,7 @@ validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}).
validMods(dynamic) -> true;
validMods(Mods) when is_list(Mods) ->
- lists:foreach(fun(Mod) ->
+ lists:foreach(fun (Mod) ->
if
is_atom(Mod) -> ok;
true -> throw({invalid_module, Mod})
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 3b23daa5c1..cc4982c9cb 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -75,6 +75,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
[inet_parse:ntoa(Address), Port,
inet_parse:ntoa(PeerAddress), PeerPort]),
+ %% In the event that somebody floods us with connections we can spew
+ %% the above message at error_logger faster than it can keep up.
+ %% So error_logger's mailbox grows unbounded until we eat all the
+ %% memory available and crash. So here's a meaningless synchronous call
+ %% to the underlying gen_event mechanism - when it returns the mailbox
+ %% is drained.
+ gen_event:which_handlers(error_logger),
%% handle
file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index cd03fcc6e6..bbc3a8c017 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -72,8 +72,10 @@
-ifdef(use_specs).
--spec(start_link/1 :: (float()) ->
- ('ignore' | {'error', any()} | {'ok', pid()})).
+-spec(start_link/1 ::
+ (float()) -> 'ignore' |
+ rabbit_types:error(any()) |
+ rabbit_types:ok(pid())).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')).
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 97e075459f..01ce3535d8 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -52,7 +52,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/1 ::
(fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 4ded63a8db..afa21164be 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -41,9 +41,9 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(start_link/1 ::
- (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/1 :: (non_neg_integer()) ->
+ 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-endif.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 57901fd5cf..a61e4cc372 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -44,7 +44,8 @@
-ifdef(use_specs).
--spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 ::
+ (any()) -> {'ok', pid()} | 'ignore' | rabbit_types:error(any())).
-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/2 ::
(pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').