diff options
76 files changed, 2155 insertions, 1195 deletions
@@ -10,6 +10,7 @@ syntax: regexp ^cover/ ^dist/ ^include/rabbit_framing\.hrl$ +^include/rabbit_framing_spec\.hrl$ ^src/rabbit_framing\.erl$ ^src/.*\_usage.erl$ ^rabbit\.plt$ @@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) -WEB_URL=http://stage.rabbitmq.com/ +WEB_URL=http://www.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml @@ -40,11 +40,11 @@ BASIC_PLT=basic.plt RABBIT_PLT=rabbit.plt ifndef USE_SPECS -# our type specs rely on features / bug fixes in dialyzer that are -# only available in R13B01 upwards (R13B01 is eshell 5.7.2) +# our type specs rely on features and bug fixes in dialyzer that are +# only available in R13B04 upwards (R13B04 is erts 5.7.5) # # NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.1" ]; then echo "true"; else echo "false"; fi) +USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.4" ]; then echo "true"; else echo "false"; fi) endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests @@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json +AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -70,6 +70,24 @@ define usage_dep $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl endef +ifneq "$(SBIN_DIR)" "" +ifneq "$(TARGET_DIR)" "" +SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) +endif +endif + +# Versions prior to this are not supported +NEED_MAKE := 3.80 +ifneq "$(NEED_MAKE)" "$(firstword $(sort $(NEED_MAKE) $(MAKE_VERSION)))" +$(error Versions of make prior to $(NEED_MAKE) are not supported) +endif + +# .DEFAULT_GOAL introduced in 3.81 +DEFAULT_GOAL_MAKE := 3.81 +ifneq "$(DEFAULT_GOAL_MAKE)" "$(firstword $(sort $(DEFAULT_GOAL_MAKE) $(MAKE_VERSION)))" +.DEFAULT_GOAL=all +endif + all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -81,11 +99,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ +$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ @@ -184,7 +202,7 @@ srcdist: distclean >> $(TARGET_SRC_DIR)/INSTALL cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ - >> $(TARGET_SRC_DIR)/BUILD + >> $(TARGET_SRC_DIR)/README sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ @@ -205,9 +223,10 @@ distclean: clean # xmlto can not read from standard input, so we mess with a tmp file. %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl - xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ - gzip -f $(DOCS_DIR)/`basename $< .xml` + xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \ + xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \ + gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp # Use tmp files rather than a pipeline so that we get meaningful errors @@ -234,13 +253,7 @@ $(SOURCE_DIR)/%_usage.erl: docs_all: $(MANPAGES) $(WEB_MANPAGES) -install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) install: all docs_all install_dirs - @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false) - @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false) - @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false) - - mkdir -p $(TARGET_DIR) cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* @@ -256,10 +269,16 @@ install: all docs_all install_dirs done install_dirs: - mkdir -p $(SBIN_DIR) + @ OK=true && \ + { [ -n "$(TARGET_DIR)" ] || { echo "Please set TARGET_DIR."; OK=false; }; } && \ + { [ -n "$(SBIN_DIR)" ] || { echo "Please set SBIN_DIR."; OK=false; }; } && \ + { [ -n "$(MAN_DIR)" ] || { echo "Please set MAN_DIR."; OK=false; }; } && $$OK + mkdir -p $(TARGET_DIR)/sbin + mkdir -p $(SBIN_DIR) + mkdir -p $(MAN_DIR) -$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML)))) +$(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML)))) # Note that all targets which depend on clean must have clean in their # name. Also any target that doesn't depend on clean should not have diff --git a/codegen.py b/codegen.py index a24a32ff37..6042e78fcd 100644 --- a/codegen.py +++ b/codegen.py @@ -95,6 +95,27 @@ class PackedMethodBitField: def full(self): return self.count() == 8 +def multiLineFormat(things, prologue, separator, lineSeparator, epilogue, thingsPerLine = 4): + r = [prologue] + i = 0 + for t in things: + if i != 0: + if i % thingsPerLine == 0: + r += [lineSeparator] + else: + r += [separator] + r += [t] + i += 1 + r += [epilogue] + return "".join(r) + +def prettyType(typeName, subTypes, typesPerLine = 4): + """Pretty print a type signature made up of many alternative subtypes""" + sTs = multiLineFormat(subTypes, + "( ", " | ", "\n | ", " )", + thingsPerLine = typesPerLine) + return "-type(%s ::\n %s)." % (typeName, sTs) + def printFileHeader(): print """%% Autogenerated code. Do not edit. %% @@ -317,6 +338,83 @@ def genErl(spec): -export([lookup_amqp_exception/1]). -export([amqp_exception/1]). +""" + print "%% Various types" + print "-ifdef(use_specs)." + + print """-export_type([amqp_table/0, amqp_property_type/0, amqp_method_record/0, + amqp_method_name/0, amqp_method/0, amqp_class_id/0, + amqp_value/0, amqp_array/0, amqp_exception/0, amqp_property_record/0]). + +-type(amqp_field_type() :: + 'longstr' | 'signedint' | 'decimal' | 'timestamp' | + 'table' | 'byte' | 'double' | 'float' | 'long' | + 'short' | 'bool' | 'binary' | 'void'). +-type(amqp_property_type() :: + 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | + 'longlongint' | 'timestamp' | 'bit' | 'table'). + +-type(amqp_table() :: [{binary(), amqp_field_type(), amqp_value()}]). +-type(amqp_array() :: [{amqp_field_type(), amqp_value()}]). +-type(amqp_value() :: binary() | % longstr + integer() | % signedint + {non_neg_integer(), non_neg_integer()} | % decimal + amqp_table() | + amqp_array() | + byte() | % byte + float() | % double + integer() | % long + integer() | % short + boolean() | % bool + binary() | % binary + 'undefined' | % void + non_neg_integer() % timestamp + ). +""" + + print prettyType("amqp_method_name()", + [m.erlangName() for m in methods]) + print prettyType("amqp_method()", + ["{%s, %s}" % (m.klass.index, m.index) for m in methods], + 6) + print prettyType("amqp_method_record()", + ["#%s{}" % (m.erlangName()) for m in methods]) + fieldNames = set() + for m in methods: + fieldNames.update(m.arguments) + fieldNames = [erlangize(f.name) for f in fieldNames] + print prettyType("amqp_method_field_name()", + fieldNames) + print prettyType("amqp_property_record()", + ["#'P_%s'{}" % erlangize(c.name) for c in spec.allClasses()]) + print prettyType("amqp_exception()", + ["'%s'" % erlangConstantName(c).lower() for (c, v, cls) in spec.constants]) + print prettyType("amqp_exception_code()", + ["%i" % v for (c, v, cls) in spec.constants]) + classIds = set() + for m in spec.allMethods(): + classIds.add(m.klass.index) + print prettyType("amqp_class_id()", + ["%i" % ci for ci in classIds]) + print "-endif. % use_specs" + + print """ +%% Method signatures +-ifdef(use_specs). +-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()). +-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()). +-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()). +-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()). +-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()). +-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]). +-spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()). +-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()). +-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()). +-spec(encode_properties/1 :: (amqp_method_record()) -> binary()). +-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}). +-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()). +-endif. % use_specs + bitvalue(true) -> 1; bitvalue(false) -> 0; bitvalue(undefined) -> 0. @@ -397,6 +495,7 @@ def genHrl(spec): for c in spec.allClasses(): print "-record('P_%s', {%s})." % (erlangize(c.name), fieldNameList(c.fields)) + def generateErl(specPath): genErl(AmqpSpec(specPath)) @@ -404,5 +503,6 @@ def generateHrl(specPath): genHrl(AmqpSpec(specPath)) if __name__ == "__main__": - do_main(generateHrl, generateErl) + do_main_dict({"header": generateHrl, + "body": generateErl}) diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index f2117e2679..662dbea00a 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -58,13 +58,13 @@ <!-- Specific instructions to revert the DocBook HTML to be more like our ad-hoc XML schema --> <xsl:template match="div[@class='refsect1'] | div[@class='refnamediv'] | div[@class='refsynopsisdiv']"> - <doc:section name="{@title}"> + <doc:section name="{h2}"> <xsl:apply-templates select="node()"/> </doc:section> </xsl:template> <xsl:template match="div[@class='refsect2']"> - <doc:subsection name="{@title}"> + <doc:subsection name="{h3}"> <xsl:apply-templates select="node()"/> </doc:subsection> </xsl:template> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 5e2668c1a6..e53a97c2c9 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -270,8 +270,8 @@ <title>Cluster management</title> <variablelist> - <varlistentry> - <term><cmdsynopsis><command>cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg></cmdsynopsis></term> + <varlistentry id="cluster"> + <term><cmdsynopsis><command>cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -281,7 +281,8 @@ </variablelist> <para> Instruct the node to become member of a cluster with the - specified nodes. + specified nodes. To cluster with currently offline nodes, + use <link linkend="force_cluster"><command>force_cluster</command></link>. </para> <para> Cluster nodes can be of two types: disk or ram. Disk nodes @@ -334,6 +335,29 @@ </para> </listitem> </varlistentry> + <varlistentry id="force_cluster"> + <term><cmdsynopsis><command>force_cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>clusternode</term> + <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem> + </varlistentry> + </variablelist> + <para> + Instruct the node to become member of a cluster with the + specified nodes. This will succeed even if the specified nodes + are offline. For a more detailed description, see + <link linkend="cluster"><command>cluster</command>.</link> + </para> + <para> + Note that this variant of the cluster command just + ignores the current status of the specified nodes. + Clustering may still fail for a variety of other + reasons. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> @@ -603,10 +627,12 @@ <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl list_permissions -p /myvhost</screen> <para role="example"> - This command instructs the RabbitMQ broker to list all the - users which have been granted access to the virtual host - called <command>/myvhost</command>, and the permissions they - have for operations on resources in that virtual host. + This command instructs the RabbitMQ broker to list all + the users which have been granted access to the virtual + host called <command>/myvhost</command>, and the + permissions they have for operations on resources in + that virtual host. Note that an empty string means no + permissions granted. </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index bdf407eb93..ce94cafe62 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -11,8 +11,8 @@ rabbit_sup, rabbit_tcp_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, -%% we also depend on ssl but it shouldn't be in here as we don't -%% actually want to start it +%% we also depend on crypto, public_key and ssl but they shouldn't be +%% in here as we don't actually want to start it {mod, {rabbit, []}}, {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, {ssl_listeners, []}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 4e8ed11418..54a4021d40 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,8 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, + arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -67,113 +68,10 @@ -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). - -record(amqp_error, {name, explanation = "", method = none}). %%---------------------------------------------------------------------------- --ifdef(use_specs). - --include("rabbit_framing_spec.hrl"). - --type(maybe(T) :: T | 'none'). --type(erlang_node() :: atom()). --type(ssl_socket() :: #ssl_socket{}). --type(socket() :: port() | ssl_socket()). --type(thunk(T) :: fun(() -> T)). --type(info_key() :: atom()). --type(info() :: {info_key(), any()}). --type(regexp() :: binary()). --type(file_path() :: string()). - -%% this is really an abstract type, but dialyzer does not support them --type(guid() :: binary()). --type(txn() :: guid()). --type(pkey() :: guid()). --type(r(Kind) :: - #resource{virtual_host :: vhost(), - kind :: Kind, - name :: resource_name()}). --type(queue_name() :: r('queue')). --type(exchange_name() :: r('exchange')). --type(user() :: - #user{username :: username(), - password :: password()}). --type(permission() :: - #permission{configure :: regexp(), - write :: regexp(), - read :: regexp()}). --type(amqqueue() :: - #amqqueue{name :: queue_name(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: amqp_table(), - pid :: maybe(pid())}). --type(exchange() :: - #exchange{name :: exchange_name(), - type :: exchange_type(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: amqp_table()}). --type(binding() :: - #binding{exchange_name :: exchange_name(), - queue_name :: queue_name(), - key :: binding_key()}). -%% TODO: make this more precise by tying specific class_ids to -%% specific properties --type(undecoded_content() :: - #content{class_id :: amqp_class_id(), - properties :: 'none', - properties_bin :: binary(), - payload_fragments_rev :: [binary()]} | - #content{class_id :: amqp_class_id(), - properties :: amqp_properties(), - properties_bin :: 'none', - payload_fragments_rev :: [binary()]}). --type(unencoded_content() :: undecoded_content()). --type(decoded_content() :: - #content{class_id :: amqp_class_id(), - properties :: amqp_properties(), - properties_bin :: maybe(binary()), - payload_fragments_rev :: [binary()]}). --type(encoded_content() :: - #content{class_id :: amqp_class_id(), - properties :: maybe(amqp_properties()), - properties_bin :: binary(), - payload_fragments_rev :: [binary()]}). --type(content() :: undecoded_content() | decoded_content()). --type(basic_message() :: - #basic_message{exchange_name :: exchange_name(), - routing_key :: routing_key(), - content :: content(), - guid :: guid(), - is_persistent :: boolean()}). --type(message() :: basic_message()). --type(delivery() :: - #delivery{mandatory :: boolean(), - immediate :: boolean(), - txn :: maybe(txn()), - sender :: pid(), - message :: message()}). -%% this really should be an abstract type --type(msg_id() :: non_neg_integer()). --type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). --type(listener() :: - #listener{node :: erlang_node(), - protocol :: atom(), - host :: string() | atom(), - port :: non_neg_integer()}). --type(not_found() :: {'error', 'not_found'}). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(amqp_error() :: - #amqp_error{name :: atom(), - explanation :: string(), - method :: atom()}). - --endif. - -%%---------------------------------------------------------------------------- - -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). -define(ERTS_MINIMUM, "5.6.3"). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 1b536dfad1..05dc1464cd 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -31,26 +31,26 @@ -type(fetch_result() :: %% Message, IsDelivered, AckTag, Remaining_Len - ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})). + ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). --spec(start/1 :: ([queue_name()]) -> 'ok'). --spec(init/3 :: (queue_name(), is_durable(), attempt_recovery()) -> state()). +-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). +-spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/2 :: (basic_message(), state()) -> state()). +-spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). -spec(publish_delivered/3 :: - (ack_required(), basic_message(), state()) -> {ack(), state()}). + (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()). --spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()). --spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}). --spec(tx_commit/3 :: (txn(), fun (() -> any()), state()) -> {[ack()], state()}). +-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()). +-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). +-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). +-spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}). -spec(requeue/2 :: ([ack()], state()) -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 9864f1eb64..f05bcb847f 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -31,12 +31,19 @@ -ifdef(use_specs). -spec(description/0 :: () -> [{atom(), any()}]). --spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(validate/1 :: (exchange()) -> 'ok'). --spec(create/1 :: (exchange()) -> 'ok'). --spec(recover/2 :: (exchange(), list(binding())) -> 'ok'). --spec(delete/2 :: (exchange(), list(binding())) -> 'ok'). --spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). --spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok'). +-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> {rabbit_router:routing_result(), [pid()]}). +-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). +-spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). +-spec(recover/2 :: (rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'). +-spec(delete/2 :: (rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'). +-spec(add_binding/2 :: (rabbit_types:exchange(), + rabbit_types:binding()) -> 'ok'). +-spec(remove_bindings/2 :: (rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'). +-spec(assert_args_equivalence/2 :: (rabbit_types:exchange(), + rabbit_framing:amqp_table()) -> 'ok'). -endif. diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl deleted file mode 100644 index 1a9798998c..0000000000 --- a/include/rabbit_framing_spec.hrl +++ /dev/null @@ -1,60 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - -%% TODO: much of this should be generated - --type(amqp_field_type() :: - 'longstr' | 'signedint' | 'decimal' | 'timestamp' | - 'table' | 'byte' | 'double' | 'float' | 'long' | - 'short' | 'bool' | 'binary' | 'void'). --type(amqp_property_type() :: - 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | - 'longlongint' | 'timestamp' | 'bit' | 'table'). -%% we could make this more precise but ultimately are limited by -%% dialyzer's lack of support for recursive types --type(amqp_table() :: [{binary(), amqp_field_type(), any()}]). -%% TODO: make this more precise --type(amqp_class_id() :: non_neg_integer()). -%% TODO: make this more precise --type(amqp_properties() :: tuple()). -%% TODO: make this more precise --type(amqp_method() :: tuple()). -%% TODO: make this more precise --type(amqp_method_name() :: atom()). --type(channel_number() :: non_neg_integer()). --type(resource_name() :: binary()). --type(routing_key() :: binary()). --type(username() :: binary()). --type(password() :: binary()). --type(vhost() :: binary()). --type(ctag() :: binary()). --type(exchange_type() :: atom()). --type(binding_key() :: binary()). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 6926261f79..86675e1e08 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -107,6 +107,12 @@ if [ $1 = 0 ]; then # Leave rabbitmq user and group fi +# Clean out plugin activation state, both on uninstall and upgrade +rm -rf %{_rabbit_erllibdir}/priv +for ext in rel script boot ; do + rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext +done + %files -f ../%{name}.files %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq @@ -122,6 +128,12 @@ fi rm -rf %{buildroot} %changelog +* Wed Jul 14 2010 Emile Joubert <emile@rabbitmq.com> 1.8.1-1 +- New Upstream Release + +* Tue Jun 15 2010 Matthew Sackman <matthew@rabbitmq.com> 1.8.0-1 +- New Upstream Release + * Mon Feb 15 2010 Matthew Sackman <matthew@lshift.net> 1.7.2-1 - New Upstream Release diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index 97c58ea2f8..db0ed70b61 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -35,21 +35,22 @@ ## ## OCF instance parameters -## OCF_RESKEY_multi -## OCF_RESKEY_ctl -## OCF_RESKEY_nodename -## OCF_RESKEY_ip -## OCF_RESKEY_port -## OCF_RESKEY_cluster_config_file -## OCF_RESKEY_config_file -## OCF_RESKEY_log_base -## OCF_RESKEY_mnesia_base -## OCF_RESKEY_server_start_args +## OCF_RESKEY_multi +## OCF_RESKEY_ctl +## OCF_RESKEY_nodename +## OCF_RESKEY_ip +## OCF_RESKEY_port +## OCF_RESKEY_cluster_config_file +## OCF_RESKEY_config_file +## OCF_RESKEY_log_base +## OCF_RESKEY_mnesia_base +## OCF_RESKEY_server_start_args ####################################################################### # Initialization: -. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs +: ${OCF_FUNCTIONS_DIR=${OCF_ROOT}/resource.d/heartbeat} +. ${OCF_FUNCTIONS_DIR}/.ocf-shellfuncs ####################################################################### @@ -63,7 +64,7 @@ OCF_RESKEY_log_base_default="/var/log/rabbitmq" : ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} meta_data() { - cat <<END + cat <<END <?xml version="1.0"?> <!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd"> <resource-agent name="rabbitmq-server"> @@ -113,7 +114,7 @@ The IP address for rabbitmq-server to listen on The IP Port for rabbitmq-server to listen on </longdesc> <shortdesc lang="en">IP Port</shortdesc> -<content type="string" default="" /> +<content type="integer" default="" /> </parameter> <parameter name="cluster_config_file" unique="0" required="0"> @@ -161,7 +162,8 @@ Additional arguments provided to the server on startup <actions> <action name="start" timeout="600" /> <action name="stop" timeout="120" /> -<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" /> +<action name="status" timeout="20" interval="10" /> +<action name="monitor" timeout="20" interval="10" /> <action name="validate-all" timeout="30" /> <action name="meta-data" timeout="5" /> </actions> @@ -170,8 +172,8 @@ END } rabbit_usage() { - cat <<END -usage: $0 {start|stop|monitor|validate-all|meta-data} + cat <<END +usage: $0 {start|stop|status|monitor|validate-all|meta-data} Expects to have a fully populated OCF RA-compliant environment set. END @@ -202,35 +204,35 @@ export_vars() { rabbit_validate_partial() { if [ ! -x $RABBITMQ_MULTI ]; then - ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; + exit $OCF_ERR_INSTALLED; fi if [ ! -x $RABBITMQ_CTL ]; then - ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; + exit $OCF_ERR_INSTALLED; fi } rabbit_validate_full() { if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then - ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; + exit $OCF_ERR_INSTALLED; fi if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then - ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; + exit $OCF_ERR_INSTALLED; fi if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then - ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; + exit $OCF_ERR_INSTALLED; fi if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then - ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; + exit $OCF_ERR_INSTALLED; fi rabbit_validate_partial @@ -243,25 +245,26 @@ rabbit_status() { $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null rc=$? case "$rc" in - 0) - return $OCF_SUCCESS - ;; - 2) - return $OCF_NOT_RUNNING - ;; - *) - ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" - return $OCF_ERR_GENERIC + 0) + ocf_log debug "RabbitMQ server is running normally" + return $OCF_SUCCESS + ;; + 2) + ocf_log debug "RabbitMQ server is not running" + return $OCF_NOT_RUNNING + ;; + *) + ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" + exit $OCF_ERR_GENERIC esac } rabbit_start() { local rc - rabbit_validate_full - rc=$? - if [ "$rc" != $OCF_SUCCESS ]; then - return $rc + if rabbit_status; then + ocf_log info "Resource already running." + return $OCF_SUCCESS fi export_vars @@ -270,24 +273,23 @@ rabbit_start() { rc=$? if [ "$rc" != 0 ]; then - ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" - return $rc + ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" + return $rc fi # Spin waiting for the server to come up. # Let the CRM/LRM time us out if required start_wait=1 while [ $start_wait = 1 ]; do - rabbit_status - rc=$? - if [ "$rc" = $OCF_SUCCESS ]; then - start_wait=0 - - elif [ "$rc" != $OCF_NOT_RUNNING ]; then - ocf_log info "rabbitmq-server start failed: $rc" - return $OCF_ERR_GENERIC - fi - sleep 2 + rabbit_status + rc=$? + if [ "$rc" = $OCF_SUCCESS ]; then + start_wait=0 + elif [ "$rc" != $OCF_NOT_RUNNING ]; then + ocf_log info "rabbitmq-server start failed: $rc" + exit $OCF_ERR_GENERIC + fi + sleep 1 done return $OCF_SUCCESS @@ -295,28 +297,34 @@ rabbit_start() { rabbit_stop() { local rc + + if ! rabbit_status; then + ocf_log info "Resource not running." + return $OCF_SUCCESS + fi + $RABBITMQ_MULTI stop_all & rc=$? if [ "$rc" != 0 ]; then - ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" - return $rc + ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" + return $rc fi # Spin waiting for the server to shut down. # Let the CRM/LRM time us out if required stop_wait=1 while [ $stop_wait = 1 ]; do - rabbit_status - rc=$? - if [ "$rc" = $OCF_NOT_RUNNING ]; then - stop_wait=0 + rabbit_status + rc=$? + if [ "$rc" = $OCF_NOT_RUNNING ]; then + stop_wait=0 break - elif [ "$rc" != $OCF_SUCCESS ]; then - ocf_log info "rabbitmq-server stop failed: $rc" - return $OCF_ERR_GENERIC - fi - sleep 2 + elif [ "$rc" != $OCF_SUCCESS ]; then + ocf_log info "rabbitmq-server stop failed: $rc" + exit $OCF_ERR_GENERIC + fi + sleep 1 done return $OCF_SUCCESS @@ -329,34 +337,38 @@ rabbit_monitor() { case $__OCF_ACTION in meta-data) - meta_data - exit $OCF_SUCCESS - ;; + meta_data + exit $OCF_SUCCESS + ;; usage|help) - rabbit_usage - exit $OCF_SUCCESS - ;; + rabbit_usage + exit $OCF_SUCCESS + ;; esac -rabbit_validate_partial || exit +if ocf_is_probe; then + rabbit_validate_partial +else + rabbit_validate_full +fi case $__OCF_ACTION in start) - rabbit_start + rabbit_start ;; stop) - rabbit_stop + rabbit_stop ;; - monitor) - rabbit_monitor + status|monitor) + rabbit_monitor ;; validate-all) exit $OCF_SUCCESS - ;; + ;; *) - rabbit_usage - exit $OCF_ERR_UNIMPLEMENTED - ;; + rabbit_usage + exit $OCF_ERR_UNIMPLEMENTED + ;; esac -exit $?
\ No newline at end of file +exit $? diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 63b50749e1..0dccf93879 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (1.8.1-1) lucid; urgency=low + + * New Upstream Release + + -- Emile Joubert <emile@rabbitmq.com> Wed, 14 Jul 2010 15:05:24 +0100 + +rabbitmq-server (1.8.0-1) intrepid; urgency=low + + * New Upstream Release + + -- Matthew Sackman <matthew@rabbitmq.com> Tue, 15 Jun 2010 12:48:48 +0100 + rabbitmq-server (1.7.2-1) intrepid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in index bfcf1f530e..5290de9b17 100644 --- a/packaging/debs/Debian/debian/postrm.in +++ b/packaging/debs/Debian/debian/postrm.in @@ -18,6 +18,13 @@ set -e # for details, see http://www.debian.org/doc/debian-policy/ or # the debian-policy package +remove_plugin_traces() { + # Remove traces of plugins + rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins + for ext in rel script boot ; do + rm -f @RABBIT_LIB@/ebin/rabbit.$ext + done +} case "$1" in purge) @@ -34,11 +41,7 @@ case "$1" in if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi - # Remove traces of plugins - rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins - for ext in rel script boot ; do - rm -f @RABBIT_LIB@/ebin/rabbit.$ext - done + remove_plugin_traces if getent passwd rabbitmq >/dev/null; then # Stop epmd if run by the rabbitmq user pkill -u rabbitmq epmd || : @@ -50,7 +53,11 @@ case "$1" in fi ;; - remove|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear) + remove|upgrade) + remove_plugin_traces + ;; + + failed-upgrade|abort-install|abort-upgrade|disappear) ;; *) diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index 0ef7dd5e73..3a22eef08a 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -31,15 +31,22 @@ $(DEST)/Portfile: Portfile.in -f checksums.sed <$^ >$@ rm checksums.sed +# The purpose of the intricate substitution below is to set up similar +# environment vars to the ones that su will on Linux. On OS X, we +# have to use the -m option to su in order to be able to set the shell +# (which for the rabbitmq user would otherwise be /dev/null). But the +# -m option means that *all* environment vars get preserved. Erlang +# needs vars such as HOME to be set. So we have to set them +# explicitly. macports: dirs $(DEST)/Portfile for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \ cp $(COMMON_DIR)/$$f $(DEST)/files ; \ done - sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh su -m rabbitmq -c|' \ + sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh HOME=/var/lib/rabbitmq USER=rabbitmq LOGNAME=rabbitmq PATH="$$(eval `PATH=MACPORTS_PREFIX/bin /usr/libexec/path_helper -s`; echo $$PATH)" su -m rabbitmq -c|' \ $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files if [ -n "$(MACPORTS_USERHOST)" ] ; then \ - tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \ + tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \ d="/tmp/mkportindex.$$$$" ; \ mkdir $$d \ && cd $$d \ @@ -52,4 +59,4 @@ macports: dirs $(DEST)/Portfile fi clean: - rm -rf $(DEST) checksums.sed + rm -rf $(MACPORTS_DIR) checksums.sed diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 153727be9a..be0d24d75f 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -4,9 +4,8 @@ PortSystem 1.0 name rabbitmq-server version @VERSION@ -revision 1 categories net -maintainers rabbitmq.com:tonyg +maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer platforms darwin description The RabbitMQ AMQP Server long_description \ @@ -23,8 +22,8 @@ checksums \ sha1 @sha1@ \ rmd160 @rmd160@ -depends_build port:erlang port:xmlto port:libxslt -depends_run port:erlang +depends_lib port:erlang +depends_build port:xmlto port:libxslt platform darwin 7 { depends_build-append port:py25-simplejson @@ -98,6 +97,8 @@ post-destroot { xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \ ${wrappersbin}/rabbitmq-activate-plugins + reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:" \ + ${wrappersbin}/rabbitmq-multi reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ ${wrappersbin}/rabbitmq-multi reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ diff --git a/packaging/macports/make-port-diff.sh b/packaging/macports/make-port-diff.sh new file mode 100755 index 0000000000..3eb1b9f589 --- /dev/null +++ b/packaging/macports/make-port-diff.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# This script grabs the latest rabbitmq-server bits from the main +# macports subversion repo, and from the rabbitmq.com macports repo, +# and produces a diff from the former to the latter for submission +# through the macports trac. + +set -e + +dir=/tmp/$(basename $0).$$ +mkdir -p $dir/macports $dir/rabbitmq + +# Get the files from the macports subversion repo +cd $dir/macports +svn checkout http://svn.macports.org/repository/macports/trunk/dports/net/rabbitmq-server/ 2>&1 >/dev/null + +# Clear out the svn $id tag +sed -i -e 's|^# \$.*$|# $Id$|' rabbitmq-server/Portfile + +# Get the files from the rabbitmq.com macports repo +cd ../rabbitmq +curl -s http://www.rabbitmq.com/releases/macports/net/rabbitmq-server.tgz | tar xzf - + +cd .. +diff -Naur --exclude=.svn macports rabbitmq +cd / +rm -rf $dir diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 8341d35c8c..5905069204 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,7 +29,8 @@ ## ## Contributor(s): ______________________________________. ## -NODENAME=rabbit +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` +NODENAME=rabbit@${HOSTNAME%%.*} SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a4b7f2e99b..a4f8c8b448 100644 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index ccdfc40160..2261b56ef5 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,7 +30,8 @@ ## Contributor(s): ______________________________________. ## -NODENAME=rabbit +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` +NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 57fe1328ce..a290f9356c 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index a4021fd6a1..bd117b83f4 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -48,8 +48,12 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index cfb775eb67..92e5312bb2 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,7 +30,8 @@ ## Contributor(s): ______________________________________. ## -NODENAME=rabbit +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` +NODENAME=rabbit@${HOSTNAME%%.*} . `dirname $0`/rabbitmq-env diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 5557245165..563b9e58e9 100644 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -38,8 +38,12 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/src/delegate.erl b/src/delegate.erl index 12eb814f8f..3f57953bf7 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -44,9 +44,10 @@ -ifdef(use_specs). --spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). --spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). +-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok(pid())). +-spec(invoke_no_result/2 :: + (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). -spec(process_count/0 :: () -> non_neg_integer()). @@ -63,7 +64,7 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun), + [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), case Res of {ok, Result, _} -> Result; @@ -73,7 +74,7 @@ invoke(Pid, Fun) when is_pid(Pid) -> invoke(Pids, Fun) when is_list(Pids) -> lists:foldl( - fun({Status, Result, Pid}, {Good, Bad}) -> + fun ({Status, Result, Pid}, {Good, Bad}) -> case Status of ok -> {[{Pid, Result}|Good], Bad}; error -> {Good, [{Pid, Result}|Bad]} @@ -83,7 +84,7 @@ invoke(Pids, Fun) when is_list(Pids) -> invoke_per_node(split_delegate_per_node(Pids), Fun)). invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node([{node(Pid), [Pid]}], Fun), + invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), ok; invoke_no_result(Pids, Fun) when is_list(Pids) -> @@ -99,42 +100,47 @@ internal_cast(Node, Thunk) when is_atom(Node) -> gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). split_delegate_per_node(Pids) -> - orddict:to_list( - lists:foldl( - fun (Pid, D) -> - orddict:update(node(Pid), - fun (Pids1) -> [Pid | Pids1] end, - [Pid], D) - end, - orddict:new(), Pids)). + LocalNode = node(), + {Local, Remote} = + lists:foldl( + fun (Pid, {L, D}) -> + Node = node(Pid), + case Node of + LocalNode -> {[Pid|L], D}; + _ -> {L, orddict:append(Node, Pid, D)} + end + end, + {[], orddict:new()}, Pids), + {Local, orddict:to_list(Remote)}. -invoke_per_node([{Node, Pids}], Fun) when Node == node() -> - safe_invoke(Pids, Fun); invoke_per_node(NodePids, Fun) -> lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). -invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() -> - %% This is not actually async! However, in practice Fun will - %% always be something that does a gen_server:cast or similar, so - %% I don't think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - safe_invoke(Pids, Fun); invoke_no_result_per_node(NodePids, Fun) -> delegate_per_node(NodePids, Fun, fun internal_cast/2), ok. -delegate_per_node(NodePids, Fun, DelegateFun) -> +delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> + %% In the case where DelegateFun is internal_cast, the safe_invoke + %% is not actually async! However, in practice Fun will always be + %% something that does a gen_server:cast or similar, so I don't + %% think it's a problem unless someone misuses this + %% function. Making this *actually* async would be painful as we + %% can't spawn at this point or we break effect ordering. + [safe_invoke(LocalPids, Fun)| + delegate_per_remote_node(NodePids, Fun, DelegateFun)]. + +delegate_per_remote_node(NodePids, Fun, DelegateFun) -> Self = self(), %% Note that this is unsafe if the Fun requires reentrancy to the %% local_server. I.e. if self() == local_server(Node) then we'll %% block forever. [gen_server2:cast( local_server(Node), - {thunk, fun() -> + {thunk, fun () -> Self ! {result, DelegateFun( - Node, fun() -> safe_invoke(Pids, Fun) end)} + Node, fun () -> safe_invoke(Pids, Fun) end)} end}) || {Node, Pids} <- NodePids], [receive {result, Result} -> Result end || _ <- NodePids]. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 1c1d62a95d..39ef3f85b8 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -43,7 +43,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any()) | 'ignore'). -endif. diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 0f648dcd2b..e209ee6be4 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -182,18 +182,18 @@ -ifdef(use_specs). -type(ref() :: any()). --type(error() :: {'error', any()}). --type(ok_or_error() :: ('ok' | error())). --type(val_or_error(T) :: ({'ok', T} | error())). +-type(ok_or_error() :: rabbit_types:ok_or_error(any())). +-type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())). -type(position() :: ('bof' | 'eof' | non_neg_integer() | - {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})). + {('bof' |'eof'), non_neg_integer()} | + {'cur', integer()})). -type(offset() :: non_neg_integer()). -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: - (string(), [any()], - [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) -> - val_or_error(ref())). + (string(), [any()], + [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) + -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). -spec(read/2 :: (ref(), non_neg_integer()) -> val_or_error([char()] | binary()) | 'eof'). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 5b899cdbc7..49ae63c1d5 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -186,7 +186,7 @@ -ifdef(use_specs). -spec(handle_common_termination/6 :: - (any(), any(), any(), atom(), any(), any()) -> no_return()). + (any(), any(), any(), atom(), any(), any()) -> no_return()). -spec(hibernate/7 :: (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()). @@ -639,7 +639,7 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Caller = self(), Receiver = spawn( - fun() -> + fun () -> %% Middleman process. Should be unsensitive to regular %% exit signals. The sychronization is needed in case %% the receiver would exit before the caller started diff --git a/src/pg_local.erl b/src/pg_local.erl index 1501331d6b..f5ded123d7 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -36,8 +36,8 @@ -export([join/2, leave/2, get_members/1]). -export([sync/0]). %% intended for testing only; not part of official API --export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, - terminate/2]). +-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2]). %%---------------------------------------------------------------------------- @@ -45,8 +45,8 @@ -type(name() :: term()). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}). --spec(start/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), term())). +-spec(start/0 :: () -> rabbit_types:ok_or_error2(pid(), term())). -spec(join/2 :: (name(), pid()) -> 'ok'). -spec(leave/2 :: (name(), pid()) -> 'ok'). -spec(get_members/1 :: (name()) -> [pid()]). diff --git a/src/rabbit.erl b/src/rabbit.erl index 67f8df947b..18045b94fc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -33,7 +33,8 @@ -behaviour(application). --export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). +-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, + rotate_logs/1]). -export([start/2, stop/1]). @@ -183,18 +184,19 @@ -ifdef(use_specs). --type(log_location() :: 'tty' | 'undefined' | string()). -type(file_suffix() :: binary()). +%% this really should be an abstract type +-type(log_location() :: 'tty' | 'undefined' | file:filename()). -spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). --spec(rotate_logs/1 :: (file_suffix()) -> 'ok' | {'error', any()}). --spec(status/0 :: () -> - [{running_applications, [{atom(), string(), string()}]} | - {nodes, [erlang_node()]} | - {running_nodes, [erlang_node()]}]). +-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). +-spec(status/0 :: + () -> [{running_applications, [{atom(), string(), string()}]} | + {nodes, [{rabbit_mnesia:node_type(), [node()]}]} | + {running_nodes, [node()]}]). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). -endif. @@ -299,6 +301,18 @@ run_boot_step({StepName, Attributes}) -> ok end. +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. + boot_steps() -> AllApps = [App || {App, _, _} <- application:loaded_applications()], Modules = lists:usort( @@ -310,7 +324,7 @@ boot_steps() -> lists:flatmap(fun (Module) -> [{StepName, Attributes} || {rabbit_boot_step, [{StepName, Attributes}]} - <- Module:module_info(attributes)] + <- module_attributes(Module)] end, Modules), sort_boot_steps(UnsortedSteps). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index a445f44197..30bae25e5a 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -45,28 +45,38 @@ -ifdef(use_specs). +-export_type([username/0, password/0]). + -type(permission_atom() :: 'configure' | 'read' | 'write'). +-type(username() :: binary()). +-type(password() :: binary()). +-type(regexp() :: binary()). --spec(check_login/2 :: (binary(), binary()) -> user()). --spec(user_pass_login/2 :: (username(), password()) -> user()). --spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). +-spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user()). +-spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()). +-spec(check_vhost_access/2 :: + (rabbit_types:user(), rabbit_types:vhost()) -> 'ok'). -spec(check_resource_access/3 :: - (username(), r(atom()), permission_atom()) -> 'ok'). + (username(), rabbit_types:r(atom()), permission_atom()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). -spec(list_users/0 :: () -> [username()]). --spec(lookup_user/1 :: (username()) -> {'ok', user()} | not_found()). --spec(add_vhost/1 :: (vhost()) -> 'ok'). --spec(delete_vhost/1 :: (vhost()) -> 'ok'). --spec(list_vhosts/0 :: () -> [vhost()]). --spec(set_permissions/5 :: - (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok'). --spec(clear_permissions/2 :: (username(), vhost()) -> 'ok'). +-spec(lookup_user/1 :: + (username()) -> rabbit_types:ok(rabbit_types:user()) + | rabbit_types:error('not_found')). +-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). +-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), + regexp(), regexp()) -> 'ok'). +-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). -spec(list_vhost_permissions/1 :: - (vhost()) -> [{username(), regexp(), regexp(), regexp()}]). + (rabbit_types:vhost()) + -> [{username(), regexp(), regexp(), regexp()}]). -spec(list_user_permissions/1 :: - (username()) -> [{vhost(), regexp(), regexp(), regexp()}]). + (username()) + -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -endif. @@ -162,9 +172,14 @@ check_resource_access(Username, [] -> false; [#user_permission{permission = P}] -> + PermRegexp = case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, case regexp:match( binary_to_list(Name), - binary_to_list(element(permission_index(Permission), P))) of + binary_to_list(PermRegexp)) of {match, _, _} -> true; nomatch -> false end diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 7e96d9a3a8..53c713e66b 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -47,7 +47,7 @@ -type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). -endif. @@ -67,9 +67,9 @@ stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). register(Pid, HighMemMFA) -> - ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}, - infinity). + gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- @@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA}, false -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, ok, State#alarms{alertees = NewAlertees}}; + {ok, State#alarms.vm_memory_high_watermark, + State#alarms{alertees = NewAlertees}}; handle_call(_Request, State) -> {ok, not_understood, State}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7b88c45d26..f1b527681c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,18 +31,18 @@ -module(rabbit_amqqueue). --export([start/0, declare/4, delete/3, purge/1]). +-export([start/0, declare/5, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). +-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, + stat/1, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). --export([claim_queue/2]). --export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -59,64 +59,94 @@ -ifdef(use_specs). --type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). --type(qlen() :: {'ok', non_neg_integer()}). --type(qfun(A) :: fun ((amqqueue()) -> A)). +-export_type([name/0, qmsg/0]). + +-type(name() :: rabbit_types:r('queue')). + +-type(qlen() :: rabbit_types:ok(non_neg_integer())). +-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A)). +-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}). +-type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> - amqqueue()). --spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). --spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). --spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). --spec(list/1 :: (vhost()) -> [amqqueue()]). --spec(info_keys/0 :: () -> [info_key()]). --spec(info/1 :: (amqqueue()) -> [info()]). --spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). --spec(info_all/1 :: (vhost()) -> [[info()]]). --spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]). +-spec(declare/5 :: + (name(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) + -> {'new' | 'existing', rabbit_types:amqqueue()}). +-spec(lookup/1 :: + (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | + rabbit_types:error('not_found')). +-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). +-spec(with_or_die/2 :: (name(), qfun(A)) -> A). +-spec(assert_equivalence/5 :: + (rabbit_types:amqqueue(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:maybe(pid)) + -> ok). +-spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok'). +-spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A). +-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]). +-spec(info/2 :: + (rabbit_types:amqqueue(), [rabbit_types:info_key()]) + -> [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 :: (rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). +-spec(consumers/1 :: + (rabbit_types:amqqueue()) + -> [{pid(), rabbit_types:ctag(), boolean()}]). -spec(consumers_all/1 :: - (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]). --spec(stat/1 :: (amqqueue()) -> qstats()). --spec(stat_all/0 :: () -> [qstats()]). + (rabbit_types:vhost()) + -> [{name(), pid(), rabbit_types:ctag(), boolean()}]). +-spec(stat/1 :: + (rabbit_types:amqqueue()) + -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(delete/3 :: - (amqqueue(), 'false', 'false') -> qlen(); - (amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'}; - (amqqueue(), 'false', 'true' ) -> qlen() | {'error', 'not_empty'}; - (amqqueue(), 'true' , 'true' ) -> qlen() | - {'error', 'in_use'} | - {'error', 'not_empty'}). --spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), delivery()) -> boolean()). + (rabbit_types:amqqueue(), 'false', 'false') + -> qlen(); + (rabbit_types:amqqueue(), 'true' , 'false') + -> qlen() | rabbit_types:error('in_use'); + (rabbit_types:amqqueue(), 'false', 'true' ) + -> qlen() | rabbit_types:error('not_empty'); + (rabbit_types:amqqueue(), 'true' , 'true' ) + -> qlen() | + rabbit_types:error('in_use') | + rabbit_types:error('not_empty')). +-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). +-spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). --spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). --spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). +-spec(ack/4 :: + (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid()) + -> 'ok'). +-spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). --spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). --spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> +-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/8 :: - (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), - boolean(), any()) -> - 'ok' | {'error', 'queue_owned_by_another_connection' | - 'exclusive_consume_unavailable'}). --spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). +-spec(basic_consume/7 :: + (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined', + rabbit_types:ctag(), boolean(), any()) + -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). +-spec(basic_cancel/4 :: + (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found'). --spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). --spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). +-spec(internal_declare/2 :: + (rabbit_types:amqqueue(), boolean()) + -> rabbit_types:amqqueue() | 'not_found'). +-spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found')). +-spec(maybe_run_queue_via_backing_queue/2 :: + (pid(), (fun ((A) -> A))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). --spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok'). +-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(on_node_down/1 :: (erlang_node()) -> 'ok'). --spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). +-spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()). -endif. @@ -148,11 +178,12 @@ recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. -declare(QueueName, Durable, AutoDelete, Args) -> +declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, + exclusive_owner = Owner, pid = none}), case gen_server2:call(Q#amqqueue.pid, {init, false}) of not_found -> rabbit_misc:not_found(QueueName); @@ -197,7 +228,8 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], + fun (_X, _Q) -> ok end), ok. lookup(Name) -> @@ -214,6 +246,31 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). +assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, + Durable, AutoDelete, _Args, Owner) -> + check_exclusive_access(Q, Owner, strict); +assert_equivalence(#amqqueue{name = QueueName}, + _Durable, _AutoDelete, _Args, _Owner) -> + rabbit_misc:protocol_error( + not_allowed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -248,9 +305,6 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). -stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). - delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -269,7 +323,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_cast(QPid, {requeue, MsgIds, ChPid}). + delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). @@ -298,15 +352,12 @@ limit_all(QPids, ChPid, LimiterPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). -claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - delegate_call(QPid, {claim_queue, ReaderPid}, infinity). - basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate_call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). @@ -324,19 +375,21 @@ flush_all(QPids, ChPid) -> delegate:invoke_no_result( QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). +internal_delete1(QueueName) -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. + rabbit_exchange:delete_queue_bindings(QueueName). + internal_delete(QueueName) -> case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [_] -> - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - %% we want to execute some things, as - %% decided by rabbit_exchange, after the - %% transaction. - rabbit_exchange:delete_queue_bindings(QueueName) + [_] -> internal_delete1(QueueName) end end) of Err = {error, _} -> Err; @@ -394,15 +447,13 @@ safe_delegate_call_ok(H, F, Pids) -> end. delegate_call(Pid, Msg, Timeout) -> - delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end). + delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). - -delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + delegate:invoke(Pid, + fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_pcast(Pid, Pri, Msg) -> delegate:invoke_no_result(Pid, - fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + fun (P) -> gen_server2:pcast(P, Pri, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f12e1b70f8..2fb60e9675 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 100). -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -50,7 +50,6 @@ % Queue's state -record(q, {q, - owner, exclusive_consumer, has_had_consumers, backing_queue, @@ -104,7 +103,6 @@ init(Q) -> {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, - owner = none, exclusive_consumer = none, has_had_consumers = false, backing_queue = BQ, @@ -134,6 +132,23 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +declare(Recover, From, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined}) -> + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> {stop, normal, not_found, State}; + Q -> gen_server2:reply(From, {new, Q}), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, + [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQS = BQ:init(QName, IsDurable, Recover), + noreply(State#q{backing_queue_state = BQS}); + Q1 -> {stop, normal, {existing, Q1}, State} + end. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -433,10 +448,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> cancel_holder(_ChPid, _ConsumerTag, Holder) -> Holder. -check_queue_owner(none, _) -> ok; -check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; -check_queue_owner({_, _}, _) -> mismatch. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -488,10 +499,10 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); -i(owner_pid, #q{owner = none}) -> +i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; -i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) -> - ReaderPid; +i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> + ExclusiveOwner; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> @@ -520,25 +531,24 @@ i(Item, _) -> %--------------------------------------------------------------------------- handle_call({init, Recover}, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined}) -> - %% TODO: If we're exclusively owned && our owner isn't alive && - %% Recover then we should BQ:init and then {stop, normal, - %% not_found, State}, relying on terminate to delete the queue. - case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q -> - gen_server2:reply(From, Q), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), - {rabbit_amqqueue, set_ram_duration_target, [self()]}), - noreply(State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}); - Q1 -> - {stop, normal, Q1, State} + State = #q{q = #amqqueue{exclusive_owner = none}}) -> + declare(Recover, From, State); + +handle_call({init, Recover}, From, + State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined} = State, + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, not_found, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> @@ -613,51 +623,44 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1}) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, +handle_call({basic_consume, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder}) -> - case check_queue_owner(Owner, ReaderPid) of - mismatch -> - reply({error, queue_owned_by_another_connection}, State); + _From, State = #q{exclusive_consumer = ExistingHolder}) -> + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of + in_use -> + reply({error, exclusive_consume_unavailable}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), - case ConsumerCount of - 0 -> ok = rabbit_limiter:register(LimiterPid, self()); - _ -> ok - end, - ExclusiveConsumer = case ExclusiveConsume of - true -> {ChPid, ConsumerTag}; - false -> ExistingHolder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - State2 = - case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_message_queue( - State1#q{ - active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) - end, - reply(ok, State2) - end + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck}, + store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), + ok = case ConsumerCount of + 0 -> rabbit_limiter:register(LimiterPid, self()); + _ -> ok + end, + ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, + State1 = State#q{has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + State2 = + case is_ch_blocked(C) of + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; + false -> run_message_queue( + State1#q{ + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) + end, + reply(ok, State2) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -689,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end end; -handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - backing_queue = BQ, +handle_call(stat, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> - reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); + reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -713,27 +715,17 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); -handle_call({claim_queue, ReaderPid}, _From, - State = #q{owner = Owner, exclusive_consumer = Holder}) -> - case Owner of - none -> - case check_exclusive_access(Holder, true, State) of - in_use -> - %% FIXME: Is this really the right answer? What if - %% an active consumer's reader is actually the - %% claiming pid? Should that be allowed? In order - %% to check, we'd need to hold not just the ch - %% pid for each consumer, but also its reader - %% pid... - reply(locked, State); - ok -> - MonitorRef = erlang:monitor(process, ReaderPid), - reply(ok, State#q{owner = {ReaderPid, MonitorRef}}) - end; - {ReaderPid, _MonitorRef} -> - reply(ok, State); - _ -> - reply(locked, State) +handle_call({requeue, AckTags, ChPid}, From, State) -> + gen_server2:reply(From, ok), + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) end; handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> @@ -763,18 +755,6 @@ handle_cast({ack, Txn, AckTags, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, AckTags, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", - [ChPid]), - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end; - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, @@ -825,19 +805,15 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). -handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, - State = #q{owner = {DownPid, MonitorRef}}) -> - %% We know here that there are no consumers on this queue that are - %% owned by other pids than the one that just went down, so since - %% exclusive in some sense implies autodelete, we delete the queue - %% here. The other way of implementing the "exclusive implies - %% autodelete" feature is to actually set autodelete when an - %% exclusive declaration is seen, but this has the problem that - %% the python tests rely on the queue not going away after a - %% basic.cancel when the queue was declared exclusive and - %% nonautodelete. - NewState = State#q{owner = none}, - {stop, normal, NewState}; +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + %% Exclusively owned queues must disappear with their owner. In + %% the case of clean shutdown we delete the queue synchronously in + %% the reader - although not required by the spec this seems to + %% match what people expect (see bug 21824). However we need this + %% monitor-and-async- delete in case the connection goes away + %% unexpectedly. + {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, NewState} -> noreply(NewState); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2dba00ad62..432d62900b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 4ab7a2a0b1..03a19961fd 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -42,24 +42,41 @@ -ifdef(use_specs). --type(properties_input() :: (amqp_properties() | [{atom(), any()}])). --type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())). - --spec(publish/1 :: (delivery()) -> publish_result()). --spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> - delivery()). --spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> (message() | {'error', any()})). --spec(properties/1 :: (properties_input()) -> amqp_properties()). --spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> publish_result()). --spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(), - maybe(txn()), properties_input(), binary()) -> - publish_result()). --spec(build_content/2 :: (amqp_properties(), binary()) -> content()). --spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-type(properties_input() :: + (rabbit_framing:amqp_property_record() | [{atom(), any()}])). +-type(publish_result() :: + ({ok, rabbit_router:routing_result(), [pid()]} + | rabbit_types:error('not_found'))). + +-spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). +-spec(delivery/4 :: + (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), + rabbit_types:message()) + -> rabbit_types:delivery()). +-spec(message/4 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + properties_input(), binary()) + -> (rabbit_types:message() | rabbit_types:error(any()))). +-spec(properties/1 :: + (properties_input()) -> rabbit_framing:amqp_property_record()). +-spec(publish/4 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + properties_input(), binary()) + -> publish_result()). +-spec(publish/7 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), + properties_input(), binary()) + -> publish_result()). +-spec(build_content/2 :: + (rabbit_framing:amqp_property_record(), binary()) + -> rabbit_types:content()). +-spec(from_content/1 :: + (rabbit_types:content()) + -> {rabbit_framing:amqp_property_record(), binary()}). -spec(is_message_persistent/1 :: - (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). + (rabbit_types:decoded_content()) + -> (boolean() | {'invalid', non_neg_integer()})). -endif. diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index b188c42628..2f48a5d4fe 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -58,15 +58,21 @@ -type(frame() :: [binary()]). -spec(build_simple_method_frame/2 :: - (channel_number(), amqp_method()) -> frame()). + (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()) + -> frame()). -spec(build_simple_content_frames/3 :: - (channel_number(), content(), non_neg_integer()) -> [frame()]). + (rabbit_channel:channel_number(), rabbit_types:content(), + non_neg_integer()) + -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). --spec(generate_table/1 :: (amqp_table()) -> binary()). --spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()). +-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()). +-spec(encode_properties/2 :: + ([rabbit_framing:amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). --spec(ensure_content_encoded/1 :: (content()) -> encoded_content()). --spec(clear_encoded_content/1 :: (content()) -> unencoded_content()). +-spec(ensure_content_encoded/1 :: + (rabbit_types:content()) -> rabbit_types:encoded_content()). +-spec(clear_encoded_content/1 :: + (rabbit_types:content()) -> rabbit_types:unencoded_content()). -spec(map_exception/2 :: (non_neg_integer(), amqp_error()) -> {bool(), non_neg_integer(), amqp_method()}). @@ -121,10 +127,11 @@ build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, [Frag | Frags], BodyPayloadMax, ChannelInt) -> Size = size(Frag), {NewFragSizeRem, NewFragAcc, NewFrags} = - case Size =< FragSizeRem of - true -> {FragSizeRem - Size, [Frag | FragAcc], Frags}; - false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag, - {0, [Head | FragAcc], [Tail | Frags]} + if Size == 0 -> {FragSizeRem, FragAcc, Frags}; + Size =< FragSizeRem -> {FragSizeRem - Size, [Frag | FragAcc], Frags}; + true -> <<Head:FragSizeRem/binary, Tail/binary>> = + Frag, + {0, [Head | FragAcc], [Tail | Frags]} end, build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc, NewFrags, BodyPayloadMax, ChannelInt). @@ -341,4 +348,4 @@ amqp_exception_explanation(Text, Expl) -> CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; true -> CompleteTextBin - end.
\ No newline at end of file + end. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index e022a1fafe..69e34440b8 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -42,10 +42,13 @@ -ifdef(use_specs). --spec(parse_table/1 :: (binary()) -> amqp_table()). --spec(parse_properties/2 :: ([amqp_property_type()], binary()) -> [any()]). --spec(ensure_content_decoded/1 :: (content()) -> decoded_content()). --spec(clear_decoded_content/1 :: (content()) -> undecoded_content()). +-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()). +-spec(parse_properties/2 :: + ([rabbit_framing:amqp_property_type()], binary()) -> [any()]). +-spec(ensure_content_decoded/1 :: + (rabbit_types:content()) -> rabbit_types:decoded_content()). +-spec(clear_decoded_content/1 :: + (rabbit_types:content()) -> rabbit_types:undecoded_content()). -endif. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a48db9c8b3..c4db3ace73 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,20 +35,25 @@ -behaviour(gen_server2). --export([start_link/5, do/2, do/3, shutdown/1]). +-export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([init/1, terminate/2, code_change/3, - handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). +-export([flow_timeout/2]). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2, handle_pre_hibernate/1]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking}). + consumer_mapping, blocking, queue_collector_pid, flow}). + +-record(flow, {server, client, pending}). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds -define(INFO_KEYS, [pid, @@ -66,30 +71,40 @@ -ifdef(use_specs). --spec(start_link/5 :: - (channel_number(), pid(), pid(), username(), vhost()) -> pid()). --spec(do/2 :: (pid(), amqp_method()) -> 'ok'). --spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). +-export_type([channel_number/0]). + +-type(ref() :: any()). +-type(channel_number() :: non_neg_integer()). + +-spec(start_link/6 :: + (channel_number(), pid(), pid(), rabbit_access_control:username(), + rabbit_types:vhost(), pid()) -> pid()). +-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). --spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). +-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(deliver/4 :: + (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) + -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). --spec(info_keys/0 :: () -> [info_key()]). --spec(info/1 :: (pid()) -> [info()]). --spec(info/2 :: (pid(), [info_key()]) -> [info()]). --spec(info_all/0 :: () -> [[info()]]). --spec(info_all/1 :: ([info_key()]) -> [[info()]]). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (pid()) -> [rabbit_types:info()]). +-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). +-spec(info_all/0 :: () -> [[rabbit_types:info()]]). +-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). -endif. %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> {ok, Pid} = gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost], []), + Username, VHost, CollectorPid], []), Pid. do(Pid, Method) -> @@ -113,6 +128,9 @@ conserve_memory(Pid, Conserve) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +flow_timeout(Pid, Ref) -> + gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). + list() -> pg_local:get_members(rabbit_channels). @@ -135,7 +153,7 @@ info_all(Items) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), @@ -153,7 +171,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), - blocking = dict:new()}, + blocking = dict:new(), + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -180,11 +201,9 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> - ok = rollback_and_notify(State), MethodName = rabbit_misc:method_record_type(Method), - State#ch.reader_pid ! {channel_exit, State#ch.channel, - Reason#amqp_error{method = MethodName}}, - {stop, normal, State#ch{state = terminating}}; + {stop, normal, terminating(Reason#amqp_error{method = MethodName}, + State)}; exit:normal -> {stop, normal, State}; _:Reason -> @@ -208,11 +227,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, Conserve}, State) -> - ok = clear_permission_cache(), - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - noreply(State). +handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> + noreply(State); +handle_cast({conserve_memory, false}, State = #ch{state = starting}) -> + ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}), + noreply(State#ch{state = running}); +handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> + flow_control(not Conserve, State); +handle_cast({conserve_memory, _Conserve}, State) -> + noreply(State); + +handle_cast({flow_timeout, Ref}, + State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> + {stop, normal, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; +handle_cast({flow_timeout, _Ref}, State) -> + {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -253,20 +286,20 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = - (Q#amqqueue.name)#resource.name}, +terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> + ok = rollback_and_notify(State), + Reader ! {channel_exit, Channel, Reason}, + State#ch{state = terminating}. + +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> + NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of true -> {noreply, NewState}; - false -> - {ok, ActualName, MessageCount, ConsumerCount} = - rabbit_misc:with_exit_handler( - fun () -> {ok, Q#amqqueue.name, 0, 0} end, - fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName#resource.name, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} + false -> Reply = #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}, + {reply, Reply, NewState} end. check_resource_access(Username, Resource, Perm) -> @@ -300,7 +333,7 @@ check_read_permitted(Resource, #ch{ username = Username}) -> expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, most_recently_declared_queue = MRDQ }) -> rabbit_misc:r(VHostPath, queue, MRDQ); @@ -310,7 +343,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = MRDQ }) -> MRDQ; @@ -352,8 +385,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of + true -> {noreply, State}; + false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} + end; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -370,13 +405,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{exchange = ExchangeNameBin, +handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> + rabbit_misc:protocol_error( + command_invalid, + "basic.publish received after channel.flow_ok{active=false}", []); +handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + mandatory = Mandatory, + immediate = Immediate}, + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -394,16 +433,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), case RoutingRes of - routed -> - ok; - unroutable -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); - not_delivered -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) + routed -> ok; + unroutable -> ok = basic_return(Message, WriterPid, no_route); + not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, {noreply, case TxnKey of none -> State; @@ -413,13 +445,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, - next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> - if DeliveryTag >= NextDeliveryTag -> - rabbit_misc:protocol_error( - command_invalid, "unknown delivery tag ~w", [DeliveryTag]); - true -> ok - end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(TxnKey, Acked), {noreply, case TxnKey of @@ -436,11 +462,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{ writer_pid = WriterPid, + reader_pid = ReaderPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, @@ -458,7 +485,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, Content), {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> - {reply, #'basic.get_empty'{cluster_id = <<>>}, State} + {reply, #'basic.get_empty'{}, State} end; handle_method(#'basic.consume'{queue = QueueNameBin, @@ -480,14 +507,14 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Other -> Other end, - %% In order to ensure that the consume_ok gets sent before - %% any messages are sent to the consumer, we get the queue - %% process to send the consume_ok on our behalf. - case rabbit_amqqueue:with_or_die( - QueueName, + %% We get the queue process to send the consume_ok on our + %% behalf. This is for symmetry with basic.cancel - see + %% the comment in that method for why. + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, self(), LimiterPid, + Q, NoAck, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -497,14 +524,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin, dict:store(ActualConsumerTag, QueueName, ConsumerMapping)}}; - {error, queue_owned_by_another_connection} -> - %% The spec is silent on which exception to use - %% here. This seems reasonable? - %% FIXME: check this - - rabbit_misc:protocol_error( - resource_locked, "~s owned by another connection", - [rabbit_misc:rs(QueueName)]); {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -571,9 +590,8 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, end, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; -handle_method(#'basic.recover'{requeue = true}, - _, State = #ch{ transaction_id = none, - unacked_message_q = UAMQ }) -> +handle_method(#'basic.recover_async'{requeue = true}, + _, State = #ch{ unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes @@ -583,12 +601,12 @@ handle_method(#'basic.recover'{requeue = true}, rabbit_amqqueue:requeue( QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover'{requeue = false}, - _, State = #ch{ transaction_id = none, - writer_pid = WriterPid, +handle_method(#'basic.recover_async'{requeue = false}, + _, State = #ch{ writer_pid = WriterPid, unacked_message_q = UAMQ }) -> ok = rabbit_misc:queue_fold( fun ({_DeliveryTag, none, _Msg}, ok) -> @@ -608,12 +626,17 @@ handle_method(#'basic.recover'{requeue = false}, WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State}; -handle_method(#'basic.recover'{}, _, _State) -> - rabbit_misc:protocol_error( - not_allowed, "attempt to recover a transactional channel",[]); +handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> + {noreply, State2 = #ch{writer_pid = WriterPid}} = + handle_method(#'basic.recover_async'{requeue = Requeue}, + Content, + State), + ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), + {noreply, State2}; handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -644,18 +667,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Args) end, - ok = rabbit_exchange:assert_type(X, CheckedType), + ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, + AutoDelete, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, passive = true, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), - X = rabbit_exchange:lookup_or_die(ExchangeName), - ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), + _ = rabbit_exchange:lookup_or_die(ExchangeName), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{exchange = ExchangeNameBin, @@ -674,73 +696,78 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args}, - _, State = #ch { virtual_host = VHostPath, - reader_pid = ReaderPid }) -> - %% FIXME: atomic create&claim - Finish = - fun (Q) -> - if ExclusiveDeclare -> - case rabbit_amqqueue:claim_queue(Q, ReaderPid) of - locked -> - %% AMQP 0-8 doesn't say which - %% exception to use, so we mimic QPid - %% here. - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]); - ok -> ok - end; - true -> - ok - end, - Q - end, - Q = case rabbit_amqqueue:with( - rabbit_misc:r(VHostPath, queue, QueueNameBin), - Finish) of - {error, not_found} -> - ActualNameBin = - case QueueNameBin of + nowait = NoWait, + arguments = Args} = Declare, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid, + queue_collector_pid = CollectorPid}) -> + Owner = case ExclusiveDeclare of + true -> ReaderPid; + false -> none + end, + ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, - Durable, AutoDelete, Args)); - Other = #amqqueue{name = QueueName} -> - check_configure_permitted(QueueName, State), - Other - end, - return_queue_declare_ok(State, NoWait, Q); + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + rabbit_amqqueue:stat(Q) + end) of + {ok, MessageCount, ConsumerCount} -> + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {error, not_found} -> + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, Q = #amqqueue{}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_queue_collector:register(CollectorPid, Q) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, none, State) + end + end; -handle_method(#'queue.declare'{queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, - nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), - return_queue_declare_ok(State, NoWait, Q); + {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, - nowait = NoWait - }, - _, State) -> + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( @@ -750,8 +777,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, - #'queue.delete_ok'{ - message_count = PurgedMessageCount}) + #'queue.delete_ok'{message_count = PurgedMessageCount}) end; handle_method(#'queue.bind'{queue = QueueNameBin, @@ -759,7 +785,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -767,17 +793,17 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State) -> + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( - QueueName, + {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -790,14 +816,14 @@ handle_method(#'tx.select'{}, _, State) -> handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State) -> {reply, #'tx.commit_ok'{}, internal_commit(State)}; handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; @@ -810,7 +836,6 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; - handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -828,11 +853,25 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = _}, _, State) -> - %% TODO: We may want to correlate this to channel.flow messages we - %% have sent, and complain if we get an unsolicited - %% channel.flow_ok, or the client refuses our flow request. - {noreply, State}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Active, client = Flow, + pending = {_Ref, TRef}} = F}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Flow, client = Flow, + pending = {_Ref, TRef}}}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, issue_flow(Flow, State)}; +handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> + rabbit_misc:protocol_error( + command_invalid, "unsolicited channel.flow_ok", []); +handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> + rabbit_misc:protocol_error( + command_invalid, + "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -840,8 +879,26 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) + when Flow =:= not Active -> + ok = clear_permission_cache(), + noreply(issue_flow(Active, State)); +flow_control(Active, State = #ch{flow = F}) -> + noreply(State#ch{flow = F#flow{server = Active}}). + +issue_flow(Active, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = Active}), + Ref = make_ref(), + {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, + [self(), Ref]), + State#ch{flow = #flow{server = Active, client = not Active, + pending = {Ref, TRef}}}. + binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + ReturnMethod, NoWait, + State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -852,7 +909,12 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + fun (_X, Q) -> + try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + catch exit:Reason -> {error, Reason} + end + end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> @@ -866,17 +928,17 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(QueueName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + {error, #amqp_error{} = Error} -> + rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) end. basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, - WriterPid, ReplyCode, ReplyText) -> + WriterPid, Reason) -> + {_Close, ReplyCode, ReplyText} = + rabbit_framing:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -904,7 +966,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> QTail, DeliveryTag, Multiple) end; {empty, _} -> - {ToAcc, PrefixAcc} + rabbit_misc:protocol_error( + not_found, "unknown delivery tag ~w", [DeliveryTag]) end. add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index d1834b3b73..6e6ad06cb3 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -44,7 +44,7 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). --spec(action/4 :: (atom(), erlang_node(), [string()], +-spec(action/4 :: (atom(), node(), [string()], fun ((string(), [any()]) -> 'ok')) -> 'ok'). -spec(usage/0 :: () -> no_return()). @@ -59,8 +59,8 @@ start() -> parse_args(FullCommand, #params{quiet = false, node = rabbit_misc:makenode(NodeStr)}), Inform = case Quiet of - true -> fun(_Format, _Args1) -> ok end; - false -> fun(Format, Args1) -> + true -> fun (_Format, _Args1) -> ok end; + false -> fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) end end, @@ -160,6 +160,12 @@ action(cluster, Node, ClusterNodeSs, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); +action(force_cluster, Node, ClusterNodeSs, Inform) -> + ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), + Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", + [Node, ClusterNodes]), + rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); + action(status, Node, [], Inform) -> Inform("Status of node ~p", [Node]), case call(Node, {rabbit, status, []}) of diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index f19e8d025a..0ec6beb676 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -30,17 +30,17 @@ %% -module(rabbit_dialyzer). --include("rabbit.hrl"). --export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]). +-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, + halt_with_code/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(create_basic_plt/1 :: (file_path()) -> 'ok'). --spec(add_to_plt/2 :: (file_path(), string()) -> 'ok'). --spec(dialyze_files/2 :: (file_path(), string()) -> 'ok'). +-spec(create_basic_plt/1 :: (file:filename()) -> 'ok'). +-spec(add_to_plt/2 :: (file:filename(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (file:filename(), string()) -> 'ok'). -spec(halt_with_code/1 :: (atom()) -> no_return()). -endif. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index e9baf2c480..42861f8603 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -39,7 +39,8 @@ -export([boot/0]). --export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, + handle_info/2]). boot() -> {ok, DefaultVHost} = application:get_env(default_vhost), diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 45b66712b8..875d680f86 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -33,7 +33,8 @@ -behaviour(gen_event). --export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, + code_change/3]). %% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h %% module because the original's init/1 does not match properly diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 8f41392f83..d91ebe9ba9 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -33,13 +33,14 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, - publish/2]). --export([add_binding/4, delete_binding/4, list_bindings/1]). +-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, + info/1, info/2, info_all/1, info_all/2, publish/2]). +-export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2]). +-export([assert_equivalence/5]). +-export([assert_args_equivalence/2]). +-export([check_type/1]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -54,55 +55,87 @@ -ifdef(use_specs). --type(bind_res() :: 'ok' | {'error', - 'queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found'}). +-export_type([name/0, type/0, binding_key/0]). + +-type(name() :: rabbit_types:r('exchange')). +-type(type() :: atom()). +-type(binding_key() :: binary()). + +-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found')). +-type(inner_fun() :: + fun((rabbit_types:exchange(), queue()) -> + rabbit_types:ok_or_error(rabbit_types:amqp_error()))). + -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(), - amqp_table()) -> exchange()). +-spec(declare/5 :: + (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table()) + -> rabbit_types:exchange()). -spec(check_type/1 :: (binary()) -> atom()). --spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). --spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). --spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). --spec(list/1 :: (vhost()) -> [exchange()]). --spec(info_keys/0 :: () -> [info_key()]). --spec(info/1 :: (exchange()) -> [info()]). --spec(info/2 :: (exchange(), [info_key()]) -> [info()]). --spec(info_all/1 :: (vhost()) -> [[info()]]). --spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(add_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'binding_not_found'}). --spec(list_bindings/1 :: (vhost()) -> - [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). --spec(delete/2 :: (exchange_name(), boolean()) -> - 'ok' | not_found() | {'error', 'in_use'}). --spec(list_queue_bindings/1 :: (queue_name()) -> - [{exchange_name(), routing_key(), amqp_table()}]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> - [{queue_name(), routing_key(), amqp_table()}]). +-spec(assert_equivalence/5 :: + (rabbit_types:exchange(), atom(), boolean(), boolean(), + rabbit_framing:amqp_table()) + -> 'ok'). +-spec(assert_args_equivalence/2 :: + (rabbit_types:exchange(), rabbit_framing:amqp_table()) -> 'ok'). +-spec(lookup/1 :: + (name()) -> rabbit_types:ok(rabbit_types:exchange()) | + rabbit_types:error('not_found')). +-spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange()). +-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]). +-spec(info/2 :: + (rabbit_types:exchange(), [rabbit_types:info_key()]) + -> [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). +-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> {rabbit_router:routing_result(), [pid()]}). +-spec(add_binding/5 :: + (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), + rabbit_framing:amqp_table(), inner_fun()) + -> bind_res()). +-spec(delete_binding/5 :: + (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), + rabbit_framing:amqp_table(), inner_fun()) + -> bind_res() | rabbit_types:error('binding_not_found')). +-spec(list_bindings/1 :: + (rabbit_types:vhost()) + -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), + rabbit_framing:amqp_table()}]). +-spec(delete_queue_bindings/1 :: + (rabbit_amqqueue:name()) -> fun (() -> none())). +-spec(delete_transient_queue_bindings/1 :: + (rabbit_amqqueue:name()) -> fun (() -> none())). +-spec(delete/2 :: + (name(), boolean())-> 'ok' | + rabbit_types:error('not_found') | + rabbit_types:error('in_use')). +-spec(list_queue_bindings/1 :: + (rabbit_amqqueue:name()) + -> [{name(), rabbit_router:routing_key(), + rabbit_framing:amqp_table()}]). +-spec(list_exchange_bindings/1 :: + (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(), + rabbit_framing:amqp_table()}]). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. +-define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). recover() -> Exs = rabbit_misc:table_fold( - fun(Exchange, Acc) -> + fun (Exchange, Acc) -> ok = mnesia:write(rabbit_exchange, Exchange, write), [Exchange | Acc] end, [], rabbit_durable_exchange), Bs = rabbit_misc:table_fold( - fun(Route = #route{binding = B}, Acc) -> + fun (Route = #route{binding = B}, Acc) -> {_, ReverseRoute} = route_with_reverse(Route), ok = mnesia:write(rabbit_route, Route, write), @@ -182,13 +215,36 @@ check_type(TypeBin) -> T end. -assert_type(#exchange{ type = ActualType }, RequiredType) - when ActualType == RequiredType -> - ok; -assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> +assert_equivalence(X = #exchange{ durable = Durable, + auto_delete = AutoDelete, + type = Type}, + Type, Durable, AutoDelete, + RequiredArgs) -> + ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); +assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, + _Args) -> rabbit_misc:protocol_error( - not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", - [rabbit_misc:rs(Name), ActualType, RequiredType]). + not_allowed, + "cannot redeclare ~s with different type, durable or autodelete value", + [rabbit_misc:rs(Name)]). + +alternate_exchange_value(Args) -> + lists:keysearch(<<"alternate-exchange">>, 1, Args). + +assert_args_equivalence(#exchange{ name = Name, + arguments = Args }, + RequiredArgs) -> + %% The spec says "Arguments are compared for semantic + %% equivalence". The only arg we care about is + %% "alternate-exchange". + Ae1 = alternate_exchange_value(RequiredArgs), + Ae2 = alternate_exchange_value(Args), + if Ae1==Ae2 -> ok; + true -> rabbit_misc:protocol_error( + not_allowed, + "cannot redeclare ~s with inequivalent args", + [rabbit_misc:rs(Name)]) + end. lookup(Name) -> rabbit_misc:dirty_read({rabbit_exchange, Name}). @@ -305,7 +361,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> Module = type_to_module(Type), case IsDeleted of auto_deleted -> Module:delete(X, Bs); - no_delete -> Module:remove_bindings(X, Bs) + not_deleted -> Module:remove_bindings(X, Bs) end end, Cleanup) end. @@ -349,7 +405,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, Exchange}) of [] -> {error, not_found}; [X] -> Fun(X) end @@ -357,7 +413,7 @@ call_with_exchange(Exchange, Fun) -> call_with_exchange_and_queue(Exchange, Queue, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case {mnesia:read({rabbit_exchange, Exchange}), + fun () -> case {mnesia:read({rabbit_exchange, Exchange}), mnesia:read({rabbit_queue, Queue})} of {[X], [Q]} -> Fun(X, Q); {[ ], [_]} -> {error, exchange_not_found}; @@ -366,50 +422,66 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end end). -add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + case InnerFun(X, Q) of + ok -> case mnesia:read({rabbit_route, B}) of [] -> - sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), + ok = sync_binding(B, + X#exchange.durable andalso + Q#amqqueue.durable, + fun mnesia:write/3), {new, X, B}; [_R] -> {existing, X, B} - end + end; + {error, _} = E -> + E end end) of {new, Exchange = #exchange{ type = Type }, Binding} -> (type_to_module(Type)):add_binding(Exchange, Binding); {existing, _, _} -> ok; - Err = {error, _} -> + {error, _} = Err -> Err end. -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - {maybe_auto_delete(X), B} + [] -> + {error, binding_not_found}; + _ -> + case InnerFun(X, Q) of + ok -> + ok = + sync_binding(B, + X#exchange.durable andalso + Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B}; + {error, _} = E -> + E + end end end) of - Err = {error, _} -> + {error, _} = Err -> Err; - {{Action, X = #exchange{ type = Type }}, B} -> + {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), - case Action of - auto_delete -> Module:delete(X, [B]); - no_delete -> Module:remove_bindings(X, [B]) + case IsDeleted of + auto_deleted -> Module:delete(X, [B]); + not_deleted -> Module:remove_bindings(X, [B]) end end. @@ -493,10 +565,10 @@ delete(ExchangeName, IfUnused) -> end. maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {no_delete, Exchange}; + {not_deleted, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> case conditional_delete(Exchange) of - {error, in_use} -> {no_delete, Exchange}; + {error, in_use} -> {not_deleted, Exchange}; {deleted, Exchange, []} -> {auto_deleted, Exchange} end. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index a8c071e681..85760edce4 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -54,7 +54,11 @@ behaviour_info(callbacks) -> {add_binding, 2}, %% called after bindings have been deleted. - {remove_bindings, 2} + {remove_bindings, 2}, + + %% called when comparing exchanges for equivalence - should return ok or + %% exit with #amqp_error{} + {assert_args_equivalence, 2} ]; behaviour_info(_Other) -> diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 9b71e0e1d1..4f6eb85199 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 311654ab21..94798c78fe 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, publish/2]). --export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). +-export([validate/1, create/1, recover/2, delete/2, add_binding/2, + remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 285dab1a03..44607398cb 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -36,8 +36,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, publish/2]). --export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). +-export([validate/1, create/1, recover/2, delete/2, add_binding/2, + remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -48,7 +48,8 @@ {enables, kernel_ready}]}). -ifdef(use_specs). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(headers_match/2 :: (rabbit_framing:amqp_table(), + rabbit_framing:amqp_table()) -> boolean()). -endif. description() -> @@ -135,3 +136,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl index 175d15ad83..7906fbee72 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_exchange_type_registry.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -35,8 +35,8 @@ -export([start_link/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). -export([register/2, binary_to_type/1, lookup_module/1]). @@ -45,10 +45,13 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}). +-spec(start_link/0 :: + () -> 'ignore' | rabbit_types:ok_or_error2(pid(), term())). -spec(register/2 :: (binary(), atom()) -> 'ok'). --spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}). --spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}). +-spec(binary_to_type/1 :: + (binary()) -> atom() | rabbit_types:error('not_found')). +-spec(lookup_module/1 :: + (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). -endif. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 8a3dceeaeb..a374cfee7f 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, publish/2]). --export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). +-export([validate/1, create/1, recover/2, delete/2, add_binding/2, + remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -49,7 +49,9 @@ -export([topic_matches/2]). -ifdef(use_specs). + -spec(topic_matches/2 :: (binary(), binary()) -> boolean()). + -endif. description() -> @@ -99,3 +101,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index b7c6aa96fa..bc1a2a0835 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -76,29 +76,27 @@ mainloop(ChannelPid) -> {method, MethodName, FieldsBin} = read_frame(ChannelPid), Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), case rabbit_framing:method_has_content(MethodName) of - true -> rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, MethodName)); + true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), + rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, ClassId)); false -> rabbit_channel:do(ChannelPid, Method) end, ?MODULE:mainloop(ChannelPid). -collect_content(ChannelPid, MethodName) -> - {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), +collect_content(ChannelPid, ClassId) -> case read_frame(ChannelPid) of - {content_header, HeaderClassId, 0, BodySize, PropertiesBin} -> - if HeaderClassId == ClassId -> - Payload = collect_content_payload(ChannelPid, BodySize, []), - #content{class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - payload_fragments_rev = Payload}; - true -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]) - end; + {content_header, ClassId, 0, BodySize, PropertiesBin} -> + Payload = collect_content_payload(ChannelPid, BodySize, []), + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + payload_fragments_rev = Payload}; + {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> + rabbit_misc:protocol_error( + command_invalid, + "expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]); _ -> rabbit_misc:protocol_error( command_invalid, diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 1ae8f7dac4..af1c629f41 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -31,15 +31,13 @@ -module(rabbit_guid). --include("rabbit.hrl"). - -behaviour(gen_server). -export([start_link/0]). -export([guid/0, string_guid/1, binstring_guid/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). -define(SERVER, ?MODULE). -define(SERIAL_FILENAME, "rabbit_serial"). @@ -50,7 +48,11 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-export_type([guid/0]). + +-type(guid() :: binary()). + +-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(guid/0 :: () -> guid()). -spec(string_guid/1 :: (any()) -> string()). -spec(binstring_guid/1 :: (any()) -> binary()). diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index b4fd91560f..8214b976c4 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -43,16 +43,16 @@ -include("rabbit.hrl"). --record(iv_state, { queue, qname, len, pending_ack }). +-record(iv_state, { queue, qname, durable, len, pending_ack }). -record(tx, { pending_messages, pending_acks, is_persistent }). -ifdef(use_specs). --type(ack() :: guid() | 'blank_ack'). +-type(ack() :: rabbit_guid:guid() | 'blank_ack'). -type(state() :: #iv_state { queue :: queue(), - qname :: queue_name(), + qname :: rabbit_amqqueue:name(), len :: non_neg_integer(), - pending_ack :: dict() + pending_ack :: dict:dictionary() }). -include("rabbit_backing_queue_spec.hrl"). @@ -66,18 +66,23 @@ init(QName, IsDurable, Recover) -> true -> rabbit_persister:queue_content(QName); false -> [] end), - #iv_state { queue = Q, qname = QName, len = queue:len(Q), + #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = queue:len(Q), pending_ack = dict:new() }. terminate(State) -> State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. -delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, dict:fetch_keys(PA), PA), +delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), {_PLen, State1} = purge(State), terminate(State1). -purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> +purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( @@ -85,57 +90,63 @@ purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> Acc; ({Msg = #basic_message { guid = Guid }, IsDelivered}, {AckTagsN, PAN}) -> - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} end, {[], dict:new()}, Q), - ok = persist_acks(none, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) -> - ok = persist_message(none, QName, Msg), +publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg), State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - State = #iv_state { qname = QName, len = 0, - pending_ack = PA }) -> - ok = persist_message(none, QName, Msg), - ok = persist_delivery(QName, Msg, false), + State = #iv_state { qname = QName, durable = IsDurable, + len = 0, pending_ack = PA }) -> + ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len, +fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, + durable = IsDurable, pending_ack = PA }) -> {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = queue:out(Q), Len1 = Len - 1, - ok = persist_delivery(QName, Msg, IsDelivered), + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = dict:store(Guid, Msg, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; - false -> ok = persist_acks(none, QName, [Guid], PA1), + false -> ok = persist_acks(QName, IsDurable, none, + [Guid], PA1), {blank_ack, PA} end, {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. -ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> - ok = persist_acks(none, QName, AckTags, PA), +ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, AckTags, PA), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName }) -> +tx_publish(Txn, Msg, State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), - ok = persist_message(Txn, QName, Msg), + ok = persist_message(QName, IsDurable, Txn, Msg), State. -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(Txn, QName, AckTags, PA), + ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), State. tx_rollback(Txn, State = #iv_state { qname = QName }) -> @@ -228,32 +239,32 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- -persist_message(_Txn, _QName, #basic_message { is_persistent = false }) -> - ok; -persist_message(Txn, QName, Msg) -> +persist_message(QName, true, Txn, Msg = #basic_message { + is_persistent = true }) -> Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties, - %% rebuild from properties_bin on restore + %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]). + [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg) -> + ok. -persist_delivery(_QName, #basic_message { is_persistent = false }, - _IsDelivered) -> - ok; -persist_delivery(_QName, _Message, true) -> - ok; -persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]). +persist_delivery(QName, true, false, #basic_message { is_persistent = true, + guid = Guid }) -> + persist_work(none, QName, [{deliver, {QName, Guid}}]); +persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> + ok. -persist_acks(Txn, QName, AckTags, PA) -> +persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin {ok, Msg} = dict:find(Guid, PA), Msg #basic_message.is_persistent - end]). + end]); +persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> + ok. persist_work(_Txn,_QName, []) -> ok; diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl index 4f467162e4..e0457b1e43 100644 --- a/src/rabbit_load.erl +++ b/src/rabbit_load.erl @@ -40,11 +40,10 @@ -ifdef(use_specs). --type(erlang_node() :: atom()). --type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}). +-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}). -spec(local_load/0 :: () -> load()). -spec(remote_loads/0 :: () -> [load()]). --spec(pick/0 :: () -> erlang_node()). +-spec(pick/0 :: () -> node()). -endif. diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index cc80e360ae..85bcbca04a 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -50,7 +50,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(debug/1 :: (string()) -> 'ok'). -spec(debug/2 :: (string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 91e97ffe49..bdf3807531 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -86,11 +86,12 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). +-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(update/0 :: () -> 'ok'). -spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). -spec(deregister/1 :: (pid()) -> 'ok'). --spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()). +-spec(report_ram_duration/2 :: + (pid(), float() | 'infinity') -> number() | 'infinity'). -spec(stop/0 :: () -> 'ok'). -endif. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 723b818b41..fcc9fc7e54 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -32,11 +32,12 @@ -module(rabbit_misc). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). + -include_lib("kernel/include/file.hrl"). -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, - protocol_error/3, protocol_error/4]). + protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). @@ -71,61 +72,84 @@ -ifdef(use_specs). --include_lib("kernel/include/inet.hrl"). +-export_type([resource_name/0]). --type(ok_or_error() :: 'ok' | {'error', any()}). +-type(ok_or_error() :: rabbit_types:ok_or_error(any())). +-type(thunk(T) :: fun(() -> T)). +-type(resource_name() :: binary()). --spec(method_record_type/1 :: (tuple()) -> atom()). +-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) + -> rabbit_framing:amqp_method_name()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). --spec(die/1 :: (atom()) -> no_return()). --spec(frame_error/2 :: (atom(), binary()) -> no_return()). --spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()). --spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()). --spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()). --spec(not_found/1 :: (r(atom())) -> no_return()). --spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). +-spec(die/1 :: (rabbit_framing:amqp_exception()) -> no_return()). +-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary()) + -> no_return()). +-spec(amqp_error/4 :: + (rabbit_framing:amqp_exception(), string(), [any()], + rabbit_framing:amqp_method_name()) + -> rabbit_types:amqp_error()). +-spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()]) + -> no_return()). +-spec(protocol_error/4 :: + (rabbit_framing:amqp_exception(), string(), [any()], + rabbit_framing:amqp_method_name()) + -> no_return()). +-spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()). +-spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()). +-spec(get_config/1 :: + (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). --spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> - r(K) when is_subtype(K, atom())). --spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), - kind :: K, - name :: '_'} - when is_subtype(K, atom())). --spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> - undefined | r(K) when is_subtype(K, atom())). --spec(rs/1 :: (r(atom())) -> string()). +-spec(dirty_read/1 :: + ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). +-spec(r/2 :: (rabbit_types:vhost(), K) + -> rabbit_types:r3(rabbit_types:vhost(), K, '_') + when is_subtype(K, atom())). +-spec(r/3 :: + (rabbit_types:vhost() | rabbit_types:r(atom()), K, resource_name()) + -> rabbit_types:r3(rabbit_types:vhost(), K, resource_name()) + when is_subtype(K, atom())). +-spec(r_arg/4 :: + (rabbit_types:vhost() | rabbit_types:r(atom()), K, + rabbit_framing:amqp_table(), binary()) + -> undefined | rabbit_types:r(K) + when is_subtype(K, atom())). +-spec(rs/1 :: (rabbit_types:r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). -spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok'). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (file_path()) -> ok_or_error()). --spec(report_cover/1 :: (file_path()) -> 'ok'). +-spec(enable_cover/1 :: (file:filename()) -> ok_or_error()). +-spec(report_cover/1 :: (file:filename()) -> 'ok'). -spec(throw_on_error/2 :: - (atom(), thunk({error, any()} | {ok, A} | A)) -> A). + (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(with_user/2 :: (username(), thunk(A)) -> A). --spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). --spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). +-spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A). +-spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A). +-spec(with_user_and_vhost/3 :: + (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A)) + -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). --spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()). --spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}). +-spec(makenode/1 :: ({string(), string()} | string()) -> node()). +-spec(nodeparts/1 :: (node() | string()) -> {string(), string()}). -spec(cookie_hash/0 :: () -> string()). --spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). +-spec(tcp_name/3 :: + (atom(), inet:ip_address(), rabbit_networking:ip_port()) + -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). -spec(dirty_read_all/1 :: (atom()) -> [any()]). --spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> - 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()). --spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}). --spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (file_path(), string()) -> ok_or_error()). +-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) + -> 'ok' | 'aborted'). +-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()). +-spec(read_term_file/1 :: + (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). @@ -133,15 +157,18 @@ -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). --spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). +-spec(sort_field_table/1 :: + (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). -spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). --spec(version_compare/3 :: (string(), string(), - ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). --spec(recursive_delete/1 :: ([file_path()]) -> - 'ok' | {'error', {file_path(), any()}}). --spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(version_compare/3 :: + (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) + -> boolean()). +-spec(recursive_delete/1 :: + ([file:filename()]) + -> rabbit_types:ok_or_error({file:filename(), any()})). +-spec(dict_cons/3 :: (any(), any(), dict:dictionary()) -> dict:dictionary()). -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -endif. @@ -173,7 +200,10 @@ protocol_error(Name, ExplanationFormat, Params) -> protocol_error(Name, ExplanationFormat, Params, none). protocol_error(Name, ExplanationFormat, Params, Method) -> - exit(amqp_error(Name, ExplanationFormat, Params, Method)). + protocol_error(amqp_error(Name, ExplanationFormat, Params, Method)). + +protocol_error(#amqp_error{} = Error) -> + exit(Error). not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). @@ -242,12 +272,12 @@ report_cover([Root]) when is_atom(Root) -> report_cover(Root) -> Dir = filename:join(Root, "cover"), ok = filelib:ensure_dir(filename:join(Dir,"junk")), - lists:foreach(fun(F) -> file:delete(F) end, + lists:foreach(fun (F) -> file:delete(F) end, filelib:wildcard(filename:join(Dir, "*.html"))), {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), {CT, NCT} = lists:foldl( - fun(M,{CovTot, NotCovTot}) -> + fun (M,{CovTot, NotCovTot}) -> {ok, {M, {Cov, NotCov}}} = cover:analyze(M, module), ok = report_coverage_percentage(SummaryFile, Cov, NotCov, M), @@ -367,7 +397,7 @@ upmap(F, L) -> Parent = self(), Ref = make_ref(), [receive {Ref, Result} -> Result end - || _ <- [spawn(fun() -> Parent ! {Ref, F(X)} end) || X <- L]]. + || _ <- [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L]]. map_in_order(F, L) -> lists:reverse( @@ -537,18 +567,24 @@ pid_to_string(Pid) when is_pid(Pid) -> %% inverse of above string_to_pid(Str) -> + Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", [{capture,all_but_first,list}]) of {match, [NodeStr, IdStr, SerStr]} -> - %% turn the triple into a pid - see pid_to_string - <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)), + %% the NodeStr atom might be quoted, so we have to parse + %% it rather than doing a simple list_to_atom + NodeAtom = case erl_scan:string(NodeStr) of + {ok, [{atom, _, X}], _} -> X; + {error, _, _} -> throw(Err) + end, + <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), Id = list_to_integer(IdStr), Ser = list_to_integer(SerStr), binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); nomatch -> - throw({error, {invalid_pid_syntax, Str}}) + throw(Err) end. version_compare(A, B, lte) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 55a6761d2d..e2b6927f8c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -29,11 +29,12 @@ %% Contributor(s): ______________________________________. %% + -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, reset/0, force_reset/0, is_clustered/0, - empty_ram_only_tables/0]). + cluster/1, force_cluster/1, reset/0, force_reset/0, + is_clustered/0, empty_ram_only_tables/0]). -export([table_names/0]). @@ -47,12 +48,18 @@ -ifdef(use_specs). --spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). --spec(dir/0 :: () -> file_path()). +-export_type([node_type/0]). + +-type(node_type() :: disc_only | disc | ram | unknown). +-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | + {'running_nodes', [node()]}]). +-spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). --spec(cluster/1 :: ([erlang_node()]) -> 'ok'). +-spec(cluster/1 :: ([node()]) -> 'ok'). +-spec(force_cluster/1 :: ([node()]) -> 'ok'). +-spec(cluster/2 :: ([node()], boolean()) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). -spec(is_clustered/0 :: () -> boolean()). @@ -64,13 +71,26 @@ %%---------------------------------------------------------------------------- status() -> - [{nodes, mnesia:system_info(db_nodes)}, + [{nodes, case mnesia:system_info(is_running) of + yes -> [{Key, Nodes} || + {Key, CopyType} <- [{disc_only, disc_only_copies}, + {disc, disc_copies}, + {ram, ram_copies}], + begin + Nodes = mnesia:table_info(schema, CopyType), + Nodes =/= [] + end]; + no -> case mnesia:system_info(db_nodes) of + [] -> []; + Nodes -> [{unknown, Nodes}] + end + end}, {running_nodes, mnesia:system_info(running_db_nodes)}]. init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config()), + ok = init_db(read_cluster_nodes_config(), true), ok = wait_for_tables(), ok. @@ -78,16 +98,22 @@ is_db_empty() -> lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, table_names()). +cluster(ClusterNodes) -> + cluster(ClusterNodes, false). +force_cluster(ClusterNodes) -> + cluster(ClusterNodes, true). + %% Alter which disk nodes this node is clustered with. This can be a %% subset of all the disk nodes in the cluster but can (and should) %% include the node itself if it is to be a disk rather than a ram -%% node. -cluster(ClusterNodes) -> +%% node. If Force is false, only connections to online nodes are +%% allowed. +cluster(ClusterNodes, Force) -> ok = ensure_mnesia_not_running(), ok = ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes), + ok = init_db(ClusterNodes, Force), ok = wait_for_tables(), ok = create_cluster_nodes_config(ClusterNodes) after @@ -259,38 +285,56 @@ delete_cluster_nodes_config() -> %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the -%% specified cluster nodes. -init_db(ClusterNodes) -> - case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of - {ok, []} -> - case mnesia:system_info(use_dir) of - true -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - %% NB: we cannot use rabbit_log here since - %% it may not have been started yet - error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), - ok = move_db(), +%% specified cluster nodes. If Force is false, don't allow +%% connections to offline nodes. +init_db(ClusterNodes, Force) -> + UClusterNodes = lists:usort(ClusterNodes), + ProperClusterNodes = UClusterNodes -- [node()], + case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of + {ok, Nodes} -> + case Force of + false -> + FailedClusterNodes = ProperClusterNodes -- Nodes, + case FailedClusterNodes of + [] -> ok; + _ -> + throw({error, {failed_to_cluster_with, + FailedClusterNodes, + "Mnesia could not connect to some nodes."}}) + end; + _ -> ok + end, + case Nodes of + [] -> + case mnesia:system_info(use_dir) of + true -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + %% NB: we cannot use rabbit_log here since + %% it may not have been started yet + error_logger:warning_msg( + "schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), + ok = move_db(), + ok = create_schema() + end; + false -> ok = create_schema() end; - false -> - ok = create_schema() - end; - {ok, [_|_]} -> - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end); + [_|_] -> + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end) + end; {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -346,7 +390,7 @@ table_has_copy_type(TabDef, DiscType) -> create_local_table_copies(Type) -> lists:foreach( - fun({Tab, TabDef}) -> + fun ({Tab, TabDef}) -> HasDiscCopies = table_has_copy_type(TabDef, disc_copies), HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), LocalTab = proplists:get_bool(local_content, TabDef), diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 336f74bf9a..5db1d77a32 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -111,7 +111,7 @@ action(start_all, [NodeCount], RpcTimeout) -> action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( - fun({Node, Pid}) -> + fun ({Node, Pid}) -> RabbitRunning = case is_rabbit_running(Node, RpcTimeout) of false -> not_running; @@ -123,7 +123,7 @@ action(status, [], RpcTimeout) -> action(stop_all, [], RpcTimeout) -> io:format("Stopping all nodes...~n", []), - call_all_nodes(fun({Node, Pid}) -> + call_all_nodes(fun ({Node, Pid}) -> io:format("Stopping node ~p~n", [Node]), rpc:call(Node, rabbit, stop_and_halt, []), case kill_wait(Pid, RpcTimeout, false) of diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 406977b42a..6baa4b8864 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -31,31 +31,42 @@ -module(rabbit_net). -include("rabbit.hrl"). --include_lib("kernel/include/inet.hrl"). -export([async_recv/3, close/1, controlling_process/2, getstat/2, peername/1, port_command/2, send/2, sockname/1]). + %%--------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([socket/0]). + -type(stat_option() :: 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). --type(error() :: {'error', any()}). - --spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}). --spec(close/1 :: (socket()) -> 'ok' | error()). --spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()). +-type(error() :: rabbit_types:error(any())). +-type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()). + +-spec(async_recv/3 :: + (socket(), integer(), timeout()) -> rabbit_types:ok(any())). +-spec(close/1 :: (socket()) -> rabbit_types:ok_or_error(any())). +-spec(controlling_process/2 :: + (socket(), pid()) -> rabbit_types:ok_or_error(any())). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). --spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()). --spec(peername/1 :: (socket()) -> - {'ok', {ip_address(), non_neg_integer()}} | error()). --spec(sockname/1 :: (socket()) -> - {'ok', {ip_address(), non_neg_integer()}} | error()). --spec(getstat/2 :: (socket(), [stat_option()]) -> - {'ok', [{stat_option(), integer()}]} | error()). +-spec(send/2 :: + (socket(), binary() | iolist()) -> rabbit_types:ok_or_error(any())). +-spec(peername/1 :: + (socket()) + -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) | + error()). +-spec(sockname/1 :: + (socket()) + -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) | + error()). +-spec(getstat/2 :: + (socket(), [stat_option()]) + -> rabbit_types:ok([{stat_option(), integer()}]) | error()). -endif. @@ -66,7 +77,7 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> Pid = self(), Ref = make_ref(), - spawn(fun() -> Pid ! {inet_async, Sock, Ref, + spawn(fun () -> Pid ! {inet_async, Sock, Ref, ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} end), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index c3d0b7b786..3a3357ba9d 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -63,25 +63,29 @@ -ifdef(use_specs). --type(host() :: ip_address() | string() | atom()). --type(connection() :: pid()). +-export_type([ip_port/0, hostname/0]). -spec(start/0 :: () -> 'ok'). --spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). --spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok'). --spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). --spec(active_listeners/0 :: () -> [listener()]). --spec(node_listeners/1 :: (erlang_node()) -> [listener()]). --spec(connections/0 :: () -> [connection()]). --spec(connection_info_keys/0 :: () -> [info_key()]). --spec(connection_info/1 :: (connection()) -> [info()]). --spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]). --spec(connection_info_all/0 :: () -> [[info()]]). --spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]). +-spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). +-spec(start_ssl_listener/3 :: (hostname(), ip_port(), [rabbit_types:info()]) + -> 'ok'). +-spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). +-spec(active_listeners/0 :: () -> [rabbit_types:listener()]). +-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). +-spec(connections/0 :: () -> [rabbit_types:connection()]). +-spec(connection_info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(connection_info/1 :: + (rabbit_types:connection()) -> [rabbit_types:info()]). +-spec(connection_info/2 :: + (rabbit_types:connection(), [rabbit_types:info_key()]) + -> [rabbit_types:info()]). +-spec(connection_info_all/0 :: () -> [[rabbit_types:info()]]). +-spec(connection_info_all/1 :: + ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). --spec(on_node_down/1 :: (erlang_node()) -> 'ok'). --spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) -> - {ip_address(), atom()}). +-spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(check_tcp_listener_address/3 :: + (atom(), hostname(), ip_port()) -> {inet:ip_address(), atom()}). -endif. @@ -102,7 +106,7 @@ boot_ssl() -> {ok, []} -> ok; {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, ssl]), + ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOpts} = application:get_env(ssl_options), [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], ok diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 3cd42e4753..a427b13548 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -65,21 +65,29 @@ -ifdef(use_specs). --type(pmsg() :: {queue_name(), pkey()}). +-type(pkey() :: rabbit_guid:guid()). +-type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). + -type(work_item() :: - {publish, message(), pmsg()} | + {publish, rabbit_types:message(), pmsg()} | {deliver, pmsg()} | {ack, pmsg()}). --spec(start_link/1 :: ([queue_name()]) -> - {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + ([rabbit_amqqueue:name()]) + -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). +-spec(extend_transaction/2 :: + ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()]) + -> 'ok'). -spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). --spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). +-spec(commit_transaction/1 :: + ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). +-spec(rollback_transaction/1 :: + ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). --spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]). +-spec(queue_content/1 :: + (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]). -endif. @@ -236,7 +244,7 @@ log_work(CreateWorkUnit, MessageList, snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( - fun(M = {publish, Message, QK = {_QName, PKey}}) -> + fun (M = {publish, Message, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of [_] -> {tied, QK}; [] -> ets:insert(Messages, {PKey, Message}), diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 274981efed..ef3c5cc250 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -108,6 +108,7 @@ start() -> WApp == stdlib; WApp == kernel; WApp == sasl; + WApp == crypto; WApp == os_mon -> false; _ -> true end]), diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl new file mode 100644 index 0000000000..ea3768d4b4 --- /dev/null +++ b/src/rabbit_queue_collector.erl @@ -0,0 +1,106 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_queue_collector). + +-behaviour(gen_server). + +-export([start_link/0, register/2, delete_all/1, shutdown/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {queues}). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok(pid())). +-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_all/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +register(CollectorPid, Q) -> + gen_server:call(CollectorPid, {register, Q}, infinity). + +delete_all(CollectorPid) -> + gen_server:call(CollectorPid, delete_all, infinity). + +shutdown(CollectorPid) -> + gen_server:call(CollectorPid, shutdown, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{queues = dict:new()}}. + +%%-------------------------------------------------------------------------- + +handle_call({register, Q}, _From, + State = #state{queues = Queues}) -> + MonitorRef = erlang:monitor(process, Q#amqqueue.pid), + {reply, ok, + State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + +handle_call(delete_all, _From, State = #state{queues = Queues}) -> + [rabbit_misc:with_exit_handler( + fun () -> ok end, + fun () -> + erlang:demonitor(MonitorRef), + rabbit_amqqueue:delete(Q, false, false) + end) + || {MonitorRef, Q} <- dict:to_list(Queues)], + {reply, ok, State}; + +handle_call(shutdown, _From, State) -> + {stop, normal, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, + State = #state{queues = Queues}) -> + {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 12ffb331d2..5a9e74c658 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -52,10 +52,13 @@ -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). +-define(SILENT_CLOSE_DELAY, 3). +-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_ref, connection_state}). +-record(v1, {sock, connection, callback, recv_ref, connection_state, + queue_collector}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -100,6 +103,8 @@ %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closing* %% receive frame -> ignore, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* @@ -116,6 +121,8 @@ %% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closed* %% receive connection.close_ok -> self() ! terminate_connection, %% *closed* %% receive frame -> ignore, *closed* @@ -131,11 +138,11 @@ -ifdef(use_specs). --spec(info_keys/0 :: () -> [info_key()]). --spec(info/1 :: (pid()) -> [info()]). --spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (pid()) -> [rabbit_types:info()]). +-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(server_properties/0 :: () -> amqp_table()). +-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). -endif. @@ -233,6 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), + {ok, Collector} = rabbit_queue_collector:start_link(), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -244,7 +252,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> client_properties = none}, callback = uninitialized_callback, recv_ref = none, - connection_state = pre_init}, + connection_state = pre_init, + queue_collector = Collector}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -262,7 +271,9 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue) + teardown_profiling(ProfilingValue), + rabbit_queue_collector:shutdown(Collector), + rabbit_misc:unlink_and_capture_exit(Collector) end, done. @@ -425,11 +436,17 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State = #v1{connection_state = closing}) -> +maybe_close(State = #v1{connection_state = closing, + queue_collector = Collector}) -> case all_channels() of - [] -> ok = send_on_channel0( - State#v1.sock, #'connection.close_ok'{}), - close_connection(State); + [] -> + %% Spec says "Exclusive queues may only be accessed by the current + %% connection, and are deleted when that connection closes." + %% This does not strictly imply synchrony, but in practice it seems + %% to be what people assume. + rabbit_queue_collector:delete_all(Collector), + ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + close_connection(State); _ -> State end; maybe_close(State) -> @@ -473,10 +490,18 @@ handle_frame(Type, Channel, Payload, State) -> closing -> %% According to the spec, after sending a %% channel.close we must ignore all frames except + %% channel.close and channel.close_ok. In the + %% event of a channel.close, we should send back a %% channel.close_ok. case AnalyzedFrame of {method, 'channel.close_ok', _} -> erase({channel, Channel}); + {method, 'channel.close', _} -> + %% We're already closing this channel, so + %% there's no cleanup to do (notify + %% queues, etc.) + ok = rabbit_writer:send_command(State#v1.sock, + #'channel.close_ok'{}); _ -> ok end, State; @@ -575,7 +600,11 @@ handle_method0(MethodName, FieldsBin, State) -> end, case State#v1.connection_state of running -> send_exception(State, 0, CompleteReason); - Other -> throw({channel0_error, Other, CompleteReason}) + %% We don't trust the client at this point - force + %% them to wait for a bit so they can't DOS us with + %% repeated failed logins etc. + Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, Other, CompleteReason}) end end. @@ -589,27 +618,33 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, ok = send_on_channel0( Sock, #'connection.tune'{channel_max = 0, - %% set to zero once QPid fix their negotiation - frame_max = 131072, + frame_max = ?FRAME_MAX, heartbeat = 0}), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, client_properties = ClientProperties}}; -handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, - frame_max = FrameMax, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, sock = Sock}) -> - %% if we have a channel_max limit that the client wishes to - %% exceed, die as per spec. Not currently a problem, so we ignore - %% the client's channel_max parameter. - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}}; + if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> + rabbit_misc:protocol_error( + not_allowed, "frame_max=~w < ~w min size", + [FrameMax, ?FRAME_MIN_SIZE]); + (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) -> + rabbit_misc:protocol_error( + not_allowed, "frame_max=~w > ~w max size", + [FrameMax, ?FRAME_MAX]); + true -> + rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}} + end; + handle_method0(#'connection.open'{virtual_host = VHostPath, insist = Insist}, State = #v1{connection_state = opening, @@ -643,6 +678,14 @@ handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); +handle_method0(#'connection.close'{}, + State = #v1{connection_state = CS, + sock = Sock}) + when CS =:= closing; CS =:= closed -> + %% We're already closed or closing, so we don't need to cleanup + %% anything. + ok = send_on_channel0(Sock, #'connection.close_ok'{}), + State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, @@ -722,15 +765,16 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State) -> +send_to_new_channel(Channel, AnalyzedFrame, + State = #v1{queue_collector = Collector}) -> #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/6, + [Channel, self(), WriterPid, Username, VHost, Collector]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 03979d6c60..d50b9f3126 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -41,7 +41,13 @@ -ifdef(use_specs). --spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). +-export_type([routing_key/0, routing_result/0]). + +-type(routing_key() :: binary()). +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). + +-spec(deliver/2 :: + ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). -endif. @@ -57,14 +63,17 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. delegate:invoke_no_result( - QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; deliver(QPids, Delivery) -> {Success, _} = delegate:invoke(QPids, - fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), + fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) + end), + {Routed, Handled} = + lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}). @@ -87,13 +96,13 @@ match_routing_key(Name, RoutingKey) -> lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> - sets:fold( - fun(Key, Acc) -> + lists:foldl( + fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end - end, [], sets:from_list(Queues)). + end, [], lists:usort(Queues)). %%-------------------------------------------------------------------- diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 434cdae050..eb2037c21b 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -33,7 +33,8 @@ -behaviour(gen_event). --export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, + code_change/3]). %% rabbit_sasl_report_file_h is a wrapper around the sasl_report_file_h %% module because the original's init/1 does not match properly diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 76ebd982f4..ff7c07e37e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -54,10 +55,12 @@ all_tests() -> passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), + passed = test_content_framing(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), + passed = test_memory_pressure(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -351,6 +354,41 @@ test_field_values() -> >>), passed. +%% Test that content frames don't exceed frame-max +test_content_framing(FrameMax, BodyBin) -> + [Header | Frames] = + rabbit_binary_generator:build_simple_content_frames( + 1, + rabbit_binary_generator:ensure_content_encoded( + rabbit_basic:build_content(#'P_basic'{}, BodyBin)), + FrameMax), + %% header is formatted correctly and the size is the total of the + %% fragments + <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, + BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header), + BodySize = size(BodyBin), + true = lists:all( + fun (ContentFrame) -> + FrameBinary = list_to_binary(ContentFrame), + %% assert + <<_TypeAndChannel:3/binary, + Size:32/unsigned, _Payload:Size/binary, 16#CE>> = + FrameBinary, + size(FrameBinary) =< FrameMax + end, Frames), + passed. + +test_content_framing() -> + %% no content + passed = test_content_framing(4096, <<>>), + %% easily fit in one frame + passed = test_content_framing(4096, <<"Easy">>), + %% exactly one frame (empty frame = 8 bytes) + passed = test_content_framing(11, <<"One">>), + %% more than one frame + passed = test_content_framing(11, <<"More than one frame">>), + passed. + test_topic_match(P, R) -> test_topic_match(P, R, true). @@ -559,19 +597,19 @@ test_cluster_management() -> ok = control_action(reset, []), lists:foreach(fun (Arg) -> - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok end, ClusteringSequence), lists:foreach(fun (Arg) -> ok = control_action(reset, []), - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok end, ClusteringSequence), ok = control_action(reset, []), lists:foreach(fun (Arg) -> - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok = control_action(start_app, []), ok = control_action(stop_app, []), ok @@ -579,7 +617,7 @@ test_cluster_management() -> ClusteringSequence), lists:foreach(fun (Arg) -> ok = control_action(reset, []), - ok = control_action(cluster, Arg), + ok = control_action(force_cluster, Arg), ok = control_action(start_app, []), ok = control_action(stop_app, []), ok @@ -590,13 +628,13 @@ test_cluster_management() -> ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% join a non-existing cluster as a ram node ok = control_action(reset, []), - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:makenode("hare"), case net_adm:ping(SecondaryNode) of @@ -621,18 +659,26 @@ test_cluster_management2(SecondaryNode) -> %% join cluster as a ram node ok = control_action(reset, []), - ok = control_action(cluster, [SecondaryNodeS, "invalid1@invalid"]), + ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), ok = control_action(start_app, []), ok = control_action(stop_app, []), %% change cluster config while remaining in same cluster - ok = control_action(cluster, ["invalid2@invalid", SecondaryNodeS]), + ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), %% join non-existing cluster as a ram node - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + + %% join empty cluster as a ram node + ok = control_action(cluster, []), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + %% turn ram node into disk node ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), @@ -640,8 +686,8 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), %% convert a disk node into a ram node - ok = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(force_cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node ok = control_action(reset, []), @@ -746,17 +792,17 @@ test_user_management() -> passed. test_server_status() -> - %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), - [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), + [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], + {new, Queue = #amqqueue{}} <- + [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, []) || - Name <- [<<"foo">>, <<"bar">>]], + false, false, [], none)]], - ok = rabbit_amqqueue:claim_queue(Q, self()), - ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined, + ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), %% list queues @@ -823,7 +869,7 @@ test_hooks() -> {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), %% Invoking Pids - Remote = fun() -> + Remote = fun () -> receive {rabbitmq_hook,[remote_test,test,[],Target]} -> Target ! invoked @@ -840,11 +886,133 @@ test_hooks() -> end, passed. +test_memory_pressure_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + ok = case Method of + #'channel.flow'{} -> ok; + #'basic.qos_ok'{} -> ok; + #'channel.open_ok'{} -> ok + end, + Pid ! Method, + test_memory_pressure_receiver(Pid); + sync -> + Pid ! sync, + test_memory_pressure_receiver(Pid) + end. + +test_memory_pressure_receive_flow(Active) -> + receive #'channel.flow'{active = Active} -> ok + after 1000 -> throw(failed_to_receive_channel_flow) + end, + receive #'channel.flow'{} -> + throw(pipelining_sync_commands_detected) + after 0 -> + ok + end. + +test_memory_pressure_sync(Ch, Writer) -> + ok = rabbit_channel:do(Ch, #'basic.qos'{}), + Writer ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'basic.qos_ok'{} -> ok + after 1000 -> throw(failed_to_receive_basic_qos_ok) + end. + +test_memory_pressure_spawn() -> + Me = self(), + Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch, #'channel.open'{}), + MRef = erlang:monitor(process, Ch), + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + {Writer, Ch, MRef}. + +expect_normal_channel_termination(MRef, Ch) -> + receive {'DOWN', MRef, process, Ch, normal} -> ok + after 1000 -> throw(channel_failed_to_exit) + end. + +test_memory_pressure() -> + {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), + [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || + Conserve <- [false, false, true, false, true, true, false]], + ok = test_memory_pressure_sync(Ch0, Writer0), + receive {'DOWN', MRef0, process, Ch0, Info0} -> + throw({channel_died_early, Info0}) + after 0 -> ok + end, + + %% we should have just 1 active=false waiting for us + ok = test_memory_pressure_receive_flow(false), + + %% if we reply with flow_ok, we should immediately get an + %% active=true back + ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_receive_flow(true), + + %% if we publish at this point, the channel should die + Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), + ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), + expect_normal_channel_termination(MRef0, Ch0), + + {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch1, true), + ok = test_memory_pressure_receive_flow(false), + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_sync(Ch1, Writer1), + ok = rabbit_channel:conserve_memory(Ch1, false), + ok = test_memory_pressure_receive_flow(true), + %% send back the wrong flow_ok. Channel should die. + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + expect_normal_channel_termination(MRef1, Ch1), + + {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), + %% just out of the blue, send a flow_ok. Life should end. + ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), + expect_normal_channel_termination(MRef2, Ch2), + + {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch3, true), + receive {'DOWN', MRef3, process, Ch3, _} -> + ok + after 12000 -> + throw(channel_failed_to_exit) + end, + + alarm_handler:set_alarm({vm_memory_high_watermark, []}), + Me = self(), + Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch4, #'channel.open'{}), + MRef4 = erlang:monitor(process, Ch4), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) + after 0 -> ok + end, + alarm_handler:clear_alarm(vm_memory_high_watermark), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + rabbit_channel:shutdown(Ch4), + expect_normal_channel_termination(MRef4, Ch4), + + passed. + test_delegates_async(SecondaryNode) -> Self = self(), - Sender = fun(Pid) -> Pid ! {invoked, Self} end, + Sender = fun (Pid) -> Pid ! {invoked, Self} end, - Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end), + Responder = make_responder(fun ({invoked, Pid}) -> Pid ! response end), ok = delegate:invoke_no_result(spawn(Responder), Sender), ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender), @@ -857,10 +1025,11 @@ test_delegates_async(SecondaryNode) -> passed. -make_responder(FMsg) -> - fun() -> +make_responder(FMsg) -> make_responder(FMsg, timeout). +make_responder(FMsg, Throw) -> + fun () -> receive Msg -> FMsg(Msg) - after 1000 -> throw(timeout) + after 1000 -> throw(Throw) end end. @@ -887,24 +1056,28 @@ must_exit(Fun) -> end. test_delegates_sync(SecondaryNode) -> - Sender = fun(Pid) -> gen_server:call(Pid, invoked) end, - BadSender = fun(_Pid) -> exit(exception) end, + Sender = fun (Pid) -> gen_server:call(Pid, invoked) end, + BadSender = fun (_Pid) -> exit(exception) end, - Responder = make_responder(fun({'$gen_call', From, invoked}) -> + Responder = make_responder(fun ({'$gen_call', From, invoked}) -> gen_server:reply(From, response) end), + BadResponder = make_responder(fun ({'$gen_call', From, invoked}) -> + gen_server:reply(From, response) + end, bad_responder_died), + response = delegate:invoke(spawn(Responder), Sender), response = delegate:invoke(spawn(SecondaryNode, Responder), Sender), - must_exit(fun() -> delegate:invoke(spawn(Responder), BadSender) end), - must_exit(fun() -> - delegate:invoke(spawn(SecondaryNode, Responder), BadSender) end), + must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end), + must_exit(fun () -> + delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), LocalGoodPids = spawn_responders(node(), Responder, 2), RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), - LocalBadPids = spawn_responders(node(), Responder, 2), - RemoteBadPids = spawn_responders(SecondaryNode, Responder, 2), + LocalBadPids = spawn_responders(node(), BadResponder, 2), + RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2), {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender), true = lists:all(fun ({_, response}) -> true end, GoodRes), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl new file mode 100644 index 0000000000..2e492b80bd --- /dev/null +++ b/src/rabbit_types.erl @@ -0,0 +1,145 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_types). + +-include("rabbit.hrl"). + +-ifdef(use_specs). + +-export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, + delivery/0, content/0, decoded_content/0, undecoded_content/0, + unencoded_content/0, encoded_content/0, vhost/0, ctag/0, + amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, + binding/0, amqqueue/0, exchange/0, connection/0, user/0, + error/1, ok_or_error/1, ok_or_error2/2, ok/1]). + +-type(maybe(T) :: T | 'none'). +-type(vhost() :: binary()). +-type(ctag() :: binary()). + +%% TODO: make this more precise by tying specific class_ids to +%% specific properties +-type(undecoded_content() :: + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: 'none', + properties_bin :: binary(), + payload_fragments_rev :: [binary()]} | + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: rabbit_framing:amqp_property_record(), + properties_bin :: 'none', + payload_fragments_rev :: [binary()]}). +-type(unencoded_content() :: undecoded_content()). +-type(decoded_content() :: + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: rabbit_framing:amqp_property_record(), + properties_bin :: maybe(binary()), + payload_fragments_rev :: [binary()]}). +-type(encoded_content() :: + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: maybe(rabbit_framing:amqp_property_record()), + properties_bin :: binary(), + payload_fragments_rev :: [binary()]}). +-type(content() :: undecoded_content() | decoded_content()). +-type(basic_message() :: + #basic_message{exchange_name :: rabbit_exchange:name(), + routing_key :: rabbit_router:routing_key(), + content :: content(), + guid :: rabbit_guid:guid(), + is_persistent :: boolean()}). +-type(message() :: basic_message()). +-type(delivery() :: + #delivery{mandatory :: boolean(), + immediate :: boolean(), + txn :: maybe(txn()), + sender :: pid(), + message :: message()}). + +%% this is really an abstract type, but dialyzer does not support them +-type(txn() :: rabbit_guid:guid()). + +-type(info_key() :: atom()). +-type(info() :: {info_key(), any()}). + +-type(amqp_error() :: + #amqp_error{name :: rabbit_framing:amqp_exception(), + explanation :: string(), + method :: rabbit_framing:amqp_method_name()}). + +-type(r(Kind) :: + r2(vhost(), Kind)). +-type(r2(VirtualHost, Kind) :: + r3(VirtualHost, Kind, rabbit_misc:resource_name())). +-type(r3(VirtualHost, Kind, Name) :: + #resource{virtual_host :: VirtualHost, + kind :: Kind, + name :: Name}). + +-type(ssl_socket() :: #ssl_socket{}). + +-type(listener() :: + #listener{node :: node(), + protocol :: atom(), + host :: rabbit_networking:hostname(), + port :: rabbit_networking:ip_port()}). + +-type(binding() :: + #binding{exchange_name :: rabbit_exchange:name(), + queue_name :: rabbit_amqqueue:name(), + key :: rabbit_exchange:binding_key()}). + +-type(amqqueue() :: + #amqqueue{name :: rabbit_amqqueue:name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: rabbit_types:maybe(pid()), + arguments :: rabbit_framing:amqp_table(), + pid :: rabbit_types:maybe(pid())}). + +-type(exchange() :: + #exchange{name :: rabbit_exchange:name(), + type :: rabbit_exchange:type(), + durable :: boolean(), + auto_delete :: boolean(), + arguments :: rabbit_framing:amqp_table()}). + +-type(connection() :: pid()). + +-type(user() :: + #user{username :: rabbit_access_control:username(), + password :: rabbit_access_control:password()}). + +-type(ok(A) :: {'ok', A}). +-type(error(A) :: {'error', A}). +-type(ok_or_error(A) :: 'ok' | error(A)). +-type(ok_or_error2(A, B) :: ok(A) | error(B)). + +-endif. % use_specs diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 54c60f5be0..8060203897 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -48,20 +48,37 @@ -ifdef(use_specs). --spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). --spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). +-spec(start/3 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer()) + -> pid()). +-spec(start_link/3 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer()) + -> pid()). +-spec(send_command/2 :: + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command/3 :: + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). +-spec(send_command_and_signal_back/3 :: + (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok'). -spec(send_command_and_signal_back/4 :: - (pid(), amqp_method(), content(), pid()) -> 'ok'). + (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid()) + -> 'ok'). -spec(send_command_and_notify/5 :: - (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). + (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), + rabbit_types:content()) + -> 'ok'). -spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method()) -> 'ok'). + (rabbit_net:socket(), rabbit_channel:channel_number(), + rabbit_framing:amqp_method_record()) + -> 'ok'). -spec(internal_send_command/5 :: - (socket(), channel_number(), amqp_method(), - content(), non_neg_integer()) -> 'ok'). + (rabbit_net:socket(), rabbit_channel:channel_number(), + rabbit_framing:amqp_method_record(), rabbit_types:content(), + non_neg_integer()) + -> 'ok'). -endif. @@ -149,6 +166,7 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> shutdown(W) -> W ! shutdown, + rabbit_misc:unlink_and_capture_exit(W), ok. %--------------------------------------------------------------------------- diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 5575351269..03dc0f990f 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -11,20 +11,20 @@ %% All modifications are (C) 2010 LShift Ltd. %% %% %CopyrightBegin% -%% +%% %% Copyright Ericsson AB 1996-2009. All Rights Reserved. -%% +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% -module(supervisor2). @@ -301,13 +301,13 @@ handle_call({terminate_child, Name}, _From, State) -> handle_call(which_children, _From, State) when ?is_simple(State) -> [#child{child_type = CT, modules = Mods}] = State#state.children, - Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end, + Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end, ?DICT:to_list(State#state.dynamics)), {reply, Reply, State}; handle_call(which_children, _From, State) -> Resp = - lists:map(fun(#child{pid = Pid, name = Name, + lists:map(fun (#child{pid = Pid, name = Name, child_type = ChildType, modules = Mods}) -> {Name, Pid, ChildType, Mods} end, @@ -415,7 +415,7 @@ update_childspec1([], Children, KeepOld) -> lists:reverse(Children ++ KeepOld). update_chsp(OldCh, Children) -> - case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name -> + case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name -> Ch#child{pid = OldCh#child.pid}; (Ch) -> Ch @@ -828,7 +828,7 @@ validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}). validMods(dynamic) -> true; validMods(Mods) when is_list(Mods) -> - lists:foreach(fun(Mod) -> + lists:foreach(fun (Mod) -> if is_atom(Mod) -> ok; true -> throw({invalid_module, Mod}) diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 3b23daa5c1..cc4982c9cb 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -75,6 +75,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), + %% In the event that somebody floods us with connections we can spew + %% the above message at error_logger faster than it can keep up. + %% So error_logger's mailbox grows unbounded until we eat all the + %% memory available and crash. So here's a meaningless synchronous call + %% to the underlying gen_event mechanism - when it returns the mailbox + %% is drained. + gen_event:which_handlers(error_logger), %% handle file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index cd03fcc6e6..bbc3a8c017 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -72,8 +72,10 @@ -ifdef(use_specs). --spec(start_link/1 :: (float()) -> - ('ignore' | {'error', any()} | {'ok', pid()})). +-spec(start_link/1 :: + (float()) -> 'ignore' | + rabbit_types:error(any()) | + rabbit_types:ok(pid())). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')). diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 97e075459f..01ce3535d8 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -52,7 +52,7 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). -spec(submit_async/1 :: (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index 4ded63a8db..afa21164be 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -41,9 +41,9 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(start_link/1 :: - (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())). +-spec(start_link/1 :: (non_neg_integer()) -> + 'ignore' | rabbit_types:ok_or_error2(pid(), any())). -endif. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 57901fd5cf..a61e4cc372 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -44,7 +44,8 @@ -ifdef(use_specs). --spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (any()) -> {'ok', pid()} | 'ignore' | rabbit_types:error(any())). -spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). |
