diff options
65 files changed, 2144 insertions, 1020 deletions
@@ -1,18 +1,18 @@ syntax: glob *.beam *~ +*.swp +*.patch erl_crash.dump syntax: regexp ^cover/ ^dist/ -^include/rabbit_framing.hrl$ -^src/rabbit_framing.erl$ -^rabbit.plt$ -^ebin/rabbit.app$ -^ebin/rabbit.rel$ -^ebin/rabbit.boot$ -^ebin/rabbit.script$ +^include/rabbit_framing\.hrl$ +^src/rabbit_framing\.erl$ +^rabbit\.plt$ +^basic.plt$ +^ebin/rabbit\.(app|rel|boot|script)$ ^plugins/ ^priv/plugins/ @@ -1,29 +1,31 @@ -ifndef TMPDIR -TMPDIR := /tmp -endif -RABBITMQ_NODENAME=rabbit -RABBITMQ_SERVER_START_ARGS= -RABBITMQ_MNESIA_DIR=$(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia -RABBITMQ_LOG_BASE=$(TMPDIR) +TMPDIR ?= /tmp + +RABBITMQ_NODENAME ?= rabbit +RABBITMQ_SERVER_START_ARGS ?= +RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia +RABBITMQ_LOG_BASE ?= $(TMPDIR) SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) PYTHON=python +BASIC_PLT=basic.plt +RABBIT_PLT=rabbit.plt + ifndef USE_SPECS # our type specs rely on features / bug fixes in dialyzer that are -# only available in R12B-3 upwards +# only available in R13B01 upwards (R13B01 is eshell 5.7.2) # # NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi) +USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.1" ]; then echo "true"; else echo "false"; fi) endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests @@ -39,9 +41,8 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e -# for the moment we don't use boot files because they introduce a -# dependency on particular versions of OTP applications -#all: $(EBIN_DIR)/rabbit.boot +ERL_EBIN=erl -noinput -pa $(EBIN_DIR) + all: $(TARGETS) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app @@ -60,17 +61,32 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ -$(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) - erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' +dialyze: $(BEAM_TARGETS) $(BASIC_PLT) + $(ERL_EBIN) -eval \ + "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))." -dialyze: $(BEAM_TARGETS) - dialyzer -c $? +# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target +create-plt: $(RABBIT_PLT) + +$(RABBIT_PLT): $(BEAM_TARGETS) $(BASIC_PLT) + cp $(BASIC_PLT) $@ + $(ERL_EBIN) -eval \ + "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:add_to_plt(\"$@\", \"$(BEAM_TARGETS)\"))." + +$(BASIC_PLT): $(BEAM_TARGETS) + if [ -f $@ ]; then \ + touch $@; \ + else \ + $(ERL_EBIN) -eval \ + "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:create_basic_plt(\"$@\"))."; \ + fi clean: rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz + rm -f $(RABBIT_PLT) cleandb: rm -rf $(RABBITMQ_MNESIA_DIR)/* @@ -85,13 +101,14 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ - RABBITMQ_NODE_ONLY=true \ - RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -s rabbit" \ + RABBITMQ_ALLOW_INPUT=true \ + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ ./scripts/rabbitmq-server run-node: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ + RABBITMQ_ALLOW_INPUT=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ ./scripts/rabbitmq-server @@ -101,7 +118,8 @@ run-tests: all start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ - ./scripts/rabbitmq-server -detached; sleep 1 + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ + ./scripts/rabbitmq-server ; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) @@ -115,8 +133,11 @@ force-snapshot: all stop-node: -$(ERL_CALL) -q +# code coverage will be created for subdirectory "ebin" of COVER_DIR +COVER_DIR=. + start-cover: all - echo "cover:start(), rabbit_misc:enable_cover()." | $(ERL_CALL) + echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) @@ -136,7 +157,7 @@ srcdist: distclean sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) @@ -147,7 +168,7 @@ srcdist: distclean rm -rf $(TARGET_SRC_DIR) distclean: clean - make -C $(AMQP_CODEGEN_DIR) distclean + $(MAKE) -C $(AMQP_CODEGEN_DIR) distclean rm -rf dist find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; @@ -162,7 +183,8 @@ distclean: clean docs_all: $(MANPAGES) -install: all docs_all +install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) +install: all docs_all install_dirs @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false) @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false) @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false) @@ -171,13 +193,17 @@ install: all docs_all cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* - mkdir -p $(SBIN_DIR) - cp scripts/rabbitmq-server $(SBIN_DIR) - cp scripts/rabbitmqctl $(SBIN_DIR) - cp scripts/rabbitmq-multi $(SBIN_DIR) + for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \ + cp scripts/$$script $(TARGET_DIR)/sbin; \ + [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ + done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ for manpage in docs/*.$$section.pod; do \ cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \ done; \ done + +install_dirs: + mkdir -p $(SBIN_DIR) + mkdir -p $(TARGET_DIR)/sbin diff --git a/calculate-relative b/calculate-relative new file mode 100755 index 0000000000..3af18e8ff8 --- /dev/null +++ b/calculate-relative @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# +# relpath.py +# R.Barran 30/08/2004 +# Retrieved from http://code.activestate.com/recipes/302594/ + +import os +import sys + +def relpath(target, base=os.curdir): + """ + Return a relative path to the target from either the current dir or an optional base dir. + Base can be a directory specified either as absolute or relative to current dir. + """ + + if not os.path.exists(target): + raise OSError, 'Target does not exist: '+target + + if not os.path.isdir(base): + raise OSError, 'Base is not a directory or does not exist: '+base + + base_list = (os.path.abspath(base)).split(os.sep) + target_list = (os.path.abspath(target)).split(os.sep) + + # On the windows platform the target may be on a completely different drive from the base. + if os.name in ['nt','dos','os2'] and base_list[0] <> target_list[0]: + raise OSError, 'Target is on a different drive to base. Target: '+target_list[0].upper()+', base: '+base_list[0].upper() + + # Starting from the filepath root, work out how much of the filepath is + # shared by base and target. + for i in range(min(len(base_list), len(target_list))): + if base_list[i] <> target_list[i]: break + else: + # If we broke out of the loop, i is pointing to the first differing path elements. + # If we didn't break out of the loop, i is pointing to identical path elements. + # Increment i so that in all cases it points to the first differing path elements. + i+=1 + + rel_list = [os.pardir] * (len(base_list)-i) + target_list[i:] + if (len(rel_list) == 0): + return "." + return os.path.join(*rel_list) + +if __name__ == "__main__": + print(relpath(sys.argv[1], sys.argv[2])) diff --git a/codegen.py b/codegen.py index 997f3e32d9..a21dd779d5 100644 --- a/codegen.py +++ b/codegen.py @@ -117,6 +117,13 @@ def genErl(spec): def genMethodHasContent(m): print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower()) + + def genMethodIsSynchronous(m): + hasNoWait = "nowait" in fieldNameList(m.arguments) + if m.isSynchronous and hasNoWait: + print "is_method_synchronous(#%s{nowait = NoWait}) -> not(NoWait);" % (m.erlangName()) + else: + print "is_method_synchronous(#%s{}) -> %s;" % (m.erlangName(), str(m.isSynchronous).lower()) def genMethodFieldTypes(m): """Not currently used - may be useful in future?""" @@ -246,6 +253,7 @@ def genErl(spec): -export([method_id/1]). -export([method_has_content/1]). +-export([is_method_synchronous/1]). -export([method_fieldnames/1]). -export([decode_method_fields/2]). -export([decode_properties/2]). @@ -266,6 +274,9 @@ bitvalue(undefined) -> 0. for m in methods: genMethodHasContent(m) print "method_has_content(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodIsSynchronous(m) + print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodFieldNames(m) print "method_fieldnames(Name) -> exit({unknown_method_name, Name})." diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod new file mode 100644 index 0000000000..42f0c4d2e8 --- /dev/null +++ b/docs/rabbitmq-activate-plugins.1.pod @@ -0,0 +1,37 @@ +=head1 NAME + +rabbitmq-activate-plugins - command line tool for activating plugins +in a RabbitMQ broker + +=head1 SYNOPSIS + +rabbitmq-activate-plugins + +=head1 DESCRIPTION + +RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + +rabbitmq-activate-plugins is a command line tool for activating +plugins installed into the broker's plugins directory. + +=head1 EXAMPLES + +To activate all of the installed plugins in the current RabbitMQ install, +execute: + + rabbitmq-activate-plugins + +=head1 SEE ALSO + +L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>, +L<rabbitmqctl(1)>, L<rabbitmq-deactivate-plugins(1)> + +=head1 AUTHOR + +The RabbitMQ Team <info@rabbitmq.com> + +=head1 REFERENCES + +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-deactivate-plugins.1.pod b/docs/rabbitmq-deactivate-plugins.1.pod new file mode 100644 index 0000000000..eb4fbb90a3 --- /dev/null +++ b/docs/rabbitmq-deactivate-plugins.1.pod @@ -0,0 +1,37 @@ +=head1 NAME + +rabbitmq-deactivate-plugins - command line tool for deactivating plugins +in a RabbitMQ broker + +=head1 SYNOPSIS + +rabbitmq-deactivate-plugins + +=head1 DESCRIPTION + +RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + +rabbitmq-deactivate-plugins is a command line tool for deactivating +plugins installed into the broker. + +=head1 EXAMPLES + +To deactivate all of the installed plugins in the current RabbitMQ install, +execute: + + rabbitmq-deactivate-plugins + +=head1 SEE ALSO + +L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>, +L<rabbitmqctl(1)>, L<rabbitmq-activate-plugins(1)> + +=head1 AUTHOR + +The RabbitMQ Team <info@rabbitmq.com> + +=head1 REFERENCES + +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod index 23fd96ed65..640609eef9 100644 --- a/docs/rabbitmq-multi.1.pod +++ b/docs/rabbitmq-multi.1.pod @@ -15,22 +15,30 @@ scalable implementation of an AMQP broker. rabbitmq-multi scripts allows for easy set-up of a cluster on a single machine. -See also rabbitmq-server(1) for configuration information. +See also L<rabbitmq-server(1)> for configuration information. =head1 COMMANDS -start_all I<count> - start count nodes with unique names, listening on all IP addresses - and on sequential ports starting from 5672. +=over -status - print the status of all running RabbitMQ nodes +=item start_all I<count> -stop_all - stop all local RabbitMQ nodes +Start count nodes with unique names, listening on all IP addresses and +on sequential ports starting from 5672. -rotate_logs - rotate log files for all local and running RabbitMQ nodes +=item status + +Print the status of all running RabbitMQ nodes. + +=item stop_all + +Stop all local RabbitMQ nodes, + +=item rotate_logs + +Rotate log files for all local and running RabbitMQ nodes. + +=back =head1 EXAMPLES @@ -40,7 +48,7 @@ Start 3 local RabbitMQ nodes with unique, sequential port numbers: =head1 SEE ALSO -rabbitmq.conf(5), rabbitmq-server(1), rabbitmqctl(1) +L<rabbitmq.conf(5)>, L<rabbitmq-server(1)>, L<rabbitmqctl(1)> =head1 AUTHOR @@ -48,4 +56,4 @@ The RabbitMQ Team <info@rabbitmq.com> =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod index 99a7ceccf3..d74ab8d94f 100644 --- a/docs/rabbitmq-server.1.pod +++ b/docs/rabbitmq-server.1.pod @@ -16,43 +16,57 @@ Running rabbitmq-server in the foreground displays a banner message, and reports on progress in the startup sequence, concluding with the message "broker running", indicating that the RabbitMQ broker has been started successfully. To shut down the server, just terminate the -process or use rabbitmqctl(1). +process or use L<rabbitmqctl(1)>. =head1 ENVIRONMENT -B<RABBITMQ_MNESIA_BASE> - Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory - where Mnesia database files should be placed. +=over -B<RABBITMQ_LOG_BASE> - Defaults to /var/log/rabbitmq. Log files generated by the server - will be placed in this directory. +=item B<RABBITMQ_MNESIA_BASE> -B<RABBITMQ_NODENAME> - Defaults to rabbit. This can be useful if you want to run more - than one node per machine - B<RABBITMQ_NODENAME> should be unique - per erlang-node-and-machine combination. See clustering on a - single machine guide at - http://www.rabbitmq.com/clustering.html#single-machine for - details. +Defaults to F</var/lib/rabbitmq/mnesia>. Set this to the directory where +Mnesia database files should be placed. -B<RABBITMQ_NODE_IP_ADDRESS> - Defaults to 0.0.0.0. This can be changed if you only want to bind - to one network interface. +=item B<RABBITMQ_LOG_BASE> -B<RABBITMQ_NODE_PORT> - Defaults to 5672. +Defaults to F</var/log/rabbitmq>. Log files generated by the server will +be placed in this directory. -B<RABBITMQ_CLUSTER_CONFIG_FILE> - Defaults to /etc/rabbitmq/rabbitmq_cluster.config. If this file is - present it is used by the server to auto-configure a RabbitMQ - cluster. - See the clustering guide at http://www.rabbitmq.com/clustering.html - for details. +=item B<RABBITMQ_NODENAME> + +Defaults to rabbit. This can be useful if you want to run more than +one node per machine - B<RABBITMQ_NODENAME> should be unique per +erlang-node-and-machine combination. See clustering on a single +machine guide at +L<http://www.rabbitmq.com/clustering.html#single-machine> for details. + +=item B<RABBITMQ_NODE_IP_ADDRESS> + +Defaults to 0.0.0.0. This can be changed if you only want to bind to +one network interface. + +=item B<RABBITMQ_NODE_PORT> + +Defaults to 5672. + +=item B<RABBITMQ_CLUSTER_CONFIG_FILE> + +Defaults to F</etc/rabbitmq/rabbitmq_cluster.config>. If this file is +present it is used by the server to auto-configure a RabbitMQ cluster. +See the clustering guide at L<http://www.rabbitmq.com/clustering.html> +for details. + +=back =head1 OPTIONS -B<-detached> start the server process in the background +=over + +=item B<-detached> + +start the server process in the background + +=back =head1 EXAMPLES @@ -62,7 +76,7 @@ Run RabbitMQ AMQP server in the background: =head1 SEE ALSO -rabbitmq.conf(5), rabbitmq-multi(1), rabbitmqctl(1) +L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)> =head1 AUTHOR @@ -70,4 +84,5 @@ The RabbitMQ Team <info@rabbitmq.com> =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> + diff --git a/docs/rabbitmq.conf.5.pod b/docs/rabbitmq.conf.5.pod index 9b2536c383..a7bf4c0942 100644 --- a/docs/rabbitmq.conf.5.pod +++ b/docs/rabbitmq.conf.5.pod @@ -1,10 +1,11 @@ =head1 NAME -/etc/rabbitmq/rabbitmq.conf - default settings for RabbitMQ AMQP server +F</etc/rabbitmq/rabbitmq.conf> - default settings for RabbitMQ AMQP +server =head1 DESCRIPTION -/etc/rabbitmq/rabbitmq.conf contains variable settings that override the +F</etc/rabbitmq/rabbitmq.conf> contains variable settings that override the defaults built in to the RabbitMQ startup scripts. The file is interpreted by the system shell, and so should consist of @@ -13,27 +14,35 @@ syntax is permitted (since the file is sourced using the shell "." operator), including line comments starting with "#". In order of preference, the startup scripts get their values from the -environment, from /etc/rabbitmq/rabbitmq.conf and finally from the -built-in default values. For example, for the B<RABBITMQ_NODENAME> setting, +environment, from F</etc/rabbitmq/rabbitmq.conf> and finally from the +built-in default values. For example, for the B<RABBITMQ_NODENAME> +setting, -B<RABBITMQ_NODENAME> - from the environment is checked first. If it is absent or equal to - the empty string, then +=over -B<NODENAME> - from /etc/rabbitmq/rabbitmq.conf is checked. If it is also absent - or set equal to the empty string then the default value from - the startup script is used. +=item B<RABBITMQ_NODENAME> + +from the environment is checked first. If it is absent or equal to the +empty string, then + +=item B<NODENAME> + +from L</etc/rabbitmq/rabbitmq.conf> is checked. If it is also absent +or set equal to the empty string then the default value from the +startup script is used. The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the environment variable names, with the B<RABBITMQ_> prefix removed: B<RABBITMQ_NODE_PORT> from the environment becomes B<NODE_PORT> in the -/etc/rabbitmq/rabbitmq.conf file, etc. +F</etc/rabbitmq/rabbitmq.conf> file, etc. + +=back =head1 EXAMPLES -The following is an example of a complete /etc/rabbitmq/rabbitmq.conf file -that overrides the default Erlang node name from "rabbit" to "hare": +The following is an example of a complete +F</etc/rabbitmq/rabbitmq.conf> file that overrides the default Erlang +node name from "rabbit" to "hare": # I am a complete /etc/rabbitmq/rabbitmq.conf file. # Comment lines start with a hash character. @@ -42,7 +51,7 @@ that overrides the default Erlang node name from "rabbit" to "hare": =head1 SEE ALSO -rabbitmq-server(1), rabbitmq-multi(1), rabbitmqctl(1) +L<rabbitmq-server(1)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)> =head1 AUTHOR @@ -57,4 +66,4 @@ info@rabbitmq.com. =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 421568960e..c43ed2ea25 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -18,269 +18,388 @@ It performs all actions by connecting to one of the broker's nodes. =head1 OPTIONS -B<-n> I<node> - default node is C<rabbit@server>, where server is the local host. - On a host named C<server.example.com>, the node name of the - RabbitMQ Erlang node will usually be rabbit@server (unless - RABBITMQ_NODENAME has been set to some non-default value at broker - startup time). The output of hostname -s is usually the correct - suffix to use after the "@" sign. See rabbitmq-server(1) for - details of configuring the RabbitMQ broker. - -B<-q> - quiet output mode is selected with the B<-q> flag. Informational - messages are suppressed when quiet mode is in effect. +=over + +=item B<-n> I<node> + +Default node is C<rabbit@server>, where server is the local host. On +a host named C<server.example.com>, the node name of the RabbitMQ +Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME +has been set to some non-default value at broker startup time). The +output of hostname -s is usually the correct suffix to use after the +"@" sign. See rabbitmq-server(1) for details of configuring the +RabbitMQ broker. + +=item B<-q> + +Quiet output mode is selected with the B<-q> flag. Informational +messages are suppressed when quiet mode is in effect. + +=back =head1 COMMANDS =head2 APPLICATION AND CLUSTER MANAGEMENT -stop - stop the Erlang node on which RabbitMQ broker is running. - -stop_app - stop the RabbitMQ application, leaving the Erlang node running. - This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I<reset>. - -start_app - start the RabbitMQ application. - This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I<reset>. - -status - display various information about the RabbitMQ broker, such as - whether the RabbitMQ application on the current node, its version - number, what nodes are part of the broker, which of these are - running. - -force - return a RabbitMQ node to its virgin state. - Removes the node from any cluster it belongs to, removes all data - from the management database, such as configured users, vhosts and - deletes all persistent messages. - -force_reset - the same as I<force> command, but resets the node unconditionally, - regardless of the current management database state and cluster - configuration. - It should only be used as a last resort if the database or cluster - configuration has been corrupted. - -rotate_logs [suffix] - instruct the RabbitMQ node to rotate the log files. The RabbitMQ - broker will attempt to append the current contents of the log file - to the file with the name composed of the original name and the - suffix. It will create a new file if such a file does not already - exist. When no I<suffix> is specified, the empty log file is - simply created at the original location; no rotation takes place. - When an error occurs while appending the contents of the old log - file, the operation behaves in the same way as if no I<suffix> was - specified. - This command might be helpful when you are e.g. writing your own - logrotate script and you do not want to restart the RabbitMQ node. - -cluster I<clusternode> ... - instruct the node to become member of a cluster with the specified - nodes determined by I<clusternode> option(s). - See http://www.rabbitmq.com/clustering.html for more information - about clustering. +=over + +=item stop + +Stop the Erlang node on which RabbitMQ broker is running. + +=item stop_app + +Stop the RabbitMQ application, leaving the Erlang node running. This +command is typically run prior to performing other management actions +that require the RabbitMQ application to be stopped, e.g. I<reset>. + +=item start_app + +Start the RabbitMQ application. This command is typically run prior +to performing other management actions that require the RabbitMQ +application to be stopped, e.g. I<reset>. + +=item status + +Display various information about the RabbitMQ broker, such as whether +the RabbitMQ application on the current node, its version number, what +nodes are part of the broker, which of these are running. + +=item reset + +Return a RabbitMQ node to its virgin state. Removes the node from any +cluster it belongs to, removes all data from the management database, +such as configured users, vhosts and deletes all persistent messages. + +=item force_reset + +The same as I<reset> command, but resets the node unconditionally, +regardless of the current management database state and cluster +configuration. It should only be used as a last resort if the +database or cluster configuration has been corrupted. + +=item rotate_logs [suffix] + +Instruct the RabbitMQ node to rotate the log files. The RabbitMQ +broker will attempt to append the current contents of the log file to +the file with the name composed of the original name and the +suffix. It will create a new file if such a file does not already +exist. When no I<suffix> is specified, the empty log file is simply +created at the original location; no rotation takes place. When an +error occurs while appending the contents of the old log file, the +operation behaves in the same way as if no I<suffix> was specified. +This command might be helpful when you are e.g. writing your own +logrotate script and you do not want to restart the RabbitMQ node. + +=item cluster I<clusternode> ... + +Instruct the node to become member of a cluster with the specified +nodes determined by I<clusternode> option(s). See +L<http://www.rabbitmq.com/clustering.html> for more information about +clustering. + +=back =head2 USER MANAGEMENT -add_user I<username> I<password> - create a user named I<username> with (initial) password I<password>. +=over + +=item add_user I<username> I<password> -delete_user I<username> - delete the user named I<username>. +Create a user named I<username> with (initial) password I<password>. -change_password I<username> I<newpassword> - change the password for the user named I<username> to I<newpassword>. +=item delete_user I<username> -list_users - list all users. +Delete the user named I<username>. + +=item change_password I<username> I<newpassword> + +Change the password for the user named I<username> to I<newpassword>. + +=item list_users + +List all users, one per line. + +=back =head2 ACCESS CONTROL -add_vhost I<vhostpath> - create a new virtual host called I<vhostpath>. +=over + +=item add_vhost I<vhostpath> + +Create a new virtual host called I<vhostpath>. + +=item delete_vhost I<vhostpath> + +Delete a virtual host I<vhostpath>. This command deletes also all its +exchanges, queues and user mappings. + +=item list_vhosts + +List all virtual hosts, one per line. + +=item set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp> -delete_vhost I<vhostpath> - delete a virtual host I<vhostpath>. - That command deletes also all its exchanges, queues and user - mappings. - -list_vhosts - list all virtual hosts. +Set the permissions for the user named I<username> in the virtual host +I<vhostpath>, granting I<configure>, I<write> and I<read> access to +resources with names matching the first, second and third I<regexp>, +respectively. -set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp> - set the permissions for the user named I<username> in the virtual - host I<vhostpath>, granting 'configure', 'write' and 'read' access - to resources with names matching the first, second and third - I<regexp>, respectively. +=item clear_permissions [-p I<vhostpath>] I<username> -clear_permissions [-p I<vhostpath>] I<username> - remove the permissions for the user named I<username> in the - virtual host I<vhostpath>. +Remove the permissions for the user named I<username> in the virtual +host I<vhostpath>. -list_permissions [-p I<vhostpath>] - list all the users and their permissions in the virtual host - I<vhostpath>. +=item list_permissions [-p I<vhostpath>] -list_user_permissions I<username> - list the permissions of the user named I<username> across all - virtual hosts. +List all the users and their permissions in the virtual host +I<vhostpath>. Each output line contains the username and their +I<configure>, I<write> and I<read> access regexps, separated by tab +characters. + +=item list_user_permissions I<username> + +List the permissions of the user named I<username> across all virtual +hosts. + +=back =head2 SERVER STATUS -list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] - list queue information by virtual host. If no I<queueinfoitem>s - are specified then then name and number of messages is displayed - for each queue. +=over + +=item list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] + +List queue information by virtual host. Each line printed +describes a queue, with the requested I<queueinfoitem> values +separated by tab characters. If no I<queueinfoitem>s are +specified then I<name> and I<messages> are assumed. + +=back =head3 Queue information items -=over 4 +=over + +=item name + +name of the queue + +=item durable + +whether the queue survives server restarts -name - URL-encoded name of the queue +=item auto_delete -durable - whether the queue survives server restarts +whether the queue will be deleted when no longer used -auto_delete - whether the queue will be deleted when no longer used +=item arguments -arguments - queue arguments +queue arguments -node - node on which the process associated with the queue resides +=item node -messages_ready - number of messages ready to be delivered to clients +node on which the process associated with the queue resides -messages_unacknowledged - number of messages delivered to clients but not yet - acknowledged +=item messages_ready -messages_uncommitted - number of messages published in as yet uncommitted transactions +number of messages ready to be delivered to clients -messages - sum of ready, unacknowledged and uncommitted messages +=item messages_unacknowledged -acks_uncommitted - number of acknowledgements received in as yet uncommitted - transactions +number of messages delivered to clients but not yet acknowledged -consumers - number of consumers +=item messages_uncommitted -transactions - number of transactions +number of messages published in as yet uncommitted transactions -memory - bytes of memory consumed by the Erlang process for the queue, - including stack, heap and internal structures +=item messages + +sum of ready, unacknowledged and uncommitted messages + +=item acks_uncommitted + +number of acknowledgements received in as yet uncommitted transactions + +=item consumers + +number of consumers + +=item transactions + +number of transactions + +=item memory + +bytes of memory consumed by the Erlang process for the queue, +including stack, heap and internal structures =back -list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...] - list exchange information by virtual host. If no - I<exchangeinfoitem>s are specified then name and type is displayed - for each exchange. +=over + +=item list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...] + +List queue information by virtual host. Each line printed describes an +exchange, with the requested I<exchangeinfoitem> values separated by +tab characters. If no I<exchangeinfoitem>s are specified then I<name> +and I<type> are assumed. + +=back =head3 Exchange information items -=over 4 +=over -name - URL-encoded name of the exchange +=item name -type - exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>) +name of the exchange -durable - whether the exchange survives server restarts +=item type -auto_delete - whether the exchange is deleted when no longer used +exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>) -arguments - exchange arguments +=item durable + +whether the exchange survives server restarts + +=item auto_delete + +whether the exchange is deleted when no longer used + +=item arguments + +exchange arguments =back -list_bindings [-p I<vhostpath>] - list bindings by virtual host. Each line contains exchange name, - routing key and queue name (all URL encoded) and arguments. +=over + +=item list_bindings [-p I<vhostpath>] -list_connections [I<connectioninfoitem> ...] - list connection information. If no I<connectioninfoitem>s are - specified then the user, peer address and peer port are displayed. +List bindings by virtual host. Each line printed describes a binding, +with the exchange name, routing key, queue name and arguments, +separated by tab characters. + +=item list_connections [I<connectioninfoitem> ...] + +List queue information by virtual host. Each line printed describes an +connection, with the requested I<connectioninfoitem> values separated +by tab characters. If no I<connectioninfoitem>s are specified then +I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. + +=back =head3 Connection information items -=over 4 +=over + +=item node + +node on which the process associated with the connection resides -node - node on which the process associated with the connection resides +=item address -address - server IP number +server IP number -port - server port +=item port -peer_address - peer address +server port -peer_port - peer port +=item peer_address -state - connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, - B<running>, B<closing>, B<closed>) +peer address -channels - number of channels using the connection +=item peer_port -user - username associated with the connection +peer port -vhost - URL-encoded virtual host +=item state -timeout - connection timeout +connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, +B<running>, B<closing>, B<closed>) -frame_max - maximum frame size (bytes) +=item channels -recv_oct - octets received +number of channels using the connection -recv_cnt - packets received +=item user -send_oct - octets sent +username associated with the connection -send_cnt - packets sent +=item vhost -send_pend - send queue size +virtual host + +=item timeout + +connection timeout + +=item frame_max + +maximum frame size (bytes) + +=item recv_oct + +octets received + +=item recv_cnt + +packets received + +=item send_oct + +octets sent + +=item send_cnt + +packets sent + +=item send_pend + +send queue size =back The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results, defaulting -to I<"/">. The default can be overridden with the B<-p> flag. Result -columns for these commands and list_connections are tab-separated. +optional virtual host parameter for which to display results, +defaulting to I<"/">. The default can be overridden with the B<-p> +flag. + +=head1 OUTPUT ESCAPING + +Various items that may appear in the output of rabbitmqctl can contain +arbitrary octets. If a octet corresponds to a non-printing ASCII +character (values 0 to 31, and 127), it will be escaped in the output, +using a sequence consisting of a backslash character followed by three +octal digits giving the octet's value (i.e., as used in string +literals in the C programming language). An octet corresponding to +the backslash character (i.e. with value 92) will be escaped using a +sequence of two backslash characters. Octets with a value of 128 or +above are not escaped, in order to preserve strings encoded with +UTF-8. + +The items to which this escaping scheme applies are: + +=over + +=item * +Usernames + +=item * +Virtual host names + +=item * +Queue names + +=item * +Exchange names + +=item * +Regular expressions used for access control + +=back =head1 EXAMPLES @@ -309,4 +428,4 @@ The RabbitMQ Team <info@rabbitmq.com> =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L<http://www.rabbitmq.com> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 0057ea0478..dd907d1a02 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -15,7 +15,8 @@ %% actually want to start it {mod, {rabbit, []}}, {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, + {ssl_listeners, []}, + {ssl_options, []}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 61cedc440a..c94965f9ee 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,8 +64,11 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). +-record(amqp_error, {name, explanation, method = none}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -74,7 +77,8 @@ -type(maybe(T) :: T | 'none'). -type(erlang_node() :: atom()). --type(socket() :: port()). +-type(ssl_socket() :: #ssl_socket{}). +-type(socket() :: port() | ssl_socket()). -type(thunk(T) :: fun(() -> T)). -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). @@ -99,14 +103,14 @@ read :: regexp()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), - durable :: bool(), - auto_delete :: bool(), + durable :: boolean(), + auto_delete :: boolean(), arguments :: amqp_table(), pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), type :: exchange_type(), - durable :: bool(), + durable :: boolean(), arguments :: amqp_table()}). -type(binding() :: #binding{exchange_name :: exchange_name(), @@ -136,14 +140,14 @@ persistent_key :: maybe(pkey())}). -type(message() :: basic_message()). -type(delivery() :: - #delivery{mandatory :: bool(), - immediate :: bool(), + #delivery{mandatory :: boolean(), + immediate :: boolean(), txn :: maybe(txn()), sender :: pid(), message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}). +-type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -type(listener() :: #listener{node :: erlang_node(), protocol :: atom(), @@ -151,7 +155,10 @@ port :: non_neg_integer()}). -type(not_found() :: {'error', 'not_found'}). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). - +-type(amqp_error() :: + #amqp_error{name :: atom(), + explanation :: string(), + method :: atom()}). -endif. %%---------------------------------------------------------------------------- diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index f45fa6caf4..a78c230167 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -50,8 +50,6 @@ %% TODO: make this more precise -type(amqp_method_name() :: atom()). -type(channel_number() :: non_neg_integer()). -%% TODO: make this more precise --type(amqp_error() :: {bool(), non_neg_integer(), binary()}). -type(resource_name() :: binary()). -type(routing_key() :: binary()). -type(username() :: binary()). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c74d453361..fa2844fddf 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -1,7 +1,8 @@ -VERSION=0.0.0 -SOURCE_TARBALL_DIR=../../../dist +TARBALL_DIR=../../../dist +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz +VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + TOP_DIR=$(shell pwd) #Under debian we do not want to check build dependencies, since that #only checks build-dependencies using rpms, not debs @@ -23,13 +24,16 @@ rpms: clean server prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp - cp $(TOP_DIR)/$(TARBALL) SOURCES + cp $(TARBALL_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec - cp init.d SOURCES/rabbitmq-server.init cp ${COMMON_DIR}/* SOURCES/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ + SOURCES/rabbitmq-server.init cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index eb953b81fa..30cfb99fe2 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -9,6 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate +Source4: rabbitmq-asroot-script-wrapper URL: http://www.rabbitmq.com/ BuildRequires: erlang, python-simplejson Requires: erlang, logrotate @@ -22,9 +23,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} +%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version} %define _rabbit_libdir %{_libdir}/rabbitmq %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` +%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -34,6 +36,8 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} +cp %{S:4} %{_rabbit_asroot_wrapper} +sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper} make %{?_smp_mflags} %install @@ -51,6 +55,8 @@ install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi +install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins +install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper new file mode 100644 index 0000000000..9ef59ad76b --- /dev/null +++ b/packaging/common/rabbitmq-asroot-script-wrapper @@ -0,0 +1,53 @@ +#!/bin/bash +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Escape spaces and quotes, because shell is revolting. +for arg in "$@" ; do + # Escape quotes in parameters, so that they're passed through cleanly. + arg=$(sed -e 's/"/\\"/g' <<-END + $arg + END + ) + CMDLINE="${CMDLINE} \"${arg}\"" +done + +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE} +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 296a77d19c..0c4bd0a8d7 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -1,8 +1,39 @@ -#!/bin/bash +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + # Escape spaces and quotes, because shell is revolting. for arg in "$@" ; do # Escape quotes in parameters, so that they're passed through cleanly. - arg=$(sed -e 's/"/\\"/' <<-END + arg=$(sed -e 's/"/\\"/g' <<-END $arg END ) diff --git a/packaging/RPMS/Fedora/init.d b/packaging/common/rabbitmq-server.init index 77a6a89af1..dc30597569 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/common/rabbitmq-server.init @@ -8,10 +8,10 @@ ### BEGIN INIT INFO # Provides: rabbitmq-server -# Default-Start: -# Default-Stop: # Required-Start: $remote_fs $network # Required-Stop: $remote_fs $network +# Default-Start: +# Default-Stop: # Description: RabbitMQ broker # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO @@ -23,14 +23,16 @@ DESC=rabbitmq-server USER=rabbitmq NODE_COUNT=1 ROTATE_SUFFIX= +INIT_LOG_DIR=/var/log/rabbitmq -LOCK_FILE=/var/lock/subsys/$NAME +DEFAULTS_FILE= # This is filled in when building packages +LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 # Include rabbitmq defaults if available -if [ -f /etc/sysconfig/rabbitmq ] ; then - . /etc/sysconfig/rabbitmq +if [ -f "$DEFAULTS_FILE" ] ; then + . $DEFAULTS_FILE fi RETVAL=0 @@ -38,21 +40,22 @@ set -e start_rabbitmq () { set +e - $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err + $DAEMON start_all ${NODE_COUNT} > ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err case "$?" in 0) - echo SUCCESS && touch $LOCK_FILE + echo SUCCESS + [ -n "$LOCK_FILE" ] && touch $LOCK_FILE RETVAL=0 ;; 1) - echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\} + echo TIMEOUT - check ${INIT_LOG_DIR}/startup_\{log,err\} RETVAL=1 ;; *) - echo FAILED - check /var/log/rabbitmq/startup_log, _err + echo FAILED - check ${INIT_LOG_DIR}/startup_log, _err RETVAL=1 ;; - esac + esac set -e } @@ -60,12 +63,14 @@ stop_rabbitmq () { set +e status_rabbitmq quiet if [ $RETVAL = 0 ] ; then - $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err + $DAEMON stop_all > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err + if [ $RETVAL = 0 ] ; then + # Try to stop epmd if run by the rabbitmq user + pkill -u rabbitmq epmd || : + [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE else - rm -rf $LOCK_FILE + echo FAILED - check ${INIT_LOG_DIR}/shutdown_log, _err fi else echo No nodes running @@ -119,19 +124,14 @@ case "$1" in echo -n "Rotating log files for $DESC: " rotate_logs_rabbitmq ;; - force-reload|reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - condrestart|try-restart) + force-reload|reload|restart|condrestart|try-restart) echo -n "Restarting $DESC: " restart_rabbitmq echo "$NAME." ;; *) echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 - RETVAL=2 + RETVAL=1 ;; esac diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 67fabae0aa..dafaf9cef4 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -1,8 +1,9 @@ TARBALL_DIR=../../../dist -TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz)) +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + +DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') UNPACKED_DIR=rabbitmq-server-$(VERSION) PACKAGENAME=rabbitmq-server SIGNING_KEY_ID=056E8E56 @@ -21,6 +22,10 @@ package: clean tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ + $(UNPACKED_DIR)/debian/rabbitmq-server.init chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d deleted file mode 100644 index a35a60ec68..0000000000 --- a/packaging/debs/Debian/debian/init.d +++ /dev/null @@ -1,122 +0,0 @@ -#!/bin/sh -### BEGIN INIT INFO -# Provides: rabbitmq -# Required-Start: $remote_fs $network -# Required-Stop: $remote_fs $network -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Description: RabbitMQ broker -# Short-Description: Enable AMQP service provided by RabbitMQ broker -### END INIT INFO - -PATH=/sbin:/usr/sbin:/bin:/usr/bin -DAEMON=/usr/sbin/rabbitmq-multi -NAME=rabbitmq-server -DESC=rabbitmq-server -USER=rabbitmq -NODE_COUNT=1 -ROTATE_SUFFIX= - -test -x $DAEMON || exit 0 - -# Include rabbitmq defaults if available -if [ -f /etc/default/rabbitmq ] ; then - . /etc/default/rabbitmq -fi - -RETVAL=0 -set -e - -start_rabbitmq () { - set +e - $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err - case "$?" in - 0) - echo SUCCESS - RETVAL=0 - ;; - 1) - echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\} - RETVAL=1 - ;; - *) - echo FAILED - check /var/log/rabbitmq/startup_log, _err - RETVAL=1 - ;; - esac - set -e -} - -stop_rabbitmq () { - set +e - status_rabbitmq quiet - if [ $RETVAL = 0 ] ; then - $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err - RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err - fi - else - echo No nodes running - RETVAL=0 - fi - set -e -} - -status_rabbitmq() { - set +e - if [ "$1" != "quiet" ] ; then - $DAEMON status 2>&1 - else - $DAEMON status > /dev/null 2>&1 - fi - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -rotate_logs_rabbitmq() { - set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -restart_rabbitmq() { - stop_rabbitmq - start_rabbitmq -} - -case "$1" in - start) - echo -n "Starting $DESC: " - start_rabbitmq - echo "$NAME." - ;; - stop) - echo -n "Stopping $DESC: " - stop_rabbitmq - echo "$NAME." - ;; - status) - status_rabbitmq - ;; - rotate-logs) - echo -n "Rotating log files for $DESC: " - rotate_logs_rabbitmq - ;; - force-reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - *) - echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2 - RETVAL=1 - ;; -esac - -exit $RETVAL diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 31904851a9..5e3579557a 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -3,7 +3,7 @@ include /usr/share/cdbs/1/rules/debhelper.mk include /usr/share/cdbs/1/class/makefile.mk -RABBIT_LIB=$(DEB_DESTDIR)usr/lib/erlang/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ +RABBIT_LIB=$(DEB_DESTDIR)usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/ DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) MAN_DIR=$(DEB_DESTDIR)usr/share/man/ @@ -17,3 +17,6 @@ install/rabbitmq-server:: for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done + for script in rabbitmq-activate-plugins rabbitmq-deactivate-plugins; do \ + install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ + done diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index b398869693..4eade6c744 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,10 +4,10 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) \ + $(MAKE) -C $(SOURCE_DIR) \ TARGET_DIR=`pwd`/$(TARGET_DIR) \ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index b8096d206d..6b51fb2f9b 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -34,6 +34,7 @@ set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server set sbindir ${destroot}${prefix}/lib/rabbitmq/bin set wrappersbin ${destroot}${prefix}/sbin +set realsbin ${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version}/sbin use_configure no @@ -42,7 +43,7 @@ use_parallel_build yes build.args PYTHON=${prefix}/bin/python2.5 destroot.destdir \ - TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ + TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \ SBIN_DIR=${sbindir} \ MAN_DIR=${destroot}${prefix}/share/man @@ -61,36 +62,40 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${realsbin}/rabbitmq-env reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${realsbin}/rabbitmq-multi \ + ${realsbin}/rabbitmq-server \ + ${realsbin}/rabbitmqctl reinplace -E "s:(LOG_BASE)=/:\\1=${prefix}/:" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${realsbin}/rabbitmq-multi \ + ${realsbin}/rabbitmq-server \ + ${realsbin}/rabbitmqctl reinplace -E "s:(MNESIA_BASE)=/:\\1=${prefix}/:" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${realsbin}/rabbitmq-multi \ + ${realsbin}/rabbitmq-server \ + ${realsbin}/rabbitmqctl reinplace -E "s:(PIDS_FILE)=/:\\1=${prefix}/:" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${realsbin}/rabbitmq-multi \ + ${realsbin}/rabbitmq-server \ + ${realsbin}/rabbitmqctl xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ ${wrappersbin}/rabbitmq-multi + xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \ + ${wrappersbin}/rabbitmq-activate-plugins reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ ${wrappersbin}/rabbitmq-multi reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ ${wrappersbin}/rabbitmq-multi + reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ + ${wrappersbin}/rabbitmq-activate-plugins + reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ + ${wrappersbin}/rabbitmq-activate-plugins file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - + file copy ${wrappersbin}/rabbitmq-activate-plugins ${wrappersbin}/rabbitmq-deactivate-plugins } pre-install { diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper new file mode 100644 index 0000000000..c4488dcbe5 --- /dev/null +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper @@ -0,0 +1,12 @@ +#!/bin/bash +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} "$@" +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 59101cb2c6..f17fe77742 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,15 +4,17 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) + $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/rabbitmq-deactivate-plugins.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile rm -f $(SOURCE_DIR)/README diff --git a/scripts/activate-plugins b/scripts/rabbitmq-activate-plugins index 52f7ddbe61..5ce64c686c 100755 --- a/scripts/activate-plugins +++ b/scripts/rabbitmq-activate-plugins @@ -30,18 +30,18 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env -RABBITMQ_EBIN=`dirname $0`/../ebin -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="`dirname $0`/../plugins" -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="`dirname $0`/../priv/plugins" +RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${RABBITMQ_HOME}/priv/plugins" exec erl \ -pa "$RABBITMQ_EBIN" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ - -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ - -noinput \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ + -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" diff --git a/scripts/activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat index 8bef4ad266..3540bf2d9b 100644 --- a/scripts/activate-plugins.bat +++ b/scripts/rabbitmq-activate-plugins.bat @@ -30,10 +30,6 @@ REM REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-deactivate-plugins b/scripts/rabbitmq-deactivate-plugins new file mode 100755 index 0000000000..771c473496 --- /dev/null +++ b/scripts/rabbitmq-deactivate-plugins @@ -0,0 +1,37 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +. `dirname $0`/rabbitmq-env + +RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin + +rm -f ${RABBITMQ_EBIN}/rabbit.rel ${RABBITMQ_EBIN}/rabbit.script ${RABBITMQ_EBIN}/rabbit.boot diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat new file mode 100644 index 0000000000..190fdef7b7 --- /dev/null +++ b/scripts/rabbitmq-deactivate-plugins.bat @@ -0,0 +1,35 @@ +@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+
+del /f %RABBITMQ_EBIN_DIR%\rabbit.rel %RABBITMQ_EBIN_DIR%\rabbit.script %RABBITMQ_EBIN_DIR%\rabbit.boot
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env new file mode 100755 index 0000000000..69ddbcfed1 --- /dev/null +++ b/scripts/rabbitmq-env @@ -0,0 +1,53 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Determine where this script is really located +SCRIPT_PATH="$0" +while [ -h "$SCRIPT_PATH" ] ; do + FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null` + if [ "$?" != "0" ]; then + REL_PATH=`readlink $SCRIPT_PATH` + if expr "$REL_PATH" : '/.*' > /dev/null; then + SCRIPT_PATH="$REL_PATH" + else + SCRIPT_PATH="`dirname "$SCRIPT_PATH"`/$REL_PATH" + fi + else + SCRIPT_PATH=$FULL_PATH + fi +done + +SCRIPT_DIR=`dirname $SCRIPT_PATH` +RABBITMQ_HOME="${SCRIPT_DIR}/.." + +# Load configuration from the rabbitmq.conf file +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 1d0c785f6b..7db4cb70b2 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -37,7 +37,7 @@ PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= MULTI_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} @@ -60,7 +60,7 @@ export \ set -f exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_MULTI_ERL_ARGS} \ diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a30c0889ab..8abf13f192 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 41e84639ba..67768c0e86 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -37,17 +37,19 @@ SERVER_ERL_ARGS="+K true +A30 \ -kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ -kernel inet_default_connect_options [{nodelay,true}]" CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config +CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia SERVER_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} +[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} [ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} @@ -73,16 +75,19 @@ else fi RABBITMQ_START_RABBIT= -[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' +[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' -RABBITMQ_EBIN_ROOT="`dirname $0`/../ebin" -if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then +RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" +if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ] && [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit" RABBITMQ_EBIN_PATH="" else RABBITMQ_BOOT_FILE=start_sasl RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}" + [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="${RABBITMQ_START_RABBIT} -s rabbit" fi +RABBITMQ_CONFIG_ARG= +[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" # we need to turn off path expansion because some of the vars, notably # RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and @@ -94,6 +99,7 @@ exec erl \ ${RABBITMQ_START_RABBIT} \ -sname ${RABBITMQ_NODENAME} \ -boot ${RABBITMQ_BOOT_FILE} \ + ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b4868841d7..40f47c4b20 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_NODE_PORT=5672
)
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -113,11 +109,20 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" ( set RABBITMQ_BOOT_FILE=start_sasl
set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
)
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE="%RABBITMQ_BASE%\rabbitmq"
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=""
+)
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot %RABBITMQ_BOOT_FILE% ^
+-boot %RABBITMQ_BOOT_FILE% %RABBITMQ_CONFIG_ARG% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index c57978c050..a332afc6ca 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,15 +30,20 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +NODENAME=rabbit +. `dirname $0`/rabbitmq-env + +[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ -sname rabbitmqctl$$ \ -s rabbit_control \ + -nodename $RABBITMQ_NODENAME \ -extra "$@" + diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index e4dccfba64..8a4e5445e5 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -30,8 +30,8 @@ REM REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
+if "%RABBITMQ_NODENAME%"=="" (
+ set RABBITMQ_NODENAME=rabbit
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
@@ -46,4 +46,4 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -nodename %RABBITMQ_NODENAME% -extra %*
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 36fb4fa8c3..a2d9350c4e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -437,7 +437,10 @@ unregister_name({local,Name}) -> unregister_name({global,Name}) -> _ = global:unregister_name(Name); unregister_name(Pid) when is_pid(Pid) -> - Pid. + Pid; +% Under R12 let's just ignore it, as we have a single term as Name. +% On R13 it will never get here, as we get tuple with 'local/global' atom. +unregister_name(_Name) -> ok. extend_backoff(undefined) -> undefined; diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c41c..74b41a910c 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -66,13 +67,14 @@ -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). -spec(new/0 :: () -> pqueue()). --spec(is_queue/1 :: (any()) -> bool()). --spec(is_empty/1 :: (pqueue()) -> bool()). +-spec(is_queue/1 :: (any()) -> boolean()). +-spec(is_empty/1 :: (pqueue()) -> boolean()). -spec(len/1 :: (pqueue()) -> non_neg_integer()). -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit.erl b/src/rabbit.erl index b0d62b5ab8..18fd1b175f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -33,7 +33,7 @@ -behaviour(application). --export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). +-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). -export([start/2, stop/1]). @@ -57,6 +57,7 @@ -type(log_location() :: 'tty' | 'undefined' | string()). -type(file_suffix() :: binary()). +-spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). @@ -71,11 +72,14 @@ %%---------------------------------------------------------------------------- +prepare() -> + ok = ensure_working_log_handlers(), + ok = rabbit_mnesia:ensure_mnesia_dir(). + start() -> try - ok = ensure_working_log_handlers(), - ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = rabbit_misc:start_applications(?APPS) + ok = prepare(), + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) @@ -116,8 +120,6 @@ start(normal, []) -> print_banner(), - {ok, ExtraSteps} = application:get_env(extra_startup_steps), - lists:foreach( fun ({Msg, Thunk}) -> io:format("starting ~-20s ...", [Msg]), @@ -135,13 +137,13 @@ start(normal, []) -> ok = start_child(rabbit_log), ok = rabbit_hooks:start(), - ok = rabbit_amqqueue:start(), + ok = rabbit_binary_generator: + check_empty_content_body_frame_size(), {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), - ok = rabbit_binary_generator: - check_empty_content_body_frame_size(), + ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) @@ -170,14 +172,28 @@ start(normal, []) -> {"TCP listeners", fun () -> ok = rabbit_networking:start(), - {ok, TCPListeners} = application:get_env(tcp_listeners), + {ok, TcpListeners} = application:get_env(tcp_listeners), lists:foreach( fun ({Host, Port}) -> ok = rabbit_networking:start_tcp_listener(Host, Port) end, - TCPListeners) - end}] - ++ ExtraSteps), + TcpListeners) + end}, + {"SSL listeners", + fun () -> + case application:get_env(ssl_listeners) of + {ok, []} -> + ok; + {ok, SslListeners} -> + ok = rabbit_misc:start_applications([crypto, ssl]), + + {ok, SslOpts} = application:get_env(ssl_options), + + [rabbit_networking:start_ssl_listener + (Host, Port, SslOpts) || {Host, Port} <- SslListeners], + ok + end + end}]), io:format("~nbroker running~n"), @@ -203,6 +219,16 @@ log_location(Type) -> _ -> undefined end. +app_location() -> + {ok, Application} = application:get_application(), + filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). + +home_dir() -> + case init:get_argument(home) of + {ok, [[Home]]} -> Home; + Other -> Other + end. + %--------------------------------------------------------------------------- print_banner() -> @@ -225,10 +251,13 @@ print_banner() -> [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), - Settings = [{"node", node()}, - {"log", log_location(kernel)}, - {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}], + Settings = [{"node", node()}, + {"app descriptor", app_location()}, + {"home dir", home_dir()}, + {"cookie hash", rabbit_misc:cookie_hash()}, + {"log", log_location(kernel)}, + {"sasl log", log_location(sasl)}, + {"database dir", rabbit_mnesia:dir()}], DescrLen = lists:max([length(K) || {K, _V} <- Settings]), Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 21999f16c3..7a2fbcb826 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -41,7 +41,7 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). %% OSes on which we know memory alarms to be trustworthy --define(SUPPORTED_OS, [{unix, linux}]). +-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]). -record(alarms, {alertees, system_memory_high_watermark = false}). @@ -50,7 +50,7 @@ -ifdef(use_specs). -type(mfa_tuple() :: {atom(), atom(), list()}). --spec(start/1 :: (bool() | 'auto') -> 'ok'). +-spec(start/1 :: (boolean() | 'auto') -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). @@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of - %% memsup doesn't take account of buffers or cache when - %% considering "free" memory - therefore on Linux we can - %% get memory alarms very easily without any pressure - %% existing on memory at all. Therefore we need to use - %% our own simple memory monitor. - %% - {unix, linux} -> rabbit_memsup_linux; - - %% Start memsup programmatically rather than via the - %% rabbitmq-server script. This is not quite the right - %% thing to do as os_mon checks to see if memsup is - %% available before starting it, but as memsup is - %% available everywhere (even on VXWorks) it should be - %% ok. - %% - %% One benefit of the programmatic startup is that we - %% can add our alarm_handler before memsup is running, - %% thus ensuring that we notice memory alarms that go - %% off on startup. - %% - _ -> memsup - end, + {Mod, Args} = + case os:type() of + %% memsup doesn't take account of buffers or cache when + %% considering "free" memory - therefore on Linux we can + %% get memory alarms very easily without any pressure + %% existing on memory at all. Therefore we need to use + %% our own simple memory monitor. + %% + {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]}; + {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]}; + + %% Start memsup programmatically rather than via the + %% rabbitmq-server script. This is not quite the right + %% thing to do as os_mon checks to see if memsup is + %% available before starting it, but as memsup is + %% available everywhere (even on VXWorks) it should be + %% ok. + %% + %% One benefit of the programmatic startup is that we + %% can add our alarm_handler before memsup is running, + %% thus ensuring that we notice memory alarms that go + %% off on startup. + %% + _ -> {memsup, []} + end, %% This is based on os_mon:childspec(memsup, true) {ok, _} = supervisor:start_child( os_mon_sup, - {memsup, {Mod, start_link, []}, + {memsup, {Mod, start_link, Args}, permanent, 2000, worker, [Mod]}), ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4903c2c57f..1a5e82d714 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -63,7 +63,7 @@ -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> +-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). @@ -83,8 +83,8 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), delivery()) -> bool()). --spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). +-spec(deliver/2 :: (pid(), delivery()) -> boolean()). +-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). @@ -92,16 +92,16 @@ -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). --spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> +-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> + (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). +-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -303,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 4033aaafda..bec2cd0845 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -45,13 +45,14 @@ -type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())). -spec(publish/1 :: (delivery()) -> publish_result()). --spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). +-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> + delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> message()). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). --spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(), +-spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(), maybe(txn()), properties_input(), binary()) -> publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8af85b34c4..26db0777d0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -60,8 +60,8 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -endif. @@ -125,11 +125,11 @@ handle_cast({method, Method, Content}, State) -> stop -> {stop, normal, State#ch{state = terminating}} catch - exit:{amqp, Error, Explanation, none} -> - ok = notify_queues(internal_rollback(State)), - Reason = {amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + exit:Reason = #amqp_error{} -> + ok = rollback_and_notify(State), + MethodName = rabbit_misc:method_record_type(Method), + State#ch.reader_pid ! {channel_exit, State#ch.channel, + Reason#amqp_error{method = MethodName}}, {stop, normal, State#ch{state = terminating}}; exit:normal -> {stop, normal, State}; @@ -157,6 +157,10 @@ handle_cast({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), noreply(State). +handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, + State = #ch{writer_pid = WriterPid}) -> + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -171,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, terminate(Reason, State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> - Res = notify_queues(internal_rollback(State)), + Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok @@ -256,9 +260,6 @@ expand_routing_key_shortcut(<<>>, <<>>, expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. -die_precondition_failed(Fmt, Params) -> - rabbit_misc:protocol_error(precondition_failed, Fmt, Params). - %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -290,7 +291,7 @@ handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = notify_queues(internal_rollback(State)), + ok = rollback_and_notify(State), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), stop; @@ -601,8 +602,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> rabbit_misc:not_found(ExchangeName); {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, #'exchange.delete_ok'{}) end; @@ -676,11 +677,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); {error, not_empty} -> - die_precondition_failed( - "~s not empty", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, #'queue.delete_ok'{ @@ -863,6 +864,11 @@ internal_rollback(State = #ch{transaction_id = TxnKey, internal_error, "rollback failed: ~w", [Errors]) end. +rollback_and_notify(State = #ch{transaction_id = none}) -> + notify_queues(State); +rollback_and_notify(State) -> + notify_queues(internal_rollback(State)). + fold_per_queue(F, Acc0, UAQ) -> D = lists:foldl( fun ({_DTag, _CTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 37e4d18993..a53ac289f2 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -52,10 +52,12 @@ %%---------------------------------------------------------------------------- start() -> + {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename), + NodeName = list_to_atom(NodeNameStr), FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:localnode(rabbit)}), + node = rabbit_misc:localnode(NodeName)}), Inform = case Quiet of true -> fun(_Format, _Args1) -> ok end; false -> fun(Format, Args1) -> @@ -80,13 +82,38 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, Reason} -> + error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics(Node), + halt(2); Other -> error("~p", [Other]), halt(2) end. -error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Node) -> + fmt_stderr("diagnostics:", []), + NodeHost = rabbit_misc:nodehost(Node), + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + fmt_stderr("- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]); + {ok, NamePorts} -> + fmt_stderr("- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]) + end, + fmt_stderr("- current node: ~w", [node()]), + case init:get_argument(home) of + {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]); + Other -> fmt_stderr("- no current node home dir: ~p", [Other]) + end, + fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), + ok. parse_args(["-n", NodeS | Args], Params) -> Node = case lists:member($@, NodeS) of @@ -164,7 +191,7 @@ exchange name, routing key, queue name and arguments, in that order. <ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address and peer_port. +user, peer_address, peer_port and state. "), halt(1). @@ -197,9 +224,11 @@ action(cluster, Node, ClusterNodeSs, Inform) -> action(status, Node, [], Inform) -> Inform("Status of node ~p", [Node]), - Res = call(Node, {rabbit, status, []}), - io:format("~p~n", [Res]), - ok; + case call(Node, {rabbit, status, []}) of + {badrpc, _} = Res -> Res; + Res -> io:format("~p~n", [Res]), + ok + end; action(rotate_logs, Node, [], Inform) -> Inform("Reopening logs for node ~p", [Node]), @@ -270,8 +299,9 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, peer_port])), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, + peer_port, state])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -314,7 +344,7 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || + lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || X <- InfoItemKeys]) end, Results), ok; @@ -325,26 +355,29 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Items, Key) -> - {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), - case Info of - {_, #resource{name = Name}} -> - url_encode(Name); - _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> +format_info_item(Key, Items) -> + case proplists:get_value(Key, Items) of + #resource{name = Name} -> + escape(Name); + Value when Key =:= address; Key =:= peer_address andalso + is_tuple(Value) -> inet_parse:ntoa(Value); - _ when is_pid(Value) -> + Value when is_pid(Value) -> atom_to_list(node(Value)); - _ when is_binary(Value) -> - url_encode(Value); - _ -> + Value when is_binary(Value) -> + escape(Value); + Value when is_atom(Value) -> + escape(atom_to_list(Value)); + Value -> io_lib:format("~w", [Value]) end. display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [url_encode(I)]); + io:format("~s~n", [escape(I)]); (I) when is_tuple(I) -> - display_row([url_encode(V) || V <- tuple_to_list(I)]) + display_row([escape(V) + || V <- tuple_to_list(I)]) end, lists:sort(L)), ok; @@ -356,32 +389,25 @@ call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). -%% url_encode is lifted from ibrowse, modified to preserve some characters -url_encode(Bin) when binary(Bin) -> - url_encode_char(lists:reverse(binary_to_list(Bin)), []). - -url_encode_char([X | T], Acc) when X >= $a, X =< $z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $A, X =< $Z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $0, X =< $9 -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) - when X == $-; X == $_; X == $.; X == $~; - X == $!; X == $*; X == $'; X == $(; - X == $); X == $;; X == $:; X == $@; - X == $&; X == $=; X == $+; X == $$; - X == $,; X == $/; X == $?; X == $%; - X == $#; X == $[; X == $] -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) -> - url_encode_char(T, [$%, d2h(X bsr 4), d2h(X band 16#0f) | Acc]); -url_encode_char([], Acc) -> +%% escape does C-style backslash escaping of non-printable ASCII +%% characters. We don't escape characters above 127, since they may +%% form part of UTF-8 strings. + +escape(Bin) when binary(Bin) -> + escape(binary_to_list(Bin)); +escape(L) when is_list(L) -> + escape_char(lists:reverse(L), []). + +escape_char([$\\ | T], Acc) -> + escape_char(T, [$\\, $\\ | Acc]); +escape_char([X | T], Acc) when X > 32, X /= 127 -> + escape_char(T, [X | Acc]); +escape_char([X | T], Acc) -> + escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), + $0 + (X band 7) | Acc]); +escape_char([], Acc) -> Acc. -d2h(N) when N<10 -> N+$0; -d2h(N) -> N+$a-10. - list_replace(Find, Replace, List) -> [case X of Find -> Replace; _ -> X end || X <- List]. diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl new file mode 100644 index 0000000000..23e6fc4432 --- /dev/null +++ b/src/rabbit_dialyzer.erl @@ -0,0 +1,91 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_dialyzer). +-include("rabbit.hrl"). + +-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(create_basic_plt/1 :: (string()) -> 'ok'). +-spec(add_to_plt/2 :: (string(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(halt_with_code/1 :: (atom()) -> no_return()). + +-endif. + +%%---------------------------------------------------------------------------- + +create_basic_plt(BasicPltPath) -> + OptsRecord = dialyzer_options:build( + [{analysis_type, plt_build}, + {output_plt, BasicPltPath}, + {files_rec, otp_apps_dependencies_paths()}]), + dialyzer_cl:start(OptsRecord), + ok. + +add_to_plt(PltPath, FilesString) -> + {ok, Files} = regexp:split(FilesString, " "), + DialyzerWarnings = dialyzer:run([{analysis_type, plt_add}, + {init_plt, PltPath}, + {output_plt, PltPath}, + {files, Files}]), + print_warnings(DialyzerWarnings), + ok. + +dialyze_files(PltPath, ModifiedFiles) -> + {ok, Files} = regexp:split(ModifiedFiles, " "), + DialyzerWarnings = dialyzer:run([{init_plt, PltPath}, + {files, Files}]), + case DialyzerWarnings of + [] -> io:format("~nOk~n"), + ok; + _ -> io:format("~nFAILED with the following warnings:~n"), + print_warnings(DialyzerWarnings), + fail + end. + +print_warnings(Warnings) -> + [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings], + io:format("~n"), + ok. + +otp_apps_dependencies_paths() -> + [code:lib_dir(App, ebin) || + App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]]. + +halt_with_code(ok) -> + halt(); +halt_with_code(fail) -> + halt(1). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index f17ee2f530..37a1357d41 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -61,7 +61,7 @@ 'exchange_not_found' | 'exchange_and_queue_not_found'}). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()). +-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). @@ -82,9 +82,9 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(topic_matches/2 :: (binary(), binary()) -> bool()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). --spec(delete/2 :: (exchange_name(), bool()) -> +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> [{exchange_name(), routing_key(), amqp_table()}]). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be005034e..b789fbd1e0 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, "rabbit_serial"). -record(state, {serial}). @@ -59,17 +60,28 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Serial = case rabbit_misc:read_term_file(Filename) of + {ok, [Num]} -> Num; + {error, enoent} -> rabbit_persister:serial(); + {error, Reason} -> + throw({error, {cannot_read_serial_file, Filename, Reason}}) + end, + case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_write_serial_file, Filename, Reason1}}) + end, + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +89,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 0a68c9adad..ed0066fe07 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) -> spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue end, erlang:monitor(process, Parent)) end), @@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> {'DOWN', MonitorRef, process, _Object, _Info} -> ok; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> - case inet:getstat(Sock, [StatName]) of + case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> if NewStatVal =/= StatVal -> F({NewStatVal, 0}); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9f3dcbd071..087a9f64d9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl index 7bf85347fb..6ef638cb59 100644 --- a/src/rabbit_load.erl +++ b/src/rabbit_load.erl @@ -41,7 +41,7 @@ -ifdef(use_specs). -type(erlang_node() :: atom()). --type(load() :: {{non_neg_integer(), float()}, erlang_node()}). +-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}). -spec(local_load/0 :: () -> load()). -spec(remote_loads/0 :: () -> [load()]). -spec(pick/0 :: () -> erlang_node()). @@ -52,8 +52,11 @@ local_load() -> LoadAvg = case whereis(cpu_sup) of - undefined -> 0.0; - _Other -> cpu_sup:avg1() + undefined -> unknown; + _ -> case cpu_sup:avg1() of + L when is_integer(L) -> L; + {error, timeout} -> unknown + end end, {{statistics(run_queue), LoadAvg}, node()}. @@ -65,8 +68,12 @@ remote_loads() -> pick() -> RemoteLoads = remote_loads(), {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node - AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR, + %% add bias towards current node; we rely on Erlang's term order + %% of SomeFloat < local_unknown < unknown. + AdjustedLoadAvg = case LoadAvg of + unknown -> local_unknown; + _ -> LoadAvg * ?FUDGE_FACTOR + end, Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], {_, SelectedNode} = lists:min(Loads), SelectedNode. diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl new file mode 100644 index 0000000000..b0d57cb27e --- /dev/null +++ b/src/rabbit_memsup.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0]). + +-record(state, {memory_fraction, + timeout, + timer, + mod, + mod_state, + alarmed + }). + +-define(SERVER, memsup). %% must be the same as the standard memsup + +-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(update/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +update() -> + gen_server:cast(?SERVER, update). + +%%---------------------------------------------------------------------------- + +init([Mod]) -> + Fraction = os_mon:get_env(memsup, system_memory_high_watermark), + TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), + InitState = Mod:init(), + State = #state { memory_fraction = Fraction, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + mod = Mod, + mod_state = InitState, + alarmed = false }, + {ok, internal_update(State)}. + +start_timer(Timeout) -> + {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + TRef. + +%% Export the same API as the real memsup. Note that +%% get_sysmem_high_watermark gives an int in the range 0 - 100, while +%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. +handle_call(get_sysmem_high_watermark, _From, State) -> + {reply, trunc(100 * State#state.memory_fraction), State}; + +handle_call({set_sysmem_high_watermark, Float}, _From, State) -> + {reply, ok, State#state{memory_fraction = Float}}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_memory_data, _From, + State = #state { mod = Mod, mod_state = ModState }) -> + {reply, Mod:get_memory_data(ModState), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +internal_update(State = #state { memory_fraction = MemoryFraction, + alarmed = Alarmed, + mod = Mod, mod_state = ModState }) -> + ModState1 = Mod:update(ModState), + {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1), + NewAlarmed = MemUsed / MemTotal > MemoryFraction, + case {Alarmed, NewAlarmed} of + {false, true} -> + alarm_handler:set_alarm({system_memory_high_watermark, []}); + {true, false} -> + alarm_handler:clear_alarm(system_memory_high_watermark); + _ -> + ok + end, + State #state { mod_state = ModState1, alarmed = NewAlarmed }. diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl new file mode 100644 index 0000000000..3de2d8430e --- /dev/null +++ b/src/rabbit_memsup_darwin.erl @@ -0,0 +1,88 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup_darwin). + +-export([init/0, update/1, get_memory_data/1]). + +-record(state, {total_memory, + allocated_memory}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). + +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). + +-endif. + +%%---------------------------------------------------------------------------- + +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. + +update(State) -> + File = os:cmd("/usr/bin/vm_stat"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), + [PageSize, Inactive, Active, Free, Wired] = + [dict:fetch(Key, Dict) || + Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', + 'Pages wired down']], + MemTotal = PageSize * (Inactive + Active + Free + Wired), + MemUsed = PageSize * (Active + Wired), + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. + +%%---------------------------------------------------------------------------- + +%% A line looks like "Foo bar: 123456." +parse_line(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + case Name of + "Mach Virtual Memory Statistics" -> + ["(page", "size", "of", PageSize, "bytes)"] = + string:tokens(RHS, " "), + {page_size, list_to_integer(PageSize)}; + _ -> + [Value | _Rest1] = string:tokens(RHS, " ."), + {list_to_atom(Name), list_to_integer(Value)} + end. diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl index ffdc7e9946..ca942d7caa 100644 --- a/src/rabbit_memsup_linux.erl +++ b/src/rabbit_memsup_linux.erl @@ -31,104 +31,44 @@ -module(rabbit_memsup_linux). --behaviour(gen_server). +-export([init/0, update/1, get_memory_data/1]). --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([update/0]). - --define(SERVER, memsup). %% must be the same as the standard memsup - --define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). - --record(state, {memory_fraction, alarmed, timeout, timer}). +-record(state, {total_memory, + allocated_memory}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(update/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). -update() -> - gen_server:cast(?SERVER, update). +-endif. %%---------------------------------------------------------------------------- -init(_Args) -> - Fraction = os_mon:get_env(memsup, system_memory_high_watermark), - TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - {ok, #state{alarmed = false, - memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef}}. - -start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), - TRef. - -%% Export the same API as the real memsup. Note that -%% get_sysmem_high_watermark gives an int in the range 0 - 100, while -%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. -handle_call(get_sysmem_high_watermark, _From, State) -> - {reply, trunc(100 * State#state.memory_fraction), State}; - -handle_call({set_sysmem_high_watermark, Float}, _From, State) -> - {reply, ok, State#state{memory_fraction = Float}}; +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; - -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(update, State = #state{alarmed = Alarmed, - memory_fraction = MemoryFraction}) -> +update(State) -> File = read_proc_file("/proc/meminfo"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - MemTotal = dict:fetch('MemTotal', Dict), - MemUsed = MemTotal - - dict:fetch('MemFree', Dict) - - dict:fetch('Buffers', Dict) - - dict:fetch('Cached', Dict), - NewAlarmed = MemUsed / MemTotal > MemoryFraction, - case {Alarmed, NewAlarmed} of - {false, true} -> - alarm_handler:set_alarm({system_memory_high_watermark, []}); - {true, false} -> - alarm_handler:clear_alarm(system_memory_high_watermark); - _ -> - ok - end, - {noreply, State#state{alarmed = NewAlarmed}}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. + [MemTotal, MemFree, Buffers, Cached] = + [dict:fetch(Key, Dict) || + Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']], + MemUsed = MemTotal - MemFree - Buffers - Cached, + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. %%---------------------------------------------------------------------------- @@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) -> %% A line looks like "FooBar: 123456 kB" parse_line(Line) -> - [Name, Value | _] = string:tokens(Line, ": "), - {list_to_atom(Name), list_to_integer(Value)}. + [Name, RHS | _Rest] = string:tokens(Line, ":"), + [Value | UnitsRest] = string:tokens(RHS, " "), + Value1 = case UnitsRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index abf4c7ccfa..b20e9a86b6 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -35,7 +35,8 @@ -include_lib("kernel/include/file.hrl"). -export([method_record_type/1, polite_pause/0, polite_pause/1]). --export([die/1, frame_error/2, protocol_error/3, protocol_error/4]). +-export([die/1, frame_error/2, amqp_error/4, + protocol_error/3, protocol_error/4]). -export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). @@ -46,13 +47,15 @@ -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). --export([localnode/1, tcp_name/3]). +-export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -65,15 +68,16 @@ -include_lib("kernel/include/inet.hrl"). +-type(ok_or_error() :: 'ok' | {'error', any()}). + -spec(method_record_type/1 :: (tuple()) -> atom()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). -spec(die/1 :: (atom()) -> no_return()). -spec(frame_error/2 :: (atom(), binary()) -> no_return()). --spec(protocol_error/3 :: - (atom() | amqp_error(), string(), [any()]) -> no_return()). --spec(protocol_error/4 :: - (atom() | amqp_error(), string(), [any()], atom()) -> no_return()). +-spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()). +-spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()). +-spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()). -spec(not_found/1 :: (r(atom())) -> no_return()). -spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). -spec(get_config/2 :: (atom(), A) -> A). @@ -88,9 +92,9 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). +-spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). @@ -100,8 +104,10 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). +-spec(nodehost/1 :: (erlang_node()) -> string()). +-spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). @@ -110,12 +116,16 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). --spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). +-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (string(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). +-spec(ceil/1 :: (number()) -> number()). -endif. @@ -136,15 +146,17 @@ die(Error) -> protocol_error(Error, "~w", [Error]). frame_error(MethodName, BinaryFields) -> - protocol_error(frame_error, "cannot decode ~w", - [BinaryFields], MethodName). + protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName). + +amqp_error(Name, ExplanationFormat, Params, Method) -> + Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)), + #amqp_error{name = Name, explanation = Explanation, method = Method}. -protocol_error(Error, Explanation, Params) -> - protocol_error(Error, Explanation, Params, none). +protocol_error(Name, ExplanationFormat, Params) -> + protocol_error(Name, ExplanationFormat, Params, none). -protocol_error(Error, Explanation, Params, Method) -> - CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)), - exit({amqp, Error, CompleteExplanation, Method}). +protocol_error(Name, ExplanationFormat, Params, Method) -> + exit(amqp_error(Name, ExplanationFormat, Params, Method)). not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). @@ -297,11 +309,15 @@ ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). localnode(Name) -> + list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])). + +nodehost(Node) -> %% This is horrible, but there doesn't seem to be a way to split a %% nodename into its constituent parts. - list_to_atom(lists:append(atom_to_list(Name), - lists:dropwhile(fun (E) -> E =/= $@ end, - atom_to_list(node())))). + tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))). + +cookie_hash() -> + ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))). tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> @@ -360,7 +376,9 @@ dirty_foreach_key1(F, TableName, K) -> end. dirty_dump_log(FileName) -> - {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + {ok, LH} = disk_log:open([{name, dirty_dump_log}, + {mode, read_only}, + {file, FileName}]), dirty_dump_log1(LH, disk_log:chunk(LH, start)), disk_log:close(LH). @@ -374,6 +392,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). +read_term_file(File) -> file:consult(File). + +write_term_file(File, Terms) -> + file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + append_file(File, Suffix) -> case file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -444,3 +468,18 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0adc..c4d5aac684 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -50,7 +50,7 @@ -spec(dir/0 :: () -> string()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(is_db_empty/0 :: () -> bool()). +-spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([erlang_node()]) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). @@ -149,6 +149,11 @@ table_definitions() -> table_names() -> [Tab || {Tab, _} <- table_definitions()]. +replicated_table_names() -> + [Tab || {Tab, Attrs} <- table_definitions(), + not lists:member({local_content, true}, Attrs) + ]. + dir() -> mnesia:system_info(directory). ensure_mnesia_dir() -> @@ -192,28 +197,16 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> Device; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end, - try - ok = io:write(Handle, ClusterNodes), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_close_cluster_nodes_config, - FileName, Reason1}}) - end - end, - ok. + case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end. read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case file:consult(FileName) of + case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> case application:get_env(cluster_config) of @@ -250,12 +243,10 @@ delete_cluster_nodes_config() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. init_db(ClusterNodes) -> - WasDiskNode = mnesia:system_info(use_dir), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of {ok, []} -> - if WasDiskNode and IsDiskNode -> + case mnesia:system_info(use_dir) of + true -> case check_schema_integrity() of ok -> ok; @@ -270,22 +261,18 @@ init_db(ClusterNodes) -> ok = move_db(), ok = create_schema() end; - WasDiskNode -> - throw({error, {cannot_convert_disk_node_to_ram_node, - ClusterNodes}}); - IsDiskNode -> - ok = create_schema(); - true -> - throw({error, {unable_to_contact_cluster_nodes, - ClusterNodes}}) + false -> + ok = create_schema() end; {ok, [_|_]} -> - ok = wait_for_tables(), - ok = create_local_table_copies( - case IsDiskNode of - true -> disc; - false -> ram - end); + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end); {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -336,40 +323,36 @@ create_tables() -> table_definitions()), ok. +table_has_copy_type(TabDef, DiscType) -> + lists:member(node(), proplists:get_value(DiscType, TabDef, [])). + create_local_table_copies(Type) -> - ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); - true -> ok - end, lists:foreach( fun({Tab, TabDef}) -> - HasDiscCopies = - lists:keymember(disc_copies, 1, TabDef), - HasDiscOnlyCopies = - lists:keymember(disc_only_copies, 1, TabDef), + HasDiscCopies = table_has_copy_type(TabDef, disc_copies), + HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), + LocalTab = proplists:get_bool(local_content, TabDef), StorageType = - case Type of - disc -> + if + Type =:= disc orelse LocalTab -> if - HasDiscCopies -> disc_copies; + HasDiscCopies -> disc_copies; HasDiscOnlyCopies -> disc_only_copies; - true -> ram_copies + true -> ram_copies end; %% unused code - commented out to keep dialyzer happy -%% disc_only -> +%% Type =:= disc_only -> %% if %% HasDiscCopies or HasDiscOnlyCopies -> %% disc_only_copies; %% true -> ram_copies %% end; - ram -> + Type =:= ram -> ram_copies end, ok = create_local_table_copy(Tab, StorageType) end, table_definitions()), - ok = if Type == ram -> create_local_table_copy(schema, ram_copies); - true -> ok - end, ok. create_local_table_copy(Tab, Type) -> @@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_tables() -> +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> case check_schema_integrity() of ok -> - case mnesia:wait_for_tables(table_names(), 30000) of + case mnesia:wait_for_tables(TableNames, 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index d91975359a..b1cc4d028f 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -114,12 +114,13 @@ action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( fun({Node, Pid}) -> - Status = rpc:call(Node, rabbit, status, [], RpcTimeout), + RabbitRunning = + case is_rabbit_running(Node, RpcTimeout) of + false -> not_running; + true -> running + end, io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, case parse_status(Status) of - false -> not_running; - true -> running - end]) + [Node, Pid, RabbitRunning]) end); action(stop_all, [], RpcTimeout) -> @@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) -> wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> false; wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case parse_status(rpc:call(Node, rabbit, status, [])) of + case is_rabbit_running(Node, RpcTimeout) of true -> true; false -> receive {'EXIT', Port, PosixCode} -> @@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> run_cmd(FullPath) -> erlang:open_port({spawn, FullPath}, [nouse_stdio]). -parse_status({badrpc, _}) -> - false; - -parse_status(Status) -> - case lists:keysearch(running_applications, 1, Status) of - {value, {running_applications, Apps}} -> - lists:keymember(rabbit, 1, Apps); - _ -> - false +is_rabbit_running(Node, RpcTimeout) -> + case rpc:call(Node, rabbit, status, [], RpcTimeout) of + {badrpc, _} -> false; + Status -> case proplists:get_value(running_applications, Status) of + undefined -> false; + Apps -> lists:keymember(rabbit, 1, Apps) + end end. with_os(Handlers) -> {OsFamily, _} = os:type(), - case lists:keysearch(OsFamily, 1, Handlers) of - {value, {_, Handler}} -> Handler(); - false -> throw({unsupported_os, OsFamily}) + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() end. script_filename() -> diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl new file mode 100644 index 0000000000..a5ccc8e9ae --- /dev/null +++ b/src/rabbit_net.erl @@ -0,0 +1,132 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_net). +-include("rabbit.hrl"). +-include_lib("kernel/include/inet.hrl"). + +-export([async_recv/3, close/1, controlling_process/2, + getstat/2, peername/1, port_command/2, + send/2, sockname/1]). +%%--------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(stat_option() :: + 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | + 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). +-type(error() :: {'error', any()}). + +-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}). +-spec(close/1 :: (socket()) -> 'ok' | error()). +-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()). +-spec(port_command/2 :: (socket(), iolist()) -> 'true'). +-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()). +-spec(peername/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(sockname/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(getstat/2 :: (socket(), [stat_option()]) -> + {'ok', [{stat_option(), integer()}]} | error()). + +-endif. + +%%--------------------------------------------------------------------------- + + +async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> + Pid = self(), + Ref = make_ref(), + + spawn(fun() -> Pid ! {inet_async, Sock, Ref, + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), + + {ok, Ref}; + +async_recv(Sock, Length, infinity) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, -1); + +async_recv(Sock, Length, Timeout) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, Timeout). + +close(Sock) when is_record(Sock, ssl_socket) -> + ssl:close(Sock#ssl_socket.ssl); + +close(Sock) when is_port(Sock) -> + gen_tcp:close(Sock). + + +controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> + ssl:controlling_process(Sock#ssl_socket.ssl, Pid); + +controlling_process(Sock, Pid) when is_port(Sock) -> + gen_tcp:controlling_process(Sock, Pid). + + +getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> + inet:getstat(Sock#ssl_socket.tcp, Stats); + +getstat(Sock, Stats) when is_port(Sock) -> + inet:getstat(Sock, Stats). + + +peername(Sock) when is_record(Sock, ssl_socket) -> + ssl:peername(Sock#ssl_socket.ssl); + +peername(Sock) when is_port(Sock) -> + inet:peername(Sock). + + +port_command(Sock, Data) when is_record(Sock, ssl_socket) -> + case ssl:send(Sock#ssl_socket.ssl, Data) of + ok -> + self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> + erlang:error(Reason) + end; + +port_command(Sock, Data) when is_port(Sock) -> + erlang:port_command(Sock, Data). + +send(Sock, Data) when is_record(Sock, ssl_socket) -> + ssl:send(Sock#ssl_socket.ssl, Data); + +send(Sock, Data) when is_port(Sock) -> + gen_tcp:send(Sock, Data). + + +sockname(Sock) when is_record(Sock, ssl_socket) -> + ssl:sockname(Sock#ssl_socket.ssl); + +sockname(Sock) when is_port(Sock) -> + inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2dbd5a5af2..1bc17a324c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,18 +31,28 @@ -module(rabbit_networking). --export([start/0, start_tcp_listener/2, stop_tcp_listener/2, - on_node_down/1, active_listeners/0, node_listeners/1, - connections/0, connection_info/1, connection_info/2, - connection_info_all/0, connection_info_all/1]). +-export([start/0, start_tcp_listener/2, start_ssl_listener/3, + stop_tcp_listener/2, on_node_down/1, active_listeners/0, + node_listeners/1, connections/0, connection_info/1, + connection_info/2, connection_info_all/0, + connection_info_all/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]). +-export([tcp_listener_started/2, tcp_listener_stopped/2, + start_client/1, start_ssl_client/2]). -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). +-define(RABBIT_TCP_OPTS, [ + binary, + {packet, raw}, % no packaging + {reuseaddr, true}, % allow rebind without waiting + %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. + %% {delay_send, true}, + {exit_on_close, false} + ]). %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -52,6 +62,7 @@ -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). +-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok'). -spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (erlang_node()) -> [listener()]). @@ -90,27 +101,30 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), - throw({error, invalid_port, Port}) + throw({error, {invalid_port, Port}}) end, Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), {IPAddress, Name}. start_tcp_listener(Host, Port) -> - {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), + start_listener(Host, Port, "TCP Listener", + {?MODULE, start_client, []}). + +start_ssl_listener(Host, Port, SslOpts) -> + start_listener(Host, Port, "SSL Listener", + {?MODULE, start_ssl_client, [SslOpts]}). + +start_listener(Host, Port, Label, OnConnect) -> + {IPAddress, Name} = + check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), {ok,_} = supervisor:start_child( rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, - [binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false}], + [IPAddress, Port, ?RABBIT_TCP_OPTS , {?MODULE, tcp_listener_started, []}, {?MODULE, tcp_listener_stopped, []}, - {?MODULE, start_client, []}]}, + OnConnect, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}), ok. @@ -148,10 +162,35 @@ on_node_down(Node) -> start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = gen_tcp:controlling_process(Sock, Child), + ok = rabbit_net:controlling_process(Sock, Child), Child ! {go, Sock}, Child. +start_ssl_client(SslOpts, Sock) -> + case rabbit_net:peername(Sock) of + {ok, {PeerAddress, PeerPort}} -> + PeerIp = inet_parse:ntoa(PeerAddress), + case ssl:ssl_accept(Sock, SslOpts) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection " + "from ~s:~p to SSL~n", + [PeerIp, PeerPort]), + RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, + start_client(RabbitSslSock); + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection " + "from ~s:~p to SSL: ~n~p~n", + [PeerIp, PeerPort, Reason]), + {error, Reason} + end; + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", + [Reason]), + {error, Reason} + end. + connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 71278bfb2a..f28c4a6ec5 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -49,6 +49,8 @@ start() -> UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), + RootName = RabbitEBin ++ "/rabbit", + %% Unpack any .ez plugins unpack_ez_plugins(PluginDir, UnpackedPluginDir), @@ -60,15 +62,13 @@ start() -> %% Build the entire set of dependencies - this will load the %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {unknown_app, {App, Err}} -> - io:format("ERROR: Failed to load application " ++ - "~s: ~p~n", [App, Err]), - halt(1); + {failed_to_load_app, App, Err} -> + error("failed to load application ~s: ~p", [App, Err]); AppList -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, @@ -77,11 +77,11 @@ start() -> AppVersions}, %% Write it out to ebin/rabbit.rel - file:write_file(RabbitEBin ++ "/rabbit.rel", - io_lib:format("~p.~n", [RDesc])), + file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% Compile the script - case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of + ScriptFile = RootName ++ ".script", + case systools:make_script(RootName, [local, silent]) of {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent @@ -98,9 +98,19 @@ start() -> end, ok; {error, Module, Error} -> - io:format("Boot file generation failed: ~s~n", - [Module:format_error(Error)]), - halt(1) + error("generation of boot script file ~s failed: ~w", + [ScriptFile, Module:format_error(Error)]) + end, + + case post_process_script(ScriptFile) of + ok -> ok; + {error, Reason} -> + error("post processing of boot script file ~s failed: ~w", + [ScriptFile, Reason]) + end, + case systools:script2boot(RootName) of + ok -> ok; + error -> error("failed to compile boot script file ~s", [ScriptFile]) end, halt(), ok. @@ -122,10 +132,10 @@ determine_version(App) -> assert_dir(Dir) -> case filelib:is_dir(Dir) of true -> ok; - false -> - ok = filelib:ensure_dir(Dir), - ok = file:make_dir(Dir) + false -> ok = filelib:ensure_dir(Dir), + ok = file:make_dir(Dir) end. + delete_dir(Dir) -> case filelib:is_dir(Dir) of true -> @@ -143,6 +153,7 @@ delete_dir(Dir) -> false -> ok end. + is_symlink(Name) -> case file:read_link(Name) of {ok, _} -> true; @@ -185,14 +196,43 @@ expand_dependencies(Current, [Next|Rest]) -> expand_dependencies(Current, Rest); false -> case application:load(Next) of - ok -> + ok -> ok; - {error, {already_loaded, _}} -> + {error, {already_loaded, _}} -> ok; - X -> - throw({unknown_app, {Next, X}}) + {error, Reason} -> + throw({failed_to_load_app, Next, Reason}) end, {ok, Required} = application:get_key(Next, applications), Unique = [A || A <- Required, not(sets:is_element(A, Current))], expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) end. + +post_process_script(ScriptFile) -> + case file:consult(ScriptFile) of + {ok, [{script, Name, Entries}]} -> + NewEntries = process_entries(Entries), + case file:open(ScriptFile, [write]) of + {ok, Fd} -> + io:format(Fd, "%% script generated at ~w ~w~n~p.~n", + [date(), time(), {script, Name, NewEntries}]), + file:close(Fd), + ok; + {error, OReason} -> + {error, {failed_to_open_script_file_for_writing, OReason}} + end; + {error, Reason} -> + {error, {failed_to_load_script, Reason}} + end. + +process_entries([]) -> + []; +process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | + Rest]) -> + [Entry, {apply,{rabbit,prepare,[]}} | Rest]; +process_entries([Entry|Rest]) -> + [Entry | process_entries(Rest)]. + +error(Fmt, Args) -> + io:format("ERROR: " ++ Fmt ++ "~n", Args), + halt(1). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6a7d68084d..5816ba109b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -49,7 +49,6 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). --define(CHANNEL_CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). %--------------------------------------------------------------------------- @@ -94,23 +93,19 @@ %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing, *running* -%% terminate_channel timeout -> remove 'closing' mark, *running* +%% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* %% receive frame -> ignore, *closing* -%% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* %% channel exit with hard error %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing +%% -> log error, mark channel as closing %% if last channel to exit then send connection.close_ok, %% start terminate_connection timer, *closed* %% else *closing* @@ -123,7 +118,6 @@ %% *closed* %% receive frame -> ignore, *closed* %% terminate_connection timeout -> *terminate* -%% terminate_channel timeout -> remove 'closing' mark, *closed* %% handshake_timeout -> ignore, *closed* %% heartbeat timeout -> *throw* %% channel exit -> log error, *closed* @@ -200,7 +194,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). peername(Sock) -> try - {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), AddressS = inet_parse:ntoa(Address), {AddressS, Port} catch @@ -269,12 +263,10 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> if State#v1.connection_state =:= running -> - send_exception( - State, 0, - {amqp, connection_forced, - io_lib:format( - "broker forced connection closure with reason '~w'", - [Reason]), none}); + send_exception(State, 0, + rabbit_misc:amqp_error(connection_forced, + "broker forced connection closure with reason '~w'", + [Reason], none)); true -> ok end, %% this is what we are expected to do according to @@ -286,14 +278,12 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); - {terminate_channel, Channel, Ref1} -> - mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State)); terminate_connection -> State; handshake_timeout -> @@ -323,8 +313,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - Ref = inet_op(fun () -> prim_inet:async_recv( - OldState#v1.sock, Length, -1) end), + Ref = inet_op(fun () -> rabbit_net:async_recv( + OldState#v1.sock, Length, infinity) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -341,32 +331,14 @@ close_connection(State = #v1{connection = #connection{ State#v1{connection_state = closed}. close_channel(Channel, State) -> - Ref = make_ref(), - TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT, - self(), - {terminate_channel, Channel, Ref}), - put({closing_channel, Channel}, {Ref, TRef}), - State. - -terminate_channel(Channel, Ref, State) -> - case get({closing_channel, Channel}) of - undefined -> ok; %% got close_ok in the meantime - {Ref, _} -> erase({closing_channel, Channel}), - ok; - {_Ref, _} -> ok %% got close_ok, and have new closing channel - end, + put({channel, Channel}, closing), State. handle_channel_exit(Channel, Reason, State) -> - %% We remove the channel from the inbound map only. That allows - %% the channel to be re-opened, but also means the remaining - %% cleanup, including possibly closing the connection, is deferred - %% until we get the (normal) exit signal. - erase({channel, Channel}), handle_exception(State, Channel, Reason). handle_dependent_exit(Pid, normal, State) -> - channel_cleanup(Pid), + erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of @@ -376,17 +348,10 @@ handle_dependent_exit(Pid, Reason, State) -> channel_cleanup(Pid) -> case get({chpid, Pid}) of - undefined -> - case get({closing_chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> - erase({closing_chpid, Pid}), - Channel - end; - {channel, Channel} -> - erase({channel, Channel}), - erase({chpid, Pid}), - Channel + undefined -> undefined; + {channel, Channel} -> erase({channel, Channel}), + erase({chpid, Pid}), + Channel end. all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. @@ -451,7 +416,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) State; handle_frame(Type, 0, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); @@ -459,19 +424,33 @@ handle_frame(Type, 0, Payload, State) -> end; handle_frame(Type, Channel, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> - ok = check_for_close(Channel, ChPid, AnalyzedFrame), + case AnalyzedFrame of + {method, 'channel.close', _} -> + erase({channel, Channel}); + _ -> ok + end, ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), State; + closing -> + %% According to the spec, after sending a + %% channel.close we must ignore all frames except + %% channel.close_ok. + case AnalyzedFrame of + {method, 'channel.close_ok', _} -> + erase({channel, Channel}); + _ -> ok + end, + State; undefined -> case State#v1.connection_state of - running -> send_to_new_channel( - Channel, AnalyzedFrame, State), + running -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), State; Other -> throw({channel_frame_while_starting, Channel, Other, AnalyzedFrame}) @@ -510,9 +489,8 @@ handle_input(handshake, <<"AMQP",0,ProtocolMajor,ProtocolMinor,ProtocolRevision> handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, State) -> %% 0-8 and 0-9 style protocol header. check_protocol_header(ProtocolMajor, ProtocolMinor, 0, State); - handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> gen_tcp:send( + ok = inet_op(fun () -> rabbit_net:send( Sock, <<"AMQP",1,1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>) end), @@ -570,17 +548,17 @@ handle_method0(MethodName, FieldsBin, State) -> MethodName, FieldsBin), State) catch exit:Reason -> - CompleteReason = - case Reason of - {amqp, Error, Explanation, none} -> - {amqp, Error, Explanation, MethodName}; - OtherReason -> OtherReason - end, + CompleteReason = case Reason of + #amqp_error{method = none} -> + Reason#amqp_error{method = MethodName}; + OtherReason -> OtherReason + end, case State#v1.connection_state of running -> send_exception(State, 0, CompleteReason); Other -> throw({channel0_error, Other, CompleteReason}) end end. + handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response}, State = #v1{connection_state = starting, @@ -646,23 +624,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); i(address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:sockname(Sock), + {ok, {A, _}} = rabbit_net:sockname(Sock), A; i(port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:sockname(Sock), + {ok, {_, P}} = rabbit_net:sockname(Sock), P; i(peer_address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:peername(Sock), + {ok, {A, _}} = rabbit_net:peername(Sock), A; i(peer_port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:peername(Sock), + {ok, {_, P}} = rabbit_net:peername(Sock), P; i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - case inet:getstat(Sock, [SockStat]) of + case rabbit_net:getstat(Sock, [SockStat]) of {ok, [{SockStat, StatVal}]} -> StatVal; {error, einval} -> undefined; {error, Error} -> throw({cannot_get_socket_stats, Error}) @@ -674,7 +652,7 @@ i(channels, #v1{}) -> i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> - none; + ''; i(vhost, #v1{connection = #connection{vhost = VHost}}) -> VHost; i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> @@ -687,38 +665,17 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, State) -> - case get({closing_channel, Channel}) of - undefined -> - #v1{sock = Sock, - connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); - {_, TRef} -> - %% According to the spec, after sending a channel.close we - %% must ignore all frames except channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erlang:cancel_timer(TRef), - erase({closing_channel, Channel}), - ok; - _Other -> ok - end - end. - -check_for_close(Channel, ChPid, {method, 'channel.close', _}) -> - channel_cleanup(ChPid), - put({closing_chpid, ChPid}, {channel, Channel}), - ok; -check_for_close(_Channel, _ChPid, _Frame) -> - ok. + #v1{sock = Sock, connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + ChPid = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", @@ -764,18 +721,27 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) -> - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - SafeTextBin = if size(CompleteTextBin) > 255 -> - <<CompleteTextBin:252/binary, "...">>; - true -> - CompleteTextBin - end, - {ShouldClose, Code, SafeTextBin, Method}; -lookup_amqp_exception({amqp, ErrorName, Expl, Method}) -> - Details = rabbit_framing:lookup_amqp_exception(ErrorName), - lookup_amqp_exception({amqp, Details, Expl, Method}); +%% FIXME: this clause can go when we move to AMQP spec >=8.1 +lookup_amqp_exception(#amqp_error{name = precondition_failed, + explanation = Expl, + method = Method}) -> + ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), + {false, 406, ExplBin, Method}; +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}) -> + {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; lookup_amqp_exception(Other) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}. + {ShouldClose, Code, Text} = + rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e5100ccd16..5c5c55f1e4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -75,7 +76,8 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), @@ -91,6 +93,71 @@ test_priority_queue() -> Q4 = priority_queue:in(foo, -1, priority_queue:new()), {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), + + %% merge 2 * 2-element multi-different-priority Qs + Q11 = priority_queue:join(Q6, Q5), + {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}], + [bar, foo, foo, bar]} = test_priority_queue(Q11), + + %% and the other way around + Q12 = priority_queue:join(Q5, Q6), + {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}], + [bar, foo, bar, foo]} = test_priority_queue(Q12), + + %% merge with negative priorities + Q13 = priority_queue:join(Q4, Q5), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q13), + + %% and the other way around + Q14 = priority_queue:join(Q5, Q4), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q14), + + %% joins with empty queues: + Q1 = priority_queue:join(Q, Q1), + Q1 = priority_queue:join(Q1, Q), + + %% insert with priority into non-empty zero-priority queue + Q15 = priority_queue:in(baz, 1, Q5), + {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = + test_priority_queue(Q15), + passed. priority_queue_in_all(Q, L) -> @@ -116,6 +183,14 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_unfold() -> + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), + List = lists:seq(2,20,2), + {List, 0} = rabbit_misc:unfold(fun (0) -> false; + (N) -> {true, N*2, N-1} + end, 10), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -408,19 +483,17 @@ test_cluster_management() -> end, ClusteringSequence), - %% attempt to convert a disk node into a ram node + %% convert a disk node into a ram node ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), - %% attempt to join a non-existing cluster as a ram node + %% join a non-existing cluster as a ram node ok = control_action(reset, []), - {error, {unable_to_contact_cluster_nodes, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:localnode(hare), case net_adm:ping(SecondaryNode) of @@ -436,11 +509,12 @@ test_cluster_management2(SecondaryNode) -> NodeS = atom_to_list(node()), SecondaryNodeS = atom_to_list(SecondaryNode), - %% attempt to convert a disk node into a ram node + %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), - {error, {unable_to_join_cluster, _, _}} = - control_action(cluster, [SecondaryNodeS]), + %% make a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), %% join cluster as a ram node ok = control_action(reset, []), @@ -453,21 +527,21 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to join non-existing cluster as a ram node - {error, _} = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), - + %% join non-existing cluster as a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn ram node into disk node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to convert a disk node into a ram node - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + %% convert a disk node into a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -592,7 +666,10 @@ test_server_status() -> {ok, _} = rabbit_amqqueue:delete(Q, false, false), %% list connections - [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(), + [#listener{host = H, port = P} | _] = + [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), + N =:= node()], + {ok, C} = gen_tcp:connect(H, P, []), timer:sleep(100), ok = info_action( diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index e338ddfe9d..1679ce7c15 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, - fun () -> gen_tcp:send(Sock, Data) end). + fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). @@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> ok. port_cmd(Sock, Data) -> - try erlang:port_command(Sock, Data) + try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) end. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index aa8b8ad5a7..bc7425613f 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -67,15 +67,20 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - %% report - {ok, {Address, Port}} = inet:sockname(LSock), - {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [inet_parse:ntoa(Address), Port, - inet_parse:ntoa(PeerAddress), PeerPort]), - - %% handle - apply(M, F, A ++ [Sock]), + try + %% report + {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), + error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", + [inet_parse:ntoa(Address), Port, + inet_parse:ntoa(PeerAddress), PeerPort]), + %% handle + apply(M, F, A ++ [Sock]) + catch {inet_error, Reason} -> + gen_tcp:close(Sock), + error_logger:error_msg("unable to accept TCP connection: ~p~n", + [Reason]) + end, %% accept more case prim_inet:async_accept(LSock, -1) of @@ -95,3 +100,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%-------------------------------------------------------------------- + +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 92a47cf127..4a2e149bb8 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -33,28 +33,28 @@ -behaviour(gen_server). --export([start_link/7]). +-export([start_link/8]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {sock, on_startup, on_shutdown}). +-record(state, {sock, on_startup, on_shutdown, label}). %%-------------------------------------------------------------------- start_link(IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown) -> + OnStartup, OnShutdown, Label) -> gen_server:start_link( ?MODULE, {IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown}, []). + OnStartup, OnShutdown, Label}, []). %%-------------------------------------------------------------------- init({IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - {M,F,A} = OnStartup, OnShutdown}) -> + {M,F,A} = OnStartup, OnShutdown, Label}) -> process_flag(trap_exit, true), case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress}, {active, false}]) of @@ -65,15 +65,16 @@ init({IPAddress, Port, SocketOpts, end, lists:duplicate(ConcurrentAcceptorCount, dummy)), {ok, {LIPAddress, LPort}} = inet:sockname(LSock), - error_logger:info_msg("started TCP listener on ~s:~p~n", - [inet_parse:ntoa(LIPAddress), LPort]), + error_logger:info_msg("started ~s on ~s:~p~n", + [Label, inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), - {ok, #state{sock=LSock, - on_startup = OnStartup, on_shutdown = OnShutdown}}; + {ok, #state{sock = LSock, + on_startup = OnStartup, on_shutdown = OnShutdown, + label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start TCP listener on ~s:~p - ~p~n", - [inet_parse:ntoa(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p~n", + [Label, inet_parse:ntoa(IPAddress), Port, Reason]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. @@ -86,11 +87,11 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) -> +terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) -> {ok, {IPAddress, Port}} = inet:sockname(LSock), gen_tcp:close(LSock), - error_logger:info_msg("stopped TCP listener on ~s:~p~n", - [inet_parse:ntoa(IPAddress), Port]), + error_logger:info_msg("stopped ~s on ~s:~p~n", + [Label, inet_parse:ntoa(IPAddress), Port]), apply(M, F, A ++ [IPAddress, Port]). code_change(_OldVsn, State, _Extra) -> diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 901a0da3b7..d6bbac080f 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -33,23 +33,23 @@ -behaviour(supervisor). --export([start_link/6, start_link/7]). +-export([start_link/7, start_link/8]). -export([init/1]). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback) -> + AcceptCallback, Label) -> start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, 1). + AcceptCallback, 1, Label). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount) -> + AcceptCallback, ConcurrentAcceptorCount, Label) -> supervisor:start_link( ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}). + AcceptCallback, ConcurrentAcceptorCount, Label}). init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}) -> + AcceptCallback, ConcurrentAcceptorCount, Label}) -> %% This is gross. The tcp_listener needs to know about the %% tcp_acceptor_sup, and the only way I can think of accomplishing %% that without jumping through hoops is to register the @@ -62,5 +62,5 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, {tcp_listener, {tcp_listener, start_link, [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name, - OnStartup, OnShutdown]}, + OnStartup, OnShutdown, Label]}, transient, 100, worker, [tcp_listener]}]}}. |
