diff options
| author | Essien Ita Essien <essiene@gmail.com> | 2009-05-15 16:41:55 +0100 |
|---|---|---|
| committer | Essien Ita Essien <essiene@gmail.com> | 2009-05-15 16:41:55 +0100 |
| commit | 27e15fbe128f946fb8b8d5f6fabe593452820952 (patch) | |
| tree | a00ed0d01de5a48f1c50b83567b9c1cbd7de9fbe | |
| parent | 9b35a0aed208f16f9cfa1fb63c6e5c2b75f02a08 (diff) | |
| parent | 08c1136f058e41a7daec79fb9d6c45e36c1ab4b0 (diff) | |
| download | rabbitmq-server-git-27e15fbe128f946fb8b8d5f6fabe593452820952.tar.gz | |
Merge with upstream
43 files changed, 736 insertions, 305 deletions
@@ -94,11 +94,10 @@ run-node: all run-tests: all echo "rabbit_tests:all_tests()." | $(ERL_CALL) -start-background-node: stop-node +start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ - RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ - ./scripts/rabbitmq-server ; sleep 1 + ./scripts/rabbitmq-server -detached; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) @@ -130,10 +129,10 @@ srcdist: distclean cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ >> $(TARGET_SRC_DIR)/BUILD - sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in + sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) @@ -150,7 +149,7 @@ distclean: clean %.gz: %.pod pod2man \ - -n `echo $$(basename $*) | sed -e 's/\.[^.]\+//'` \ + -n `echo $$(basename $*) | sed -e 's/\.[[:digit:]]\+//'` \ -s `echo $$(basename $*) | sed -e 's/.*\.\([^.]\+\)/\1/'` \ -c "RabbitMQ AMQP Server" \ -d "" \ diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod index 82c3911681..23fd96ed65 100644 --- a/docs/rabbitmq-multi.1.pod +++ b/docs/rabbitmq-multi.1.pod @@ -40,7 +40,7 @@ Start 3 local RabbitMQ nodes with unique, sequential port numbers: =head1 SEE ALSO -rabbitmq-server(1), rabbitmqctl(1) +rabbitmq.conf(5), rabbitmq-server(1), rabbitmqctl(1) =head1 AUTHOR diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod index 00210c8b5c..99a7ceccf3 100644 --- a/docs/rabbitmq-server.1.pod +++ b/docs/rabbitmq-server.1.pod @@ -44,7 +44,7 @@ B<RABBITMQ_NODE_PORT> Defaults to 5672. B<RABBITMQ_CLUSTER_CONFIG_FILE> - Defaults to /etc/default/rabbitmq_cluster.config. If this file is + Defaults to /etc/rabbitmq/rabbitmq_cluster.config. If this file is present it is used by the server to auto-configure a RabbitMQ cluster. See the clustering guide at http://www.rabbitmq.com/clustering.html @@ -62,7 +62,7 @@ Run RabbitMQ AMQP server in the background: =head1 SEE ALSO -rabbitmq-multi(1), rabbitmqctl(1) +rabbitmq.conf(5), rabbitmq-multi(1), rabbitmqctl(1) =head1 AUTHOR diff --git a/docs/rabbitmq.5.pod b/docs/rabbitmq.conf.5.pod index e6972935f5..9b2536c383 100644 --- a/docs/rabbitmq.5.pod +++ b/docs/rabbitmq.conf.5.pod @@ -1,10 +1,10 @@ =head1 NAME -/etc/default/rabbitmq - default settings for RabbitMQ AMQP server +/etc/rabbitmq/rabbitmq.conf - default settings for RabbitMQ AMQP server =head1 DESCRIPTION -/etc/default/rabbitmq contains variable settings that override the +/etc/rabbitmq/rabbitmq.conf contains variable settings that override the defaults built in to the RabbitMQ startup scripts. The file is interpreted by the system shell, and so should consist of @@ -13,29 +13,29 @@ syntax is permitted (since the file is sourced using the shell "." operator), including line comments starting with "#". In order of preference, the startup scripts get their values from the -environment, from /etc/default/rabbitmq, and finally from the built-in -default values. For example, for the B<RABBITMQ_NODENAME> setting, +environment, from /etc/rabbitmq/rabbitmq.conf and finally from the +built-in default values. For example, for the B<RABBITMQ_NODENAME> setting, B<RABBITMQ_NODENAME> - from the environment is checked first. If it is absent or equal - to the empty string, then + from the environment is checked first. If it is absent or equal to + the empty string, then B<NODENAME> - from /etc/default/rabbitmq is checked next. If it is also absent - or set equal to the empty string, then the default value from the - startup script is used. + from /etc/rabbitmq/rabbitmq.conf is checked. If it is also absent + or set equal to the empty string then the default value from + the startup script is used. -The variable names in /etc/default/rabbitmq are always equal to the +The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the environment variable names, with the B<RABBITMQ_> prefix removed: B<RABBITMQ_NODE_PORT> from the environment becomes B<NODE_PORT> in the -/etc/default/rabbitmq file, etc. +/etc/rabbitmq/rabbitmq.conf file, etc. =head1 EXAMPLES -The following is an example of a complete /etc/default/rabbitmq file +The following is an example of a complete /etc/rabbitmq/rabbitmq.conf file that overrides the default Erlang node name from "rabbit" to "hare": - # I am a complete /etc/default/rabbitmq file. + # I am a complete /etc/rabbitmq/rabbitmq.conf file. # Comment lines start with a hash character. # This is a /bin/sh script file - use ordinary envt var syntax NODENAME=hare @@ -46,7 +46,7 @@ rabbitmq-server(1), rabbitmq-multi(1), rabbitmqctl(1) =head1 AUTHOR -Originally written by The RabbitMQ Team <info@lshift.net> +Originally written by The RabbitMQ Team <info@rabbitmq.com> =head1 COPYRIGHT diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index d86aa2717d..a0232a40f2 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -109,8 +109,9 @@ add_vhost I<vhostpath> delete_vhost I<vhostpath> delete a virtual host I<vhostpath>. - That command deletes also all its exchanges, queues and user mappings. - + That command deletes also all its exchanges, queues and user + mappings. + list_vhosts list all virtual hosts. @@ -162,7 +163,8 @@ messages_ready number of messages ready to be delivered to clients messages_unacknowledged - number of messages delivered to clients but not yet acknowledged + number of messages delivered to clients but not yet + acknowledged messages_uncommitted number of messages published in as yet uncommitted transactions @@ -299,7 +301,7 @@ them: =head1 SEE ALSO -rabbitmq-multi(1), rabbitmq-server(1) +rabbitmq.conf(5), rabbitmq-multi(1), rabbitmq-server(1) =head1 AUTHOR diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 973608ccba..9fe91b98d2 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -5,7 +5,7 @@ TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz TOP_DIR=$(shell pwd) #Under debian we do not want to check build dependencies, since that #only checks build-dependencies using rpms, not debs -DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1' +DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define '_sysconfdir /etc' --define '_localstatedir /var' ifndef RPM_OS RPM_OS=fedora @@ -35,9 +35,9 @@ prepare: cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ --target i386 - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ --define '_libdir /usr/lib64' --define '_arch x86_64' \ --define '_defaultdocdir /usr/share/doc' --target x86_64 diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index 9f4dbb43b8..a9155f3b03 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -2,16 +2,14 @@ # # rabbitmq-server RabbitMQ broker # -#chkconfig: 2345 80 05 -#description: Enable AMQP service provided by RabbitMQ +# chkconfig: - 80 05 +# description: Enable AMQP service provided by RabbitMQ # ### BEGIN INIT INFO -# Provides: rabbitmq +# Provides: rabbitmq-server # Required-Start: $remote_fs $network # Required-Stop: $remote_fs $network -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 # Description: RabbitMQ broker # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO @@ -29,13 +27,12 @@ LOCK_FILE=/var/lock/subsys/$NAME test -x $DAEMON || exit 0 # Include rabbitmq defaults if available -if [ -f /etc/default/rabbitmq ] ; then - . /etc/default/rabbitmq +if [ -f /etc/sysconfig/rabbitmq ] ; then + . /etc/sysconfig/rabbitmq fi RETVAL=0 set -e -cd / start_rabbitmq () { set +e @@ -90,7 +87,10 @@ status_rabbitmq() { rotate_logs_rabbitmq() { set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} 2>&1 + $DAEMON rotate_logs ${ROTATE_SUFFIX} + if [ $? != 0 ] ; then + RETVAL=1 + fi set -e } @@ -129,7 +129,7 @@ case "$1" in ;; *) echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 - RETVAL=1 + RETVAL=2 ;; esac diff --git a/packaging/RPMS/Fedora/rabbitmq-server.logrotate b/packaging/RPMS/Fedora/rabbitmq-server.logrotate index ab87e4a5c6..6b657614de 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.logrotate +++ b/packaging/RPMS/Fedora/rabbitmq-server.logrotate @@ -7,6 +7,6 @@ notifempty sharedscripts postrotate - /sbin/service rabbitmq-server rotate-logs + /sbin/service rabbitmq-server rotate-logs > /dev/null endscript } diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index bedf8d816c..cf8a2b07b3 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,3 +1,5 @@ +%define debug_package %{nil} + Name: rabbitmq-server Version: %%VERSION%% Release: 1%%RELEASE_OS%% @@ -8,13 +10,8 @@ Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate URL: http://www.rabbitmq.com/ -Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd. -%if 0%{?debian} -%else BuildRequires: erlang, python-simplejson -%endif Requires: erlang, logrotate -Packager: Hubert Plociniczak <hubert@lshift.net> BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server Requires(post): %%REQUIRES%% @@ -27,22 +24,21 @@ scalable implementation of an AMQP broker. %define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} %define _rabbit_libdir %{_libdir}/rabbitmq +%define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _maindir %{buildroot}%{_rabbit_erllibdir} -%pre -if [ $1 -gt 1 ]; then - #Upgrade - stop and remove previous instance of rabbitmq-server init.d script - /sbin/service rabbitmq-server stop - /sbin/chkconfig --del rabbitmq-server -fi - %prep -%setup -n %{name}-%{version} -sed -i 's|/usr/lib/|%{_libdir}/|' %SOURCE2 +%setup -q %build -make +cp %{S:2} %{_rabbit_wrapper} +sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} + +# The rabbitmq build needs escript, which is missing from /usr/bin in +# some versions of the erlang RPM. See +# <https://bugzilla.redhat.com/show_bug.cgi?id=481302> +PATH=%{_libdir}/erlang/bin:$PATH make %{?_smp_mflags} %install rm -rf %{buildroot} @@ -51,22 +47,18 @@ make install TARGET_DIR=%{_maindir} \ SBIN_DIR=%{buildroot}%{_rabbit_libdir}/bin \ MAN_DIR=%{buildroot}%{_mandir} -mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia -mkdir -p %{buildroot}/var/log/rabbitmq -mkdir -p %{buildroot}%{_initrddir} +mkdir -p %{buildroot}%{_localstatedir}/lib/rabbitmq/mnesia +mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq #Copy all necessary lib files etc. -install -m 0755 %SOURCE1 %{buildroot}%{_initrddir}/rabbitmq-server -chmod 0755 %{buildroot}%{_initrddir}/rabbitmq-server -sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_initrddir}/rabbitmq-server +install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi +install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server -install -p -D -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmqctl -install -p -D -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmq-server -install -p -D -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmq-multi - -mkdir -p %{buildroot}/etc/logrotate.d -install -m 0644 %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server +mkdir -p %{buildroot}%{_sysconfdir}/rabbitmq rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL @@ -74,11 +66,18 @@ rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL rm -f %{_builddir}/filelist.%{name}.rpm echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm (cd %{buildroot}; \ - find . -type f ! -regex '\./etc.*' \ + find . -type f ! -regex '\.%{_sysconfdir}.*' \ ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) -%post +%pre + +if [ $1 -gt 1 ]; then + #Upgrade - stop and remove previous instance of rabbitmq-server init.d script + /sbin/service rabbitmq-server stop + /sbin/chkconfig --del rabbitmq-server +fi + # create rabbitmq group if ! getent group rabbitmq >/dev/null; then groupadd -r rabbitmq @@ -86,15 +85,12 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then - useradd -r -g rabbitmq --home /var/lib/rabbitmq rabbitmq - usermod -c "RabbitMQ messaging server" rabbitmq + useradd -r -g rabbitmq -d %{_localstatedir}/lib/rabbitmq rabbitmq \ + -c "RabbitMQ messaging server" fi -chown -R rabbitmq:rabbitmq /var/lib/rabbitmq -chown -R rabbitmq:rabbitmq /var/log/rabbitmq - +%post /sbin/chkconfig --add %{name} -/sbin/service rabbitmq-server start %preun if [ $1 = 0 ]; then @@ -108,18 +104,28 @@ fi %files -f ../filelist.%{name}.rpm %defattr(-,root,root,-) -%dir /var/lib/rabbitmq -%dir /var/log/rabbitmq +%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq +%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq +%dir %{_sysconfdir}/rabbitmq %{_rabbit_erllibdir} %{_rabbit_libdir} %{_initrddir}/rabbitmq-server -%config(noreplace) /etc/logrotate.d/rabbitmq-server +%config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server %doc LICENSE LICENSE-MPL-RabbitMQ INSTALL %clean rm -rf %{buildroot} %changelog +* Mon Apr 6 2009 Matthias Radestock <matthias@lshift.net> 1.5.4-1 +- Maintenance release for the 1.5.x series + +* Tue Feb 24 2009 Tony Garnock-Jones <tonyg@lshift.net> 1.5.3-1 +- Maintenance release for the 1.5.x series + +* Mon Feb 23 2009 Tony Garnock-Jones <tonyg@lshift.net> 1.5.2-1 +- Maintenance release for the 1.5.x series + * Mon Jan 19 2009 Ben Hood <0x6e6562@gmail.com> 1.5.1-1 - Maintenance release for the 1.5.x series diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 217d1658b5..296a77d19c 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -9,7 +9,7 @@ for arg in "$@" ; do CMDLINE="${CMDLINE} \"${arg}\"" done -cd / +cd /var/lib/rabbitmq SCRIPT=`basename $0` diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 37b01dabb8..d1ccd3a0c2 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,21 @@ +rabbitmq-server (1.5.4-1) hardy; urgency=low + + * New Upstream Release + + -- Matthias Radestock <matthias@lshift.net> Mon, 06 Apr 2009 09:19:32 +0100 + +rabbitmq-server (1.5.3-1) hardy; urgency=low + + * New Upstream Release + + -- Tony Garnock-Jones <tonyg@lshift.net> Tue, 24 Feb 2009 18:23:33 +0000 + +rabbitmq-server (1.5.2-1) hardy; urgency=low + + * New Upstream Release + + -- Tony Garnock-Jones <tonyg@lshift.net> Mon, 23 Feb 2009 16:03:38 +0000 + rabbitmq-server (1.5.1-1) hardy; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/copyright b/packaging/debs/Debian/debian/copyright index f9a2b6515a..69867220f0 100644..100755 --- a/packaging/debs/Debian/debian/copyright +++ b/packaging/debs/Debian/debian/copyright @@ -3,14 +3,47 @@ Wed, 3 Jan 2007 15:43:44 +0000. It was downloaded from http://www.rabbitmq.com/ -Upstream Author: The RabbitMQ team <info@rabbitmq.com> +The file codegen/amqp-0.8.json is covered by the following terms: + + "Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies LLC, + and Rabbit Technologies Ltd + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this file (the Software), to deal in the + Software without restriction, including without limitation the + rights to use, copy, modify, merge, publish, distribute, + sublicense, and/or sell copies of the Software, and to permit + persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + OTHER DEALINGS IN THE SOFTWARE." + +The rest of this package is licensed under the Mozilla Public License 1.1 +Authors and Copyright are as described below: -Copyright: 2006-2009 Rabbit Technologies Ltd. - -License: -The RabbitMQ server is licensed under the MPL. + The Initial Developers of the Original Code are LShift Ltd, + Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. + Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, + Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd + are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial + Technologies LLC, and Rabbit Technologies Ltd. + Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + Ltd. Portions created by Cohesive Financial Technologies LLC are + Copyright (C) 2007-2009 Cohesive Financial Technologies + LLC. Portions created by Rabbit Technologies Ltd are Copyright + (C) 2007-2009 Rabbit Technologies Ltd. MOZILLA PUBLIC LICENSE diff --git a/packaging/debs/Debian/debian/dirs b/packaging/debs/Debian/debian/dirs index 1a707bc191..625b7d41f5 100644 --- a/packaging/debs/Debian/debian/dirs +++ b/packaging/debs/Debian/debian/dirs @@ -5,4 +5,5 @@ usr/share/man var/lib/rabbitmq/mnesia var/log/rabbitmq etc/logrotate.d +etc/rabbitmq diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d index aff0ce4da9..a35a60ec68 100644 --- a/packaging/debs/Debian/debian/init.d +++ b/packaging/debs/Debian/debian/init.d @@ -26,7 +26,6 @@ fi RETVAL=0 set -e -cd / start_rabbitmq () { set +e @@ -79,7 +78,10 @@ status_rabbitmq() { rotate_logs_rabbitmq() { set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} 2>&1 + $DAEMON rotate_logs ${ROTATE_SUFFIX} + if [ $? != 0 ] ; then + RETVAL=1 + fi set -e } diff --git a/packaging/debs/Debian/debian/postrm b/packaging/debs/Debian/debian/postrm index 8d1e92a929..a999d95b29 100644 --- a/packaging/debs/Debian/debian/postrm +++ b/packaging/debs/Debian/debian/postrm @@ -31,6 +31,9 @@ case "$1" in if [ -d /var/run/rabbitmq ]; then rm -r /var/run/rabbitmq fi + if [ -d /etc/rabbitmq ]; then + rm -r /etc/rabbitmq + fi if getent passwd rabbitmq >/dev/null; then deluser rabbitmq fi diff --git a/packaging/debs/Debian/debian/rabbitmq-server.logrotate b/packaging/debs/Debian/debian/rabbitmq-server.logrotate index bfd6b8da0b..c786df77b2 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.logrotate +++ b/packaging/debs/Debian/debian/rabbitmq-server.logrotate @@ -7,6 +7,6 @@ notifempty sharedscripts postrotate - /etc/init.d/rabbitmq-server rotate-logs + /etc/init.d/rabbitmq-server rotate-logs > /dev/null endscript } diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index 82ae62aa69..d9d16dbbb2 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -1,79 +1,98 @@ -# $Id$ -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4 - -PortSystem 1.0 -name rabbitmq-server -version 1.3.0 -revision 0 -categories net -maintainers tonyg@rabbitmq.com -platforms darwin -description The RabbitMQ AMQP Server -long_description \ +# -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4 +# $Id$ + +PortSystem 1.0 + +name rabbitmq-server +version 1.5.3 +categories net +maintainers tonyg@rabbitmq.com +platforms darwin +description The RabbitMQ AMQP Server +long_description \ RabbitMQ is an implementation of AMQP, the emerging standard for \ high performance enterprise messaging. The RabbitMQ server is a \ robust and scalable implementation of an AMQP broker. -homepage http://www.rabbitmq.com/ -master_sites http://www.rabbitmq.com/releases/source/ -distname rabbitmq-${version} +homepage http://www.rabbitmq.com/ +master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/ checksums \ - md5 46ee6dbbacdc67b25cc6ccd9c394b6f2 \ - sha1 67e1e640136a1993567ace97dc5f67b1ad8e6304 \ - rmd160 9e92502d36ab5cd1e3f0d39a46bb512b9440f35a + md5 3242a67885c2471b5ab62254bf024679 \ + sha1 f4d6a01eaa2c74fa32f567fe410d21d9be1b43aa \ + rmd160 1a1c4b97d765548028c161d1617905151ca9e040 -depends_build port:erlang -depends_run port:erlang +depends_build port:erlang port:py25-simplejson +depends_run port:erlang -use_configure no +set serveruser rabbitmq +set servergroup rabbitmq +set serverhome ${prefix}/var/lib/rabbitmq +set logdir ${prefix}/var/log/rabbitmq +set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia +set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server -worksrcdir rabbitmq-${version}/erlang/rabbit +use_configure no use_parallel_build yes +build.args PYTHON=${prefix}/bin/python2.5 + destroot.destdir \ - DIST_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ - SBIN_DIR=${destroot}${prefix}/sbin -destroot.target dist-unix + TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ + SBIN_DIR=${destroot}${prefix}/sbin \ + MAN_DIR=${destroot}${prefix}/share/man destroot.keepdirs \ - ${destroot}${prefix}/var/lib/rabbitmq/pids \ - ${destroot}${prefix}/var/log/rabbitmq \ - ${destroot}${prefix}/var/lib/rabbitmq/mnesia + ${destroot}${logdir} \ + ${destroot}${mnesiadbdir} pre-destroot { - addgroup rabbitmq - adduser rabbitmq gid=[existsgroup rabbitmq] realname=RabbitMQ\ Server home=${prefix}/var/lib/rabbitmq + addgroup ${servergroup} + adduser ${serveruser} gid=[existsgroup ${servergroup}] realname=RabbitMQ\ Server home=${serverhome} } post-destroot { - xinstall -d ${destroot}${prefix}/etc/default - xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/log/rabbitmq - xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/lib/rabbitmq - xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/lib/rabbitmq/pids - xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/lib/rabbitmq/mnesia - file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real - xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin - file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl - file copy ${filespath}/rabbitmq-defaults ${destroot}${prefix}/etc/default/rabbitmq - reinplace "s:^CLUSTER_CONFIG_FILE=:CLUSTER_CONFIG_FILE=${prefix}:" \ + xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${logdir} + xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome} + xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} + + reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ + ${destroot}${prefix}/sbin/rabbitmq-multi \ + ${destroot}${prefix}/sbin/rabbitmq-server \ + ${destroot}${prefix}/sbin/rabbitmqctl + reinplace -E "s:(RABBITMQ_CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \ + ${destroot}${prefix}/sbin/rabbitmq-multi \ + ${destroot}${prefix}/sbin/rabbitmq-server \ + ${destroot}${prefix}/sbin/rabbitmqctl + reinplace -E "s:(RABBITMQ_LOG_BASE)=/:\\1=${prefix}/:" \ + ${destroot}${prefix}/sbin/rabbitmq-multi \ + ${destroot}${prefix}/sbin/rabbitmq-server \ + ${destroot}${prefix}/sbin/rabbitmqctl + reinplace -E "s:(RABBITMQ_MNESIA_BASE)=/:\\1=${prefix}/:" \ ${destroot}${prefix}/sbin/rabbitmq-multi \ ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl \ - ${destroot}${prefix}/sbin/rabbitmqctl_real - reinplace "s:^CONFIG_FILE=:CONFIG_FILE=${prefix}:" \ + ${destroot}${prefix}/sbin/rabbitmqctl + reinplace -E "s:(RABBITMQ_PIDS_FILE)=/:\\1=${prefix}/:" \ ${destroot}${prefix}/sbin/rabbitmq-multi \ ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl \ - ${destroot}${prefix}/sbin/rabbitmqctl_real - reinplace "s|@PREFIX@|${prefix}|" \ - ${destroot}${prefix}/sbin/rabbitmqctl \ - ${destroot}${prefix}/etc/default/rabbitmq + ${destroot}${prefix}/sbin/rabbitmqctl + + file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real + xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin + file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl + + reinplace -E "s:@PREFIX@:${prefix}:" \ + ${destroot}${prefix}/sbin/rabbitmqctl +} + +pre-install { + system "cd ${destroot}${plistloc}; patch <${filespath}/patch-org.macports.rabbitmq-server.plist.diff" } startupitem.create yes startupitem.init "PATH=${prefix}/bin:${prefix}/sbin:\$PATH; export PATH" -startupitem.start "su rabbitmq -c rabbitmq-server 2>&1" +startupitem.start "rabbitmq-server 2>&1" startupitem.stop "rabbitmqctl stop 2>&1" startupitem.logfile ${prefix}/var/log/rabbitmq/startupitem.log diff --git a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff b/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff new file mode 100644 index 0000000000..45b4949616 --- /dev/null +++ b/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff @@ -0,0 +1,10 @@ +--- org.macports.rabbitmq-server.plist.old 2009-02-26 08:00:31.000000000 -0800 ++++ org.macports.rabbitmq-server.plist 2009-02-26 08:01:27.000000000 -0800 +@@ -22,6 +22,7 @@ + <string>;</string> + <string>--pid=none</string> + </array> ++<key>UserName</key><string>rabbitmq</string> + <key>Debug</key><false/> + <key>Disabled</key><true/> + <key>OnDemand</key><false/> diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-defaults b/packaging/macports/net/rabbitmq-server/files/rabbitmq-defaults deleted file mode 100644 index 1f9aad1177..0000000000 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-defaults +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh -# defaults file for rabbitmq-server -# - -PIDS_FILE=@PREFIX@/var/lib/rabbitmq/pids -LOG_BASE=@PREFIX@/var/log/rabbitmq -MNESIA_BASE=@PREFIX@/var/lib/rabbitmq/mnesia diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper index 392c82ff9c..1996811eb5 100644 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper @@ -1,13 +1,2 @@ #!/bin/bash -# Escape spaces and quotes, because shell is revolting. -for arg in "$@" ; do - # Escape quotes in parameters, so that they're passed through cleanly. - arg=$(sed -e 's/"/\\"/' <<-END - $arg - END - ) - CMDLINE="${CMDLINE} \"${arg}\"" -done - -cd / -exec su rabbitmq -c "@PREFIX@/sbin/rabbitmqctl_real ${CMDLINE}" +exec sudo -H -u rabbitmq "@PREFIX@/sbin/rabbitmqctl_real" "$@" diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod index 7c4d3ef210..8a2d2e5b22 100644 --- a/packaging/windows/rabbitmq-service.pod +++ b/packaging/windows/rabbitmq-service.pod @@ -92,8 +92,10 @@ Defaults to 5672. =head2 ERLANG_SERVICE_MANAGER_PATH -Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>. This is -the installation location of the Erlang service manager. +Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin> +(or F<C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin> for 64-bit +environments). This is the installation location of the Erlang service +manager. =head2 CLUSTER_CONFIG_FILE diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 164c5e187c..1d0c785f6b 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,23 +29,23 @@ ## ## Contributor(s): ______________________________________. ## +NODENAME=rabbit +NODE_IP_ADDRESS=0.0.0.0 +NODE_PORT=5672 +SCRIPT_HOME=$(dirname $0) +PIDS_FILE=/var/lib/rabbitmq/pids +MULTI_ERL_ARGS= +MULTI_START_ARGS= -[ -f /etc/default/rabbitmq ] && . /etc/default/rabbitmq +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0 [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672 [ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME} -[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=$(dirname $0) [ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE} -[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=/var/lib/rabbitmq/pids [ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS} -[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS= [ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS} -[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS= export \ RABBITMQ_NODENAME \ diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index 30f33a5a26..a30c0889ab 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -47,7 +47,7 @@ if "%RABBITMQ_NODE_PORT%"=="" ( )
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
-set RABBITMQ_SCRIPT_HOME=%~dp0%
+set RABBITMQ_SCRIPT_HOME=%~sdp0%
if "%ERLANG_HOME%"=="" (
set ERLANG_HOME=%~dp0%..\..\..
@@ -65,5 +65,5 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmq_multi -s rabbit_multi %START_ARGS% -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_MULTI_ERL_ARGS% -sname rabbitmq_multi -s rabbit_multi %RABBITMQ_MULTI_START_ARGS% -extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 9a35c47721..8502d60abc 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,28 +30,30 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/default/rabbitmq ] && . /etc/default/rabbitmq +NODENAME=rabbit +NODE_IP_ADDRESS=0.0.0.0 +NODE_PORT=5672 +SERVER_ERL_ARGS="+K true +A30 \ +-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ +-kernel inet_default_connect_options [{nodelay,true}]" +CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config +LOG_BASE=/var/log/rabbitmq +MNESIA_BASE=/var/lib/rabbitmq/mnesia +SERVER_START_ARGS= + +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0 [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672 [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS="+K true +A30 \ --kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ --kernel inet_default_connect_options [{nodelay,true}]" [ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} -[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=/etc/default/rabbitmq_cluster.config [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} -[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=/var/log/rabbitmq [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} -[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia +[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} + [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS= ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5b20ef2010..9915727bd9 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -117,6 +117,7 @@ if "%RABBITMQ_MNESIA_DIR%"=="" ( -kernel inet_default_connect_options "[{nodelay, true}]" ^
-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
+%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
-sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index b941b85005..c57978c050 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,10 +30,15 @@ ## Contributor(s): ______________________________________. ## +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf + +[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} + exec erl \ -pa "`dirname $0`/../ebin" \ -noinput \ -hidden \ + ${RABBITMQ_CTL_ERL_ARGS} \ -sname rabbitmqctl$$ \ -s rabbit_control \ -extra "$@" diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 33a10777d0..e4dccfba64 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -46,4 +46,4 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 11bb66d743..ba8becfca9 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,6 +16,11 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% +%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 +%% allow callers to attach priorities to requests. Requests with +%% higher priorities are processed before requests with lower +%% priorities. The default priority is 0. +%% %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -107,8 +112,8 @@ %% API -export([start/3, start/4, start_link/3, start_link/4, - call/2, call/3, - cast/2, reply/2, + call/2, call/3, pcall/3, pcall/4, + cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, enter_loop/3, enter_loop/4, enter_loop/5]). @@ -188,6 +193,22 @@ call(Name, Request, Timeout) -> exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. +pcall(Name, Priority, Request) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) + end. + +pcall(Name, Priority, Request, Timeout) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + end. + %% ----------------------------------------------------------------- %% Make a cast to a generic server. %% ----------------------------------------------------------------- @@ -207,6 +228,22 @@ do_cast(Dest, Request) -> cast_msg(Request) -> {'$gen_cast',Request}. +pcast({global,Name}, Priority, Request) -> + catch global:send(Name, cast_msg(Priority, Request)), + ok; +pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_atom(Dest) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_pid(Dest) -> + do_cast(Dest, Priority, Request). + +do_cast(Dest, Priority, Request) -> + do_send(Dest, cast_msg(Priority, Request)), + ok. + +cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. + %% ----------------------------------------------------------------- %% Send a reply to the client. %% ----------------------------------------------------------------- @@ -276,7 +313,7 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), - Queue = queue:new(), + Queue = priority_queue:new(), loop(Parent, Name, State, Mod, Timeout, Queue, Debug). %%%======================================================================== @@ -294,7 +331,7 @@ init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); init_it(Starter, Parent, Name, Mod, Args, Options) -> Debug = debug_options(Name, Options), - Queue = queue:new(), + Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -326,9 +363,9 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> loop(Parent, Name, State, Mod, Time, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue), Debug) + Time, in(Input, Queue), Debug) after 0 -> - case queue:out(Queue) of + case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, Time, Queue1, Debug, Msg); @@ -336,14 +373,21 @@ loop(Parent, Name, State, Mod, Time, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue1), Debug) + Time, in(Input, Queue1), Debug) after Time -> process_msg(Parent, Name, State, Mod, Time, Queue1, Debug, timeout) end end end. - + +in({'$gen_pcast', {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_cast', Msg}, Priority, Queue); +in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); +in(Input, Queue) -> + priority_queue:in(Input, Queue). + process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> case Msg of {system, From, Req} -> @@ -850,5 +894,5 @@ format_status(Opt, StatusData) -> {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, - {"Queued messages", queue:to_list(Queue)}]} | + {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/priority_queue.erl b/src/priority_queue.erl new file mode 100644 index 0000000000..732757c41c --- /dev/null +++ b/src/priority_queue.erl @@ -0,0 +1,153 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% Priority queues have essentially the same interface as ordinary +%% queues, except that a) there is an in/3 that takes a priority, and +%% b) we have only implemented the core API we need. +%% +%% Priorities should be integers - the higher the value the higher the +%% priority - but we don't actually check that. +%% +%% in/2 inserts items with priority 0. +%% +%% We optimise the case where a priority queue is being used just like +%% an ordinary queue. When that is the case we represent the priority +%% queue as an ordinary queue. We could just call into the 'queue' +%% module for that, but for efficiency we implement the relevant +%% functions directly in here, thus saving on inter-module calls and +%% eliminating a level of boxing. +%% +%% When the queue contains items with non-zero priorities, it is +%% represented as a sorted kv list with the inverted Priority as the +%% key and an ordinary queue as the value. Here again we use our own +%% ordinary queue implemention for efficiency, often making recursive +%% calls into the same function knowing that ordinary queues represent +%% a base case. + + +-module(priority_queue). + +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(priority() :: integer()). +-type(squeue() :: {queue, [any()], [any()]}). +-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). + +-spec(new/0 :: () -> pqueue()). +-spec(is_queue/1 :: (any()) -> bool()). +-spec(is_empty/1 :: (pqueue()) -> bool()). +-spec(len/1 :: (pqueue()) -> non_neg_integer()). +-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). +-spec(in/2 :: (any(), pqueue()) -> pqueue()). +-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). +-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> + {queue, [], []}. + +is_queue({queue, R, F}) when is_list(R), is_list(F) -> + true; +is_queue({pqueue, Queues}) when is_list(Queues) -> + lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, + Queues); +is_queue(_) -> + false. + +is_empty({queue, [], []}) -> + true; +is_empty(_) -> + false. + +len({queue, R, F}) when is_list(R), is_list(F) -> + length(R) + length(F); +len({pqueue, Queues}) -> + lists:sum([len(Q) || {_, Q} <- Queues]). + +to_list({queue, In, Out}) when is_list(In), is_list(Out) -> + [{0, V} || V <- Out ++ lists:reverse(In, [])]; +to_list({pqueue, Queues}) -> + [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. + +in(Item, Q) -> + in(Item, 0, Q). + +in(X, 0, {queue, [_] = In, []}) -> + {queue, [X], In}; +in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out}; +in(X, Priority, _Q = {queue, [], []}) -> + in(X, Priority, {pqueue, []}); +in(X, Priority, Q = {queue, _, _}) -> + in(X, Priority, {pqueue, [{0, Q}]}); +in(X, Priority, {pqueue, Queues}) -> + P = -Priority, + {pqueue, case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false -> + lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + end}. + +out({queue, [], []} = Q) -> + {empty, Q}; +out({queue, [V], []}) -> + {{value, V}, {queue, [], []}}; +out({queue, [Y|In], []}) -> + [V|Out] = lists:reverse(In, []), + {{value, V}, {queue, [Y], Out}}; +out({queue, In, [V]}) when is_list(In) -> + {{value,V}, r2f(In)}; +out({queue, In,[V|Out]}) when is_list(In) -> + {{value, V}, {queue, In, Out}}; +out({pqueue, [{P, Q} | Queues]}) -> + {R, Q1} = out(Q), + NewQ = case is_empty(Q1) of + true -> case Queues of + [] -> {queue, [], []}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues} + end; + false -> {pqueue, [{P, Q1} | Queues]} + end, + {R, NewQ}. + +r2f([]) -> {queue, [], []}; +r2f([_] = R) -> {queue, [], R}; +r2f([X,Y]) -> {queue, [X], [Y]}; +r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index da0ab9cf7a..54348d9a1c 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -192,7 +192,7 @@ delete_user(Username) -> fun () -> ok = mnesia:delete({rabbit_user, Username}), [ok = mnesia:delete_object( - rabbit_user_permissions, R, write) || + rabbit_user_permission, R, write) || R <- mnesia:match_object( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 875624ba55..21999f16c3 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -78,7 +78,8 @@ stop() -> register(Pid, HighMemMFA) -> ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}). + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3018582f94..eb076e94d6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -124,19 +124,32 @@ recover() -> recover_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + lists:foreach( + fun (RecoveredQ) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end + end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). + node(Pid) == Node])) + end)), + ok. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -200,10 +213,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, info). + gen_server2:pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:call(QPid, {info, Items}) of + case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -212,20 +225,20 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> gen_server2:cast(QPid, {deliver, Txn, Message}), @@ -241,10 +254,9 @@ ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> @@ -254,12 +266,11 @@ rollback_all(QPids, Txn) -> QPids). notify_down_all(QPids, ChPid) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). limit_all(QPids, ChPid, LimiterPid) -> @@ -269,18 +280,20 @@ limit_all(QPids, ChPid, LimiterPid) -> QPids). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, + infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). @@ -292,28 +305,29 @@ internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [Q] -> - ok = delete_queue(Q), + [] -> {error, not_found}; + [_] -> + ok = rabbit_exchange:delete_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), ok = mnesia:delete({rabbit_durable_queue, QueueName}), ok end end). -delete_queue(#amqqueue{name = QueueName}) -> - ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok. - on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:fold( - fun (Q, Acc) -> ok = delete_queue(Q), Acc end, + fun (QueueName, Acc) -> + ok = rabbit_exchange:delete_transient_queue_bindings( + QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + Acc + end, ok, - qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) + qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) end). pseudo_queue(QueueName, Pid) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7574cd673a..b2716ec478 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -745,12 +745,16 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> + {error, exchange_and_queue_not_found} -> rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e6717d689f..6649899ade 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -38,6 +38,19 @@ -define(RPC_TIMEOUT, 30000). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). +-spec(action/4 :: (atom(), erlang_node(), [string()], + fun ((string(), [any()]) -> 'ok')) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + start() -> FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 19efd9fc22..fc89cfca51 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -40,7 +40,7 @@ route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). --export([delete_bindings_for_queue/1]). +-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API @@ -59,8 +59,10 @@ -type(publish_res() :: {'ok', [pid()]} | not_found() | {'error', 'unroutable' | 'not_delivered'}). --type(bind_res() :: 'ok' | - {'error', 'queue_not_found' | 'exchange_not_found'}). +-type(bind_res() :: 'ok' | {'error', + 'queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found'}). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). @@ -86,7 +88,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). +-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> @@ -103,24 +106,17 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - Acc - end, ok, rabbit_durable_exchange), - mnesia:foldl( - fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - Acc - end, ok, rabbit_durable_route), - ok - end). + ok = rabbit_misc:table_foreach( + fun(Exchange) -> ok = mnesia:write(rabbit_exchange, + Exchange, write) + end, rabbit_durable_exchange), + ok = rabbit_misc:table_foreach( + fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write) + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -300,7 +296,7 @@ lookup_qpids(Queues) -> %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? -delete_bindings_for_exchange(ExchangeName) -> +delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -312,10 +308,16 @@ delete_bindings_for_exchange(ExchangeName) -> write)], ok. -delete_bindings_for_queue(QueueName) -> +delete_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_forward_routes/1). + +delete_transient_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). + +delete_queue_bindings(QueueName, FwdDeleteFun) -> Exchanges = exchanges_for_queue(QueueName), [begin - ok = delete_forward_routes(reverse_route(Route)), + ok = FwdDeleteFun(reverse_route(Route)), ok = mnesia:delete_object(rabbit_reverse_route, Route, write) end || Route <- mnesia:match_object( rabbit_reverse_route, @@ -333,6 +335,9 @@ delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). +delete_transient_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write). + exchanges_for_queue(QueueName) -> MatchHead = reverse_route( #route{binding = #binding{exchange_name = '$1', @@ -342,16 +347,13 @@ exchanges_for_queue(QueueName) -> sets:from_list( mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). -has_bindings(ExchangeName) -> - MatchHead = #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, +contains(Table, MatchHead) -> try - continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}], - 1, read)) + continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - case mnesia:match_object(rabbit_route, MatchHead, read) of + case mnesia:match_object(Table, MatchHead, read) of [] -> false; [_|_] -> true end @@ -364,18 +366,20 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( fun() -> case mnesia:read({rabbit_exchange, Exchange}) of - [] -> {error, exchange_not_found}; + [] -> {error, not_found}; [X] -> Fun(X) end end). call_with_exchange_and_queue(Exchange, Queue, Fun) -> - call_with_exchange( - Exchange, - fun(X) -> case mnesia:read({rabbit_queue, Queue}) of - [] -> {error, queue_not_found}; - [Q] -> Fun(X, Q) - end + rabbit_misc:execute_mnesia_transaction( + fun() -> case {mnesia:read({rabbit_exchange, Exchange}), + mnesia:read({rabbit_queue, Queue})} of + {[X], [Q]} -> Fun(X, Q); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, queue_not_found}; + {[ ], [ ]} -> {error, exchange_and_queue_not_found} + end end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> @@ -559,13 +563,17 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> ok. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - case has_bindings(ExchangeName) of + Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, + %% we need to check for durable routes here too in case a bunch of + %% routes to durable queues have been removed temporarily as a + %% result of a node failure + case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_bindings_for_exchange(ExchangeName), + ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 51c1665bbb..2be005034e 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -82,7 +82,8 @@ guid() -> %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. G = case get(guid) of - undefined -> {{gen_server:call(?SERVER, serial), self()}, 0}; + undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, + 0}; {S, I} -> {S, I+1} end, put(guid, G), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 532be26d8e..3f9b6ebb9b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -90,7 +90,7 @@ can_send(undefined, _QPid) -> can_send(LimiterPid, QPid) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5d176f8fac..eced0b3cbe 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). +-export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). @@ -98,13 +99,14 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). --spec(format_stderr/2 :: (string(), [any()]) -> 'true'). +-spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). @@ -298,6 +300,21 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). +%% For each entry in a table, execute a function in a transaction. +%% This is often far more efficient than wrapping a tx around the lot. +%% +%% We ignore entries that have been modified or removed. +table_foreach(F, TableName) -> + lists:foreach( + fun (E) -> execute_mnesia_transaction( + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> ok; + _ -> F(E) + end + end) + end, dirty_read_all(TableName)), + ok. + dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). @@ -358,9 +375,19 @@ ensure_parent_dirs_exist(Filename) -> end. format_stderr(Fmt, Args) -> - Port = open_port({fd, 0, 2}, [out]), - port_command(Port, io_lib:format(Fmt, Args)), - port_close(Port). + case os:type() of + {unix, _} -> + Port = open_port({fd, 0, 2}, [out]), + port_command(Port, io_lib:format(Fmt, Args)), + port_close(Port); + {win32, _} -> + %% stderr on Windows is buffered and I can't figure out a + %% way to trigger a fflush(stderr) in Erlang. So rather + %% than risk losing output we write to stdout instead, + %% which appears to be unbuffered. + io:format(Fmt, Args) + end, + ok. manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> Iterate(fun (App, Acc) -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 5e8edd53a1..d91975359a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -36,6 +36,17 @@ -define(RPC_SLEEP, 500). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + start() -> RpcTimeout = case init:get_argument(maxwait) of diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 94033a4f3d..f4fa45993a 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -49,6 +49,8 @@ -define(LOG_BUNDLE_DELAY, 5). -define(COMPLETE_BUNDLE_DELAY, 2). +-define(HIBERNATE_AFTER, 10000). + -define(MAX_WRAP_ENTRIES, 500). -define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). @@ -93,7 +95,7 @@ start_link() -> transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}). + gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). extend_transaction(TxnKey, MessageList) -> ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), @@ -105,17 +107,17 @@ dirty_work(MessageList) -> commit_transaction(TxnKey) -> ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}). + gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). rollback_transaction(TxnKey) -> ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). force_snapshot() -> - gen_server:call(?SERVER, force_snapshot). + gen_server:call(?SERVER, force_snapshot, infinity). serial() -> - gen_server:call(?SERVER, serial). + gen_server:call(?SERVER, serial, infinity). %%-------------------------------------------------------------------- @@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) -> do_noreply(internal_commit(From, Key, NewState)); handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State = #pstate{log_handle = LH, - snapshot = Snapshot}) -> - ok = take_snapshot(LH, Snapshot), - do_reply(ok, State); +handle_call(force_snapshot, _From, State) -> + do_reply(ok, flush(true, State)); handle_call(serial, _From, State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> do_reply(Serial, State); @@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(timeout, State = #pstate{deadline = infinity}) -> + State1 = flush(true, State), + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State1, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(timeout, State) -> - {noreply, flush(State)}; + do_noreply(flush(State)); handle_info(_Info, State) -> {noreply, State}. @@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) -> rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), ok = take_snapshot(LogHandle, OldFileName, Snapshot). -maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH, - snapshot = Snapshot}) - when EntryCount >= ?MAX_WRAP_ENTRIES -> +maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, + log_handle = LH, + snapshot = Snapshot}) + when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> ok = take_snapshot(LH, Snapshot), State#pstate{entry_count = 0}; -maybe_take_snapshot(State) -> +maybe_take_snapshot(_Force, State) -> State. later_ms(DeltaMilliSec) -> @@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) -> ExistingDeadline. compute_timeout(infinity) -> - infinity; + ?HIBERNATE_AFTER; compute_timeout(Deadline) -> DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, if @@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) -> do_reply(Reply, State = #pstate{deadline = Deadline}) -> {reply, Reply, State, compute_timeout(Deadline)}. -flush(State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if - PendingLogs /= [] -> +flush(State) -> flush(false, State). + +flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, + pending_replies = Waiting, + log_handle = LogHandle}) -> + State1 = if PendingLogs /= [] -> disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - maybe_take_snapshot( - State#pstate{ - entry_count = State#pstate.entry_count + 1}); - true -> + State#pstate{entry_count = State#pstate.entry_count + 1}; + true -> State end, + State2 = maybe_take_snapshot(ForceSnapshot, State1), if Waiting /= [] -> ok = disk_log:sync(LogHandle), lists:foreach(fun (From) -> gen_server:reply(From, ok) end, @@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs, true -> ok end, - State1#pstate{deadline = infinity, + State2#pstate{deadline = infinity, pending_logs = [], pending_replies = []}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index dbb9358314..ba6d6e6a42 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server:call(Pid, {info, Items}) of + case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ff42ea0460..0b06a063a7 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -112,7 +112,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, fun ({Node, QPids}) -> try gen_server2:call( {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}) + {deliver, QPids, Mandatory, Immediate, Txn, Message}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6312e8e364..8f0a3a8973 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -45,6 +45,7 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> + passed = test_priority_queue(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -55,6 +56,62 @@ all_tests() -> passed = test_server_status(), passed. +test_priority_queue() -> + + false = priority_queue:is_queue(not_a_queue), + + %% empty Q + Q = priority_queue:new(), + {true, true, 0, [], []} = test_priority_queue(Q), + + %% 1-4 element no-priority Q + true = lists:all(fun (X) -> X =:= passed end, + lists:map(fun test_simple_n_element_queue/1, + lists:seq(1, 4))), + + %% 1-element priority Q + Q1 = priority_queue:in(foo, 1, priority_queue:new()), + {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + + %% 2-element same-priority Q + Q2 = priority_queue:in(bar, 1, Q1), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q2), + + %% 2-element different-priority Q + Q3 = priority_queue:in(bar, 2, Q1), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q3), + + %% 1-element negative priority Q + Q4 = priority_queue:in(foo, -1, priority_queue:new()), + {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + + passed. + +priority_queue_in_all(Q, L) -> + lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L). + +priority_queue_out_all(Q) -> + case priority_queue:out(Q) of + {empty, _} -> []; + {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)] + end. + +test_priority_queue(Q) -> + {priority_queue:is_queue(Q), + priority_queue:is_empty(Q), + priority_queue:len(Q), + priority_queue:to_list(Q), + priority_queue_out_all(Q)}. + +test_simple_n_element_queue(N) -> + Items = lists:seq(1, N), + Q = priority_queue_in_all(priority_queue:new(), Items), + ToListRes = [{0, X} || X <- Items], + {true, false, N, ToListRes, Items} = test_priority_queue(Q), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -430,7 +487,13 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), {error, {no_running_cluster_nodes, _, _}} = control_action(reset, []), + + %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), + ok = control_action(start_app, []), + ok = control_action(force_reset, SecondaryNode, []), + ok = control_action(cluster, SecondaryNode, [NodeS]), + ok = control_action(start_app, SecondaryNode, []), passed. |
