diff options
| author | Michael Bridgen <mikeb@lshift.net> | 2010-02-15 13:05:01 +0000 |
|---|---|---|
| committer | Michael Bridgen <mikeb@lshift.net> | 2010-02-15 13:05:01 +0000 |
| commit | 04070fc945b9619e98007b0d574f92fe1e7a8b67 (patch) | |
| tree | eb7539cbec2d8a880e339fd9eda0cb9e3c5fa79e | |
| parent | c73b4da7b75f2bec2a81ae26e756780355361134 (diff) | |
| parent | dc574a8ef52dd292084dedfc0692665285f3a7a7 (diff) | |
| download | rabbitmq-server-git-04070fc945b9619e98007b0d574f92fe1e7a8b67.tar.gz | |
De-bitrot by merging default in. Passes rabbit_tests:all_tests() and
the ../rabbitmq-test test suite. Some unused variables appear to have crept
in during previous commits to the branch; will fix in another commit.
79 files changed, 1685 insertions, 767 deletions
@@ -4,6 +4,7 @@ syntax: glob *.swp *.patch erl_crash.dump +deps.mk syntax: regexp ^cover/ @@ -19,6 +20,7 @@ syntax: regexp ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ ^packaging/debs/apt-repository/debian$ +^packaging/macports/macports$ ^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$ ^packaging/windows/rabbitmq-server-windows-.*\.zip$ diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ index 2d0a7b1db2..221c93501d 100644 --- a/LICENSE-MPL-RabbitMQ +++ b/LICENSE-MPL-RabbitMQ @@ -454,11 +454,11 @@ EXHIBIT A -Mozilla Public License. are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. - Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift Ltd. Portions created by Cohesive Financial Technologies LLC are - Copyright (C) 2007-2009 Cohesive Financial Technologies + Copyright (C) 2007-2010 Cohesive Financial Technologies LLC. Portions created by Rabbit Technologies Ltd are Copyright - (C) 2007-2009 Rabbit Technologies Ltd. + (C) 2007-2010 Rabbit Technologies Ltd. All Rights Reserved. @@ -6,12 +6,14 @@ RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia RABBITMQ_LOG_BASE ?= $(TMPDIR) +DEPS_FILE=deps.mk SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) -TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) +INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl +BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) +TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -58,13 +60,13 @@ ERL_EBIN=erl -noinput -pa $(EBIN_DIR) all: $(TARGETS) +$(DEPS_FILE): $(SOURCES) $(INCLUDES) + escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@ + $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< -$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl - erlc $(ERLC_OPTS) $< - -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< @@ -100,6 +102,7 @@ clean: rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz rm -f $(RABBIT_PLT) + rm -f $(DEPS_FILE) cleandb: rm -rf $(RABBITMQ_MNESIA_DIR)/* @@ -170,7 +173,7 @@ srcdist: distclean sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) @@ -220,3 +223,5 @@ install: all docs_all install_dirs install_dirs: mkdir -p $(SBIN_DIR) mkdir -p $(TARGET_DIR)/sbin + +-include $(DEPS_FILE) diff --git a/codegen.py b/codegen.py index 3608f4c200..b5c91cbd3f 100644 --- a/codegen.py +++ b/codegen.py @@ -18,11 +18,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## @@ -92,6 +92,40 @@ class PackedMethodBitField: def full(self): return self.count() == 8 + +def printFileHeader(): + print """%% Autogenerated code. Do not edit. +%% +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%%""" def genErl(spec): def erlType(domain): @@ -180,6 +214,8 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) + elif type == 'shortstr': + print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index) else: pass @@ -212,7 +248,10 @@ def genErl(spec): elif type == 'table': print " F%dTab = rabbit_binary_generator:generate_table(F%d)," % (f.index, f.index) print " F%dLen = size(F%dTab)," % (f.index, f.index) - elif type in ['shortstr', 'longstr']: + elif type == 'shortstr': + print " F%dLen = size(F%d)," % (f.index, f.index) + print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index) + elif type == 'longstr': print " F%dLen = size(F%d)," % (f.index, f.index) else: pass @@ -251,6 +290,7 @@ def genErl(spec): methods = spec.allMethods() + printFileHeader() print """-module(rabbit_framing). -include("rabbit_framing.hrl"). @@ -325,6 +365,7 @@ def genHrl(spec): methods = spec.allMethods() + printFileHeader() print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision) diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 5255be28a0..e26767ab4f 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -98,6 +98,13 @@ nodes determined by I<clusternode> option(s). See L<http://www.rabbitmq.com/clustering.html> for more information about clustering. +=item close_connection I<connectionpid> I<explanation> + +Instruct the broker to close the connection associated with the Erlang +process id I<connectionpid> (see also the I<list_connections> +command), passing the I<explanation> string to the connected client as +part of the AMQP connection shutdown protocol. + =back =head2 USER MANAGEMENT @@ -202,6 +209,22 @@ queue arguments id of the Erlang process associated with the queue +=item owner_pid + +id of the Erlang process representing the connection which is the +exclusive owner of the queue, or empty if the queue is non-exclusive + +=item exclusive_consumer_pid + +id of the Erlang process representing the channel of the exclusive +consumer subscribed to this queue, or empty if there is no exclusive +consumer + +=item exclusive_consumer_tag + +consumer tag of the exclusive consumer subscribed to this queue, or +empty if there is no exclusive consumer + =item messages_ready number of messages ready to be delivered to clients @@ -284,7 +307,7 @@ separated by tab characters. =item list_connections [I<connectioninfoitem> ...] -List queue information by virtual host. Each line printed describes an +List current AMQP connections. Each line printed describes a connection, with the requested I<connectioninfoitem> values separated by tab characters. If no I<connectioninfoitem>s are specified then I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. @@ -295,7 +318,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. =over -=item node +=item pid id of the Erlang process associated with the connection @@ -367,10 +390,87 @@ send queue size =back -The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results, -defaulting to I<"/">. The default can be overridden with the B<-p> -flag. +=over + +=item list_channels [I<channelinfoitem> ...] + +List channel information. Each line printed describes a channel, with +the requested I<channelinfoitem> values separated by tab characters. +If no I<channelinfoitem>s are specified then I<pid>, I<user>, +I<transactional>, I<consumer_count>, and I<messages_unacknowledged> +are assumed. + +The list includes channels which are part of ordinary AMQP connections +(as listed by list_connections) and channels created by various +plug-ins and other extensions. + +=back + +=head3 Channel information items + +=over + +=item pid + +id of the Erlang process associated with the channel + +=item connection + +id of the Erlang process associated with the connection to which the +channel belongs + +=item number + +the number of the channel, which uniquely identifies it within a +connection + +=item user + +username associated with the channel + +=item vhost + +virtual host in which the channel operates + +=item transactional + +true if the channel is in transactional mode, false otherwise + +=item consumer_count + +number of logical AMQP consumers retrieving messages via the channel + +=item messages_unacknowledged + +number of messages delivered via this channel but not yet acknowledged + +=item acks_uncommitted + +number of acknowledgements received in an as yet uncommitted +transaction + +=item prefetch_count + +QoS prefetch count limit in force, 0 if unlimited + +=back + +=item list_consumers + +List consumers, i.e. subscriptions to a queue's message stream. Each +line printed shows, separated by tab characters, the name of the queue +subscribed to, the id of the channel process via which the +subscription was created and is managed, the consumer tag which +uniquely identifies the subscription within a channel, and a boolean +indicating whether acknowledgements are expected for messages +delivered to this consumer. + +=back + +The list_queues, list_exchanges, list_bindings and list_consumers +commands accept an optional virtual host parameter for which to +display results, defaulting to I<"/">. The default can be overridden +with the B<-p> flag. =head1 OUTPUT ESCAPING diff --git a/generate_deps b/generate_deps new file mode 100644 index 0000000000..916006d101 --- /dev/null +++ b/generate_deps @@ -0,0 +1,52 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +-mode(compile). + +main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> + ErlDirContents = filelib:wildcard("*.erl", ErlDir), + ErlFiles = [filename:join(ErlDir, FileName) || FileName <- ErlDirContents], + Modules = sets:from_list( + [list_to_atom(filename:basename(FileName, ".erl")) || + FileName <- ErlDirContents]), + Headers = sets:from_list( + [filename:join(IncludeDir, FileName) || + FileName <- filelib:wildcard("*.hrl", IncludeDir)]), + Deps = lists:foldl( + fun (Path, Deps1) -> + dict:store(Path, detect_deps(IncludeDir, EbinDir, + Modules, Headers, Path), + Deps1) + end, dict:new(), ErlFiles), + {ok, Hdl} = file:open(TargetFile, [write, delayed_write]), + dict:fold( + fun (_Path, [], ok) -> + ok; + (Path, Dep, ok) -> + Module = filename:basename(Path, ".erl"), + ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]), + ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end, + ok, Dep), + file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"]) + end, ok, Deps), + ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]), + ok = file:sync(Hdl), + ok = file:close(Hdl). + +detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) -> + {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]), + lists:foldl( + fun ({attribute, _LineNumber, behaviour, Behaviour}, Deps) -> + case sets:is_element(Behaviour, Modules) of + true -> sets:add_element( + [EbinDir, "/", atom_to_list(Behaviour), ".beam"], + Deps); + false -> Deps + end; + ({attribute, _LineNumber, file, {FileName, _LineNumber1}}, Deps) -> + case sets:is_element(FileName, Headers) of + true -> sets:add_element(FileName, Deps); + false -> Deps + end; + (_Form, Deps) -> + Deps + end, sets:new(), Forms). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 9db497fd28..3baf8c1a67 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -128,11 +128,17 @@ properties :: amqp_properties(), properties_bin :: 'none', payload_fragments_rev :: [binary()]}). +-type(unencoded_content() :: undecoded_content()). -type(decoded_content() :: #content{class_id :: amqp_class_id(), properties :: amqp_properties(), properties_bin :: maybe(binary()), payload_fragments_rev :: [binary()]}). +-type(encoded_content() :: + #content{class_id :: amqp_class_id(), + properties :: maybe(amqp_properties()), + properties_bin :: binary(), + payload_fragments_rev :: [binary()]}). -type(content() :: undecoded_content() | decoded_content()). -type(basic_message() :: #basic_message{exchange_name :: exchange_name(), @@ -164,7 +170,7 @@ %%---------------------------------------------------------------------------- --define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). +-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). -ifdef(debug). diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index a78c230167..199a0f89c8 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index fa2844fddf..bc5b58cad8 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -34,6 +34,8 @@ prepare: -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ SOURCES/rabbitmq-server.init + sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + SOURCES/rabbitmq-script-wrapper cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 62fb1dfbc5..4dd223086f 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -118,6 +118,9 @@ fi rm -rf %{buildroot} %changelog +* Fri Jan 22 2010 Matthew Sackman <matthew@lshift.net> 1.7.1-1 +- New Upstream Release + * Mon Oct 5 2009 David Wragg <dpw@lshift.net> 1.7.0-1 - New upstream release diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper index ee5947b66c..693a6f0b83 100644 --- a/packaging/common/rabbitmq-asroot-script-wrapper +++ b/packaging/common/rabbitmq-asroot-script-wrapper @@ -19,11 +19,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index dfb714f16e..79096a4e92 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -19,11 +19,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## @@ -45,7 +45,7 @@ cd /var/lib/rabbitmq SCRIPT=`basename $0` if [ `id -u` = 0 ] ; then - su rabbitmq -s /bin/sh -c "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" + @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" elif [ `id -u` = `id -u rabbitmq` ] ; then /usr/lib/rabbitmq/bin/${SCRIPT} "$@" else diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index dafaf9cef4..ab05f73225 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -26,6 +26,8 @@ package: clean -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ $(UNPACKED_DIR)/debian/rabbitmq-server.init + sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index e4cfe7b547..796a301a29 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.7.1-1) intrepid; urgency=low + + * New Upstream Release + + -- Matthew Sackman <matthew@lshift.net> Fri, 22 Jan 2010 14:14:29 +0000 + rabbitmq-server (1.7.0-1) intrepid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/copyright b/packaging/debs/Debian/debian/copyright index 69867220f0..a569f31aaa 100755 --- a/packaging/debs/Debian/debian/copyright +++ b/packaging/debs/Debian/debian/copyright @@ -5,7 +5,7 @@ It was downloaded from http://www.rabbitmq.com/ The file codegen/amqp-0.8.json is covered by the following terms: - "Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies LLC, + "Copyright (C) 2008-2010 LShift Ltd, Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd Permission is hereby granted, free of charge, to any person @@ -39,11 +39,11 @@ Authors and Copyright are as described below: are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. - Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift Ltd. Portions created by Cohesive Financial Technologies LLC are - Copyright (C) 2007-2009 Cohesive Financial Technologies + Copyright (C) 2007-2010 Cohesive Financial Technologies LLC. Portions created by Rabbit Technologies Ltd are Copyright - (C) 2007-2009 Rabbit Technologies Ltd. + (C) 2007-2010 Rabbit Technologies Ltd. MOZILLA PUBLIC LICENSE @@ -502,11 +502,11 @@ EXHIBIT A -Mozilla Public License. are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. - Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift Ltd. Portions created by Cohesive Financial Technologies LLC are - Copyright (C) 2007-2009 Cohesive Financial Technologies + Copyright (C) 2007-2010 Cohesive Financial Technologies LLC. Portions created by Rabbit Technologies Ltd are Copyright - (C) 2007-2009 Rabbit Technologies Ltd. + (C) 2007-2010 Rabbit Technologies Ltd. All Rights Reserved. @@ -524,7 +524,7 @@ EXHIBIT A -Mozilla Public License. If you have any questions regarding licensing, please contact us at info@rabbitmq.com. -The Debian packaging is (C) 2007-2009, Rabbit Technologies Ltd. <info@rabbitmq.com> +The Debian packaging is (C) 2007-2010, Rabbit Technologies Ltd. <info@rabbitmq.com> and is licensed under the MPL 1.1, see above. diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile new file mode 100644 index 0000000000..d5633955b9 --- /dev/null +++ b/packaging/macports/Makefile @@ -0,0 +1,55 @@ +TARBALL_DIR=../../dist +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) +COMMON_DIR=../common +VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + +# The URL at which things really get deployed +REAL_WEB_URL=http://www.rabbitmq.com/ + +# The user@host for an OSX machine with macports installed, which is +# used to generate the macports index files. That step will be +# skipped if this variable is not set. If you do set it, you might +# also want to set SSH_OPTS, which allows adding ssh options, e.g. to +# specify a key that will get into the OSX machine without a +# passphrase. +MACPORTS_USERHOST= + +MACPORTS_DIR=macports +DEST=$(MACPORTS_DIR)/net/rabbitmq-server + +all: macports + +dirs: + mkdir -p $(DEST)/files + +$(DEST)/Portfile: Portfile.in + for algo in md5 sha1 rmd160 ; do \ + checksum=$$(openssl $$algo $(TARBALL_DIR)/$(TARBALL) | awk '{print $$NF}') ; \ + echo "s|@$$algo@|$$checksum|g" ; \ + done >checksums.sed + sed -e "s|@VERSION@|$(VERSION)|g;s|@BASE_URL@|$(REAL_WEB_URL)|g" \ + -f checksums.sed <$^ >$@ + rm checksums.sed + +macports: dirs $(DEST)/Portfile + for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \ + cp $(COMMON_DIR)/$$f $(DEST)/files ; \ + done + sed -i -e 's|@SU_RABBITMQ_SH_C@|sudo -E -u rabbitmq -H /bin/sh -c|' \ + $(DEST)/files/rabbitmq-script-wrapper + cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files + if [ -n "$(MACPORTS_USERHOST)" ] ; then \ + tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \ + d="/tmp/mkportindex.$$$$" ; \ + mkdir $$d \ + && cd $$d \ + && tar xf - \ + && /opt/local/bin/portindex -a -o . >/dev/null \ + && tar cf - . \ + && cd \ + && rm -rf $$d' \ + | tar xf - -C $(MACPORTS_DIR) ; \ + fi + +clean: + rm -rf $(DEST) checksums.sed diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/Portfile.in index 739f99d0e7..e1f582124a 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/Portfile.in @@ -3,10 +3,10 @@ PortSystem 1.0 name rabbitmq-server -version 1.7.0 -revision 0 +version @VERSION@ +revision 1 categories net -maintainers tonyg@rabbitmq.com +maintainers rabbitmq.com:tonyg platforms darwin description The RabbitMQ AMQP Server long_description \ @@ -15,13 +15,13 @@ long_description \ robust and scalable implementation of an AMQP broker. -homepage http://www.rabbitmq.com/ -master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/ +homepage @BASE_URL@ +master_sites @BASE_URL@releases/rabbitmq-server/v${version}/ checksums \ - md5 4505ca0fd8718439bd6f5e2af2379e56 \ - sha1 84fb86d403057bb808c1b51deee0c1fca3bf7bef \ - rmd160 092f90946825cc3eb277019805e24db637a559f4 + md5 @md5@ \ + sha1 @sha1@ \ + rmd160 @rmd160@ depends_build port:erlang depends_run port:erlang diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper deleted file mode 100644 index c4488dcbe5..0000000000 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -cd /var/lib/rabbitmq - -SCRIPT=`basename $0` - -if [ `id -u` = 0 ] ; then - /usr/lib/rabbitmq/bin/${SCRIPT} "$@" -else - echo -e "\nOnly root should run ${SCRIPT}\n" - exit 1 -fi - diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper deleted file mode 100644 index 80cb7bd53c..0000000000 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash -cd /var/lib/rabbitmq - -SCRIPT=`basename $0` - -if [ `id -u` = 0 ] ; then - sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} "$@" -elif [ `id -u` = `id -u rabbitmq` ] ; then - /usr/lib/rabbitmq/bin/${SCRIPT} "$@" -else - /usr/lib/rabbitmq/bin/${SCRIPT} - echo -e "\nOnly root or rabbitmq should run ${SCRIPT}\n" - exit 1 -fi - diff --git a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff index 45b4949616..45b4949616 100644 --- a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff +++ b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index f17fe77742..c9e818ac8b 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -22,7 +22,7 @@ dist: mv $(SOURCE_DIR) $(TARGET_DIR) pod2text --loose rabbitmq-service.pod $(TARGET_DIR)/readme-service.txt - unix2dos $(TARGET_DIR)/readme-service.txt + todos $(TARGET_DIR)/readme-service.txt zip -r $(TARGET_ZIP).zip $(TARGET_DIR) rm -rf $(TARGET_DIR) diff --git a/scripts/rabbitmq-activate-plugins b/scripts/rabbitmq-activate-plugins index 5ce64c686c..00ee6c61c8 100755 --- a/scripts/rabbitmq-activate-plugins +++ b/scripts/rabbitmq-activate-plugins @@ -19,11 +19,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat index e7aa709544..3c9a057c95 100644 --- a/scripts/rabbitmq-activate-plugins.bat +++ b/scripts/rabbitmq-activate-plugins.bat @@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,7 +32,13 @@ REM setlocal
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -44,17 +50,18 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-set RABBITMQ_PLUGINS_DIR=%~dp0..\plugins
-set RABBITMQ_PLUGINS_EXPAND_DIR=%~dp0..\priv\plugins
-set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+set RABBITMQ_PLUGINS_EXPAND_DIR=!TDP0!..\priv\plugins
+set RABBITMQ_EBIN_DIR=!TDP0!..\ebin
-"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%RABBITMQ_EBIN_DIR%" ^
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!RABBITMQ_EBIN_DIR!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
--rabbit plugins_dir \""%RABBITMQ_PLUGINS_DIR:\=/%"\" ^
--rabbit plugins_expand_dir \""%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%"\" ^
--rabbit rabbit_ebin \""%RABBITMQ_EBIN_DIR:\=/%"\" ^
--extra %*
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit rabbit_ebin \""!RABBITMQ_EBIN_DIR:\=/!"\" ^
+-extra !STAR!
endlocal
+endlocal
diff --git a/scripts/rabbitmq-deactivate-plugins b/scripts/rabbitmq-deactivate-plugins index 771c473496..3fd71bfacd 100755 --- a/scripts/rabbitmq-deactivate-plugins +++ b/scripts/rabbitmq-deactivate-plugins @@ -19,11 +19,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat index 40155183a1..1bc3f88efd 100644 --- a/scripts/rabbitmq-deactivate-plugins.bat +++ b/scripts/rabbitmq-deactivate-plugins.bat @@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,8 +32,14 @@ REM setlocal
-set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+setlocal enabledelayedexpansion
-del /f "%RABBITMQ_EBIN_DIR%"\rabbit.rel "%RABBITMQ_EBIN_DIR%"\rabbit.script "%RABBITMQ_EBIN_DIR%"\rabbit.boot
+set RABBITMQ_EBIN_DIR=!TDP0!..\ebin
+del /f "!RABBITMQ_EBIN_DIR!"\rabbit.rel "!RABBITMQ_EBIN_DIR!"\rabbit.script "!RABBITMQ_EBIN_DIR!"\rabbit.boot
+
+endlocal
endlocal
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 69ddbcfed1..36734874e7 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -19,11 +19,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 1a7eb97e08..8341d35c8c 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -19,19 +19,17 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## ## Contributor(s): ______________________________________. ## NODENAME=rabbit -NODE_IP_ADDRESS=0.0.0.0 -NODE_PORT=5672 SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= @@ -40,14 +38,18 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq . `dirname $0`/rabbitmq-env +DEFAULT_NODE_IP_ADDRESS=0.0.0.0 +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] then if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} fi else if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${NODE_PORT} + then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} fi fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index 6dda13af37..a4b7f2e99b 100755..100644 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,38 +32,44 @@ REM setlocal
-if "%RABBITMQ_BASE%"=="" (
- set RABBITMQ_BASE=%APPDATA%\RabbitMQ
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
-if "%RABBITMQ_NODENAME%"=="" (
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
+if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
)
) else (
- if "%RABBITMQ_NODE_PORT%"=="" (
+ if "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_PORT=5672
)
)
-set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
-set RABBITMQ_SCRIPT_HOME=%~sdp0%
+set RABBITMQ_PIDS_FILE=!RABBITMQ_BASE!\rabbitmq.pids
+set RABBITMQ_SCRIPT_HOME=!TDP0!
-if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+if "!RABBITMQ_CONFIG_FILE!"=="" (
+ set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-if exist "%RABBITMQ_CONFIG_FILE%.config" (
- set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+if exist "!RABBITMQ_CONFIG_FILE!.config" (
+ set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
set RABBITMQ_CONFIG_ARG=
)
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -75,14 +81,15 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%~dp0..\ebin" ^
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!TDP0!..\ebin" ^
-noinput -hidden ^
-%RABBITMQ_MULTI_ERL_ARGS% ^
+!RABBITMQ_MULTI_ERL_ARGS! ^
-sname rabbitmq_multi ^
-%RABBITMQ_CONFIG_ARG% ^
+!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
-%RABBITMQ_MULTI_START_ARGS% ^
--extra %*
+!RABBITMQ_MULTI_START_ARGS! ^
+-extra !STAR!
endlocal
+endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 7f08cd9d75..638498c1e2 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -19,11 +19,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## @@ -31,10 +31,8 @@ ## NODENAME=rabbit -NODE_IP_ADDRESS=0.0.0.0 -NODE_PORT=5672 SERVER_ERL_ARGS="+K true +A30 \ --kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ +-kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config CONFIG_FILE=/etc/rabbitmq/rabbitmq @@ -44,14 +42,18 @@ SERVER_START_ARGS= . `dirname $0`/rabbitmq-env +DEFAULT_NODE_IP_ADDRESS=0.0.0.0 +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] then if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} fi else if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${NODE_PORT} + then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} fi fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5110285128..28eb8ebb8d 100755..100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,25 +32,31 @@ REM setlocal
-if "%RABBITMQ_BASE%"=="" (
- set RABBITMQ_BASE=%APPDATA%\RabbitMQ
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
-if "%RABBITMQ_NODENAME%"=="" (
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
+if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
)
) else (
- if "%RABBITMQ_NODE_PORT%"=="" (
+ if "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_PORT=5672
)
)
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -62,13 +68,13 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-set RABBITMQ_BASE_UNIX=%RABBITMQ_BASE:\=/%
+set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-if "%RABBITMQ_MNESIA_BASE%"=="" (
- set RABBITMQ_MNESIA_BASE=%RABBITMQ_BASE_UNIX%/db
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
)
-if "%RABBITMQ_LOG_BASE%"=="" (
- set RABBITMQ_LOG_BASE=%RABBITMQ_BASE_UNIX%/log
+if "!RABBITMQ_LOG_BASE!"=="" (
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
)
@@ -77,81 +83,82 @@ rem Log management (rotation, filtering based of size...) is left as an exercice set BACKUP_EXTENSION=.1
-set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
-set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
+set LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log
+set SASL_LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log
-set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
-set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
+set LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log!BACKUP_EXTENSION!
+set SASL_LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log!BACKUP_EXTENSION!
-if exist "%LOGS%" (
- type "%LOGS%" >> "%LOGS_BACKUP%"
+if exist "!LOGS!" (
+ type "!LOGS!" >> "!LOGS_BACKUP!"
)
-if exist "%SASL_LOGS%" (
- type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
+if exist "!SASL_LOGS!" (
+ type "!SASL_LOGS!" >> "!SASL_LOGS_BACKUP!"
)
rem End of log management
-if "%RABBITMQ_CLUSTER_CONFIG_FILE%"=="" (
- set RABBITMQ_CLUSTER_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq_cluster.config
+if "!RABBITMQ_CLUSTER_CONFIG_FILE!"=="" (
+ set RABBITMQ_CLUSTER_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq_cluster.config
)
set CLUSTER_CONFIG=
-if not exist "%RABBITMQ_CLUSTER_CONFIG_FILE%" GOTO L1
-set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%"\"
+if not exist "!RABBITMQ_CLUSTER_CONFIG_FILE!" GOTO L1
+set CLUSTER_CONFIG=-rabbit cluster_config \""!RABBITMQ_CLUSTER_CONFIG_FILE:\=/!"\"
:L1
-if "%RABBITMQ_MNESIA_DIR%"=="" (
- set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
-set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
-if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
- echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
- set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
+set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
+if exist "!RABBITMQ_EBIN_ROOT!\rabbit.boot" (
+ echo Using Custom Boot File "!RABBITMQ_EBIN_ROOT!\rabbit.boot"
+ set RABBITMQ_BOOT_FILE=!RABBITMQ_EBIN_ROOT!\rabbit
set RABBITMQ_EBIN_PATH=
) else (
set RABBITMQ_BOOT_FILE=start_sasl
- set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+ set RABBITMQ_EBIN_PATH=-pa "!RABBITMQ_EBIN_ROOT!"
)
-if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+if "!RABBITMQ_CONFIG_FILE!"=="" (
+ set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-if exist "%RABBITMQ_CONFIG_FILE%.config" (
- set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+if exist "!RABBITMQ_CONFIG_FILE!.config" (
+ set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
set RABBITMQ_CONFIG_ARG=
)
set RABBITMQ_LISTEN_ARG=
-if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""!RABBITMQ_NODE_IP_ADDRESS!"\","!RABBITMQ_NODE_PORT!"}]
)
)
-"%ERLANG_HOME%\bin\erl.exe" ^
-%RABBITMQ_EBIN_PATH% ^
+"!ERLANG_HOME!\bin\erl.exe" ^
+!RABBITMQ_EBIN_PATH! ^
-noinput ^
--boot "%RABBITMQ_BOOT_FILE%" ^
-%RABBITMQ_CONFIG_ARG% ^
--sname %RABBITMQ_NODENAME% ^
+-boot "!RABBITMQ_BOOT_FILE!" ^
+!RABBITMQ_CONFIG_ARG! ^
+-sname !RABBITMQ_NODENAME! ^
-s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
+-kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
-%RABBITMQ_LISTEN_ARG% ^
--kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
-%RABBITMQ_SERVER_ERL_ARGS% ^
+!RABBITMQ_LISTEN_ARG! ^
+-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
+!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
+-sasl sasl_error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
-%CLUSTER_CONFIG% ^
-%RABBITMQ_SERVER_START_ARGS% ^
-%*
+-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+!CLUSTER_CONFIG! ^
+!RABBITMQ_SERVER_START_ARGS! ^
+!STAR!
endlocal
+endlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index d960d29dea..a4021fd6a1 100755..100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,61 +32,82 @@ REM setlocal
-if "%RABBITMQ_SERVICENAME%"=="" (
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TN0=%~n0
+set TDP0=%~dp0
+set P1=%1
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_SERVICENAME!"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
-if "%RABBITMQ_BASE%"=="" (
- set RABBITMQ_BASE=%APPDATA%\%RABBITMQ_SERVICENAME%
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
-if "%RABBITMQ_NODENAME%"=="" (
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
- )
+if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
) else (
- if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
- )
-)
-
-if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
- set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.6.5\erts-5.6.5\bin
+ if "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
+)
+
+if "!ERLANG_SERVICE_MANAGER_PATH!"=="" (
+ if not exist "!ERLANG_HOME!\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+ )
+ for /f "delims=" %%i in ('dir /ad/b "!ERLANG_HOME!"') do if exist "!ERLANG_HOME!\%%i\bin\erlsrv.exe" (
+ set ERLANG_SERVICE_MANAGER_PATH=!ERLANG_HOME!\%%i\bin
+ )
)
set CONSOLE_FLAG=
set CONSOLE_LOG_VALID=
-for %%i in (new reuse) do if "%%i" == "%RABBITMQ_CONSOLE_LOG%" set CONSOLE_LOG_VALID=TRUE
-if "%CONSOLE_LOG_VALID%" == "TRUE" (
- set CONSOLE_FLAG=-debugtype %RABBITMQ_CONSOLE_LOG%
+for %%i in (new reuse) do if "%%i" == "!RABBITMQ_CONSOLE_LOG!" set CONSOLE_LOG_VALID=TRUE
+if "!CONSOLE_LOG_VALID!" == "TRUE" (
+ set CONSOLE_FLAG=-debugtype !RABBITMQ_CONSOLE_LOG!
)
rem *** End of configuration ***
-if not exist "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" (
+if not exist "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" (
echo.
echo **********************************************
echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
echo **********************************************
echo.
- echo "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" not found!
+ echo "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" not found
echo Please set ERLANG_SERVICE_MANAGER_PATH to the folder containing "erlsrv.exe".
echo.
exit /B 1
)
rem erlang prefers forwardslash as separator in paths
-set RABBITMQ_BASE_UNIX=%RABBITMQ_BASE:\=/%
+set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-if "%RABBITMQ_MNESIA_BASE%"=="" (
- set RABBITMQ_MNESIA_BASE=%RABBITMQ_BASE_UNIX%/db
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
)
-if "%RABBITMQ_LOG_BASE%"=="" (
- set RABBITMQ_LOG_BASE=%RABBITMQ_BASE_UNIX%/log
+if "!RABBITMQ_LOG_BASE!"=="" (
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
)
@@ -95,139 +116,140 @@ rem Log management (rotation, filtering based on size...) is left as an exercise set BACKUP_EXTENSION=.1
-set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
-set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
+set LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log
+set SASL_LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log
-set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
-set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
+set LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log!BACKUP_EXTENSION!
+set SASL_LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log!BACKUP_EXTENSION!
-if exist "%LOGS%" (
- type "%LOGS%" >> "%LOGS_BACKUP%"
+if exist "!LOGS!" (
+ type "!LOGS!" >> "!LOGS_BACKUP!"
)
-if exist "%SASL_LOGS%" (
- type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
+if exist "!SASL_LOGS!" (
+ type "!SASL_LOGS!" >> "!SASL_LOGS_BACKUP!"
)
rem End of log management
-if "%RABBITMQ_CLUSTER_CONFIG_FILE%"=="" (
- set RABBITMQ_CLUSTER_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq_cluster.config
+if "!RABBITMQ_CLUSTER_CONFIG_FILE!"=="" (
+ set RABBITMQ_CLUSTER_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq_cluster.config
)
set CLUSTER_CONFIG=
-if not exist "%RABBITMQ_CLUSTER_CONFIG_FILE%" GOTO L1
-set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%"\"
+if not exist "!RABBITMQ_CLUSTER_CONFIG_FILE!" GOTO L1
+set CLUSTER_CONFIG=-rabbit cluster_config \""!RABBITMQ_CLUSTER_CONFIG_FILE:\=/!"\"
:L1
-if "%RABBITMQ_MNESIA_DIR%"=="" (
- set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
-if "%1" == "install" goto INSTALL_SERVICE
-for %%i in (start stop disable enable list remove) do if "%%i" == "%1" goto MODIFY_SERVICE
+if "!P1!" == "install" goto INSTALL_SERVICE
+for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
echo.
echo *********************
echo Service control usage
echo *********************
echo.
-echo %~n0 help - Display this help
-echo %~n0 install - Install the %RABBITMQ_SERVICENAME% service
-echo %~n0 remove - Remove the %RABBITMQ_SERVICENAME% service
+echo !TN0! help - Display this help
+echo !TN0! install - Install the !RABBITMQ_SERVICENAME! service
+echo !TN0! remove - Remove the !RABBITMQ_SERVICENAME! service
echo.
echo The following actions can also be accomplished by using
echo Windows Services Management Console (services.msc):
echo.
-echo %~n0 start - Start the %RABBITMQ_SERVICENAME% service
-echo %~n0 stop - Stop the %RABBITMQ_SERVICENAME% service
-echo %~n0 disable - Disable the %RABBITMQ_SERVICENAME% service
-echo %~n0 enable - Enable the %RABBITMQ_SERVICENAME% service
+echo !TN0! start - Start the !RABBITMQ_SERVICENAME! service
+echo !TN0! stop - Stop the !RABBITMQ_SERVICENAME! service
+echo !TN0! disable - Disable the !RABBITMQ_SERVICENAME! service
+echo !TN0! enable - Enable the !RABBITMQ_SERVICENAME! service
echo.
exit /B
:INSTALL_SERVICE
-if not exist "%RABBITMQ_BASE%" (
- echo Creating base directory %RABBITMQ_BASE% & md "%RABBITMQ_BASE%"
+if not exist "!RABBITMQ_BASE!" (
+ echo Creating base directory !RABBITMQ_BASE! & md "!RABBITMQ_BASE!"
)
-"%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" list %RABBITMQ_SERVICENAME% 2>NUL 1>NUL
+"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" list !RABBITMQ_SERVICENAME! 2>NUL 1>NUL
if errorlevel 1 (
- "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" add %RABBITMQ_SERVICENAME%
+ "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" add !RABBITMQ_SERVICENAME!
) else (
- echo %RABBITMQ_SERVICENAME% service is already present - only updating service parameters
+ echo !RABBITMQ_SERVICENAME! service is already present - only updating service parameters
)
-set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
-if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
- echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
- set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
+set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
+if exist "!RABBITMQ_EBIN_ROOT!\rabbit.boot" (
+ echo Using Custom Boot File "!RABBITMQ_EBIN_ROOT!\rabbit.boot"
+ set RABBITMQ_BOOT_FILE=!RABBITMQ_EBIN_ROOT!\rabbit
set RABBITMQ_EBIN_PATH=
) else (
set RABBITMQ_BOOT_FILE=start_sasl
- set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+ set RABBITMQ_EBIN_PATH=-pa "!RABBITMQ_EBIN_ROOT!"
)
-if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+if "!RABBITMQ_CONFIG_FILE!"=="" (
+ set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-if exist "%RABBITMQ_CONFIG_FILE%.config" (
- set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+if exist "!RABBITMQ_CONFIG_FILE!.config" (
+ set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
set RABBITMQ_CONFIG_ARG=
)
set RABBITMQ_LISTEN_ARG=
-if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"!RABBITMQ_NODE_IP_ADDRESS!\", !RABBITMQ_NODE_PORT!}]"
)
)
set ERLANG_SERVICE_ARGUMENTS= ^
-%RABBITMQ_EBIN_PATH% ^
--boot "%RABBITMQ_BOOT_FILE%" ^
-%RABBITMQ_CONFIG_ARG% ^
+!RABBITMQ_EBIN_PATH! ^
+-boot "!RABBITMQ_BOOT_FILE!" ^
+!RABBITMQ_CONFIG_ARG! ^
-s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
+-kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
-%RABBITMQ_LISTEN_ARG% ^
--kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
-%RABBITMQ_SERVER_ERL_ARGS% ^
+!RABBITMQ_LISTEN_ARG! ^
+-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
+!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
+-sasl sasl_error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
-%CLUSTER_CONFIG% ^
-%RABBITMQ_SERVER_START_ARGS% ^
-%*
-
-set ERLANG_SERVICE_ARGUMENTS=%ERLANG_SERVICE_ARGUMENTS:\=\\%
-set ERLANG_SERVICE_ARGUMENTS=%ERLANG_SERVICE_ARGUMENTS:"=\"%
-
-"%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" set %RABBITMQ_SERVICENAME% ^
--machine "%ERLANG_SERVICE_MANAGER_PATH%\erl.exe" ^
--env ERL_CRASH_DUMP="%RABBITMQ_BASE_UNIX%/log" ^
--workdir "%RABBITMQ_BASE%" ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+!CLUSTER_CONFIG! ^
+!RABBITMQ_SERVER_START_ARGS! ^
+!STAR!
+
+set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:\=\\!
+set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"!
+
+"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" set !RABBITMQ_SERVICENAME! ^
+-machine "!ERLANG_SERVICE_MANAGER_PATH!\erl.exe" ^
+-env ERL_CRASH_DUMP="!RABBITMQ_BASE_UNIX!/erl_crash.dump" ^
+-workdir "!RABBITMQ_BASE!" ^
-stopaction "rabbit:stop_and_halt()." ^
--sname %RABBITMQ_NODENAME% ^
-%CONSOLE_FLAG% ^
--args "%ERLANG_SERVICE_ARGUMENTS%" > NUL
+-sname !RABBITMQ_NODENAME! ^
+!CONSOLE_FLAG! ^
+-args "!ERLANG_SERVICE_ARGUMENTS!" > NUL
goto END
:MODIFY_SERVICE
-"%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" %1 %RABBITMQ_SERVICENAME%
+"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" !P1! !RABBITMQ_SERVICENAME!
goto END
:END
endlocal
+endlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index a332afc6ca..cfb775eb67 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -19,11 +19,11 @@ ## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial ## Technologies LLC, and Rabbit Technologies Ltd. ## -## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift ## Ltd. Portions created by Cohesive Financial Technologies LLC are -## Copyright (C) 2007-2009 Cohesive Financial Technologies +## Copyright (C) 2007-2010 Cohesive Financial Technologies ## LLC. Portions created by Rabbit Technologies Ltd are Copyright -## (C) 2007-2009 Rabbit Technologies Ltd. +## (C) 2007-2010 Rabbit Technologies Ltd. ## ## All Rights Reserved. ## diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 512e8587dc..5557245165 100755..100644 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,11 +32,17 @@ REM setlocal
-if "%RABBITMQ_NODENAME%"=="" (
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -48,6 +54,7 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -nodename %RABBITMQ_NODENAME% -extra %*
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
+endlocal
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 53edf8deef..c33582e30d 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -36,7 +36,7 @@ %% InitialTimeout supplied from init). After this timeout has %% occurred, hibernation will occur as normal. Upon awaking, a new %% current timeout value will be calculated. -%% +%% %% The purpose is that the gen_server2 takes care of adjusting the %% current timeout value such that the process will increase the %% timeout value repeatedly if it is unable to sleep for the @@ -57,7 +57,7 @@ %% being used. Instead it'll wait for the current timeout as described %% above. -%% All modifications are (C) 2009 LShift Ltd. +%% All modifications are (C) 2009-2010 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in diff --git a/src/pg_local.erl b/src/pg_local.erl new file mode 100644 index 0000000000..fa41fe46b3 --- /dev/null +++ b/src/pg_local.erl @@ -0,0 +1,213 @@ +%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) Process groups are node-local only. +%% +%% 2) Groups are created/deleted implicitly. +%% +%% 3) 'join' and 'leave' are asynchronous. +%% +%% 4) the type specs of the exported non-callback functions have been +%% extracted into a separate, guarded section, and rewritten in +%% old-style spec syntax, for better compatibility with older +%% versions of Erlang/OTP. The remaining type specs have been +%% removed. + +%% All modifications are (C) 2010 LShift Ltd. + +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(pg_local). + +-export([join/2, leave/2, get_members/1]). +-export([sync/0]). %% intended for testing only; not part of official API +-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, + terminate/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(name() :: term()). + +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(join/2 :: (name(), pid()) -> 'ok'). +-spec(leave/2 :: (name(), pid()) -> 'ok'). +-spec(get_members/1 :: (name()) -> [pid()]). + +-spec(sync/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +%%% As of R13B03 monitors are used instead of links. + +%%% +%%% Exported functions +%%% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +start() -> + ensure_started(). + +join(Name, Pid) when is_pid(Pid) -> + ensure_started(), + gen_server:cast(?MODULE, {join, Name, Pid}). + +leave(Name, Pid) when is_pid(Pid) -> + ensure_started(), + gen_server:cast(?MODULE, {leave, Name, Pid}). + +get_members(Name) -> + ensure_started(), + group_members(Name). + +sync() -> + ensure_started(), + gen_server:call(?MODULE, sync). + +%%% +%%% Callback functions from gen_server +%%% + +-record(state, {}). + +init([]) -> + pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]), + {ok, #state{}}. + +handle_call(sync, _From, S) -> + {reply, ok, S}; + +handle_call(Request, From, S) -> + error_logger:warning_msg("The pg_local server received an unexpected message:\n" + "handle_call(~p, ~p, _)\n", + [Request, From]), + {noreply, S}. + +handle_cast({join, Name, Pid}, S) -> + join_group(Name, Pid), + {noreply, S}; +handle_cast({leave, Name, Pid}, S) -> + leave_group(Name, Pid), + {noreply, S}; +handle_cast(_, S) -> + {noreply, S}. + +handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> + member_died(MonitorRef), + {noreply, S}; +handle_info(_, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + true = ets:delete(pg_local_table), + ok. + +%%% +%%% Local functions +%%% + +%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the +%%% table is ordered_set, and the fast matching of partially +%%% instantiated keys is used extensively. +%%% +%%% {{ref, Pid}, MonitorRef, Counter} +%%% {{ref, MonitorRef}, Pid} +%%% Each process has one monitor. Counter is incremented when the +%%% Pid joins some group. +%%% {{member, Name, Pid}, _} +%%% Pid is a member of group Name, GroupCounter is incremented when the +%%% Pid joins the group Name. +%%% {{pid, Pid, Name}} +%%% Pid is a member of group Name. + +member_died(Ref) -> + [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}), + Names = member_groups(Pid), + _ = [leave_group(Name, P) || + Name <- Names, + P <- member_in_group(Pid, Name)], + ok. + +join_group(Name, Pid) -> + Ref_Pid = {ref, Pid}, + try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1}) + catch _:_ -> + Ref = erlang:monitor(process, Pid), + true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}), + true = ets:insert(pg_local_table, {{ref, Ref}, Pid}) + end, + Member_Name_Pid = {member, Name, Pid}, + try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1}) + catch _:_ -> + true = ets:insert(pg_local_table, {Member_Name_Pid, 1}), + true = ets:insert(pg_local_table, {{pid, Pid, Name}}) + end. + +leave_group(Name, Pid) -> + Member_Name_Pid = {member, Name, Pid}, + try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of + N -> + if + N =:= 0 -> + true = ets:delete(pg_local_table, {pid, Pid, Name}), + true = ets:delete(pg_local_table, Member_Name_Pid); + true -> + ok + end, + Ref_Pid = {ref, Pid}, + case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of + 0 -> + [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid), + true = ets:delete(pg_local_table, {ref, Ref}), + true = ets:delete(pg_local_table, Ref_Pid), + true = erlang:demonitor(Ref, [flush]), + ok; + _ -> + ok + end + catch _:_ -> + ok + end. + +group_members(Name) -> + [P || + [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}), + _ <- lists:seq(1, N)]. + +member_in_group(Pid, Name) -> + [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), + lists:duplicate(N, Pid). + +member_groups(Pid) -> + [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. + +ensure_started() -> + case whereis(?MODULE) of + undefined -> + C = {pg_local, {?MODULE, start_link, []}, permanent, + 1000, worker, [?MODULE]}, + supervisor:start_child(kernel_safe_sup, C); + PgLocalPid -> + {ok, PgLocalPid} + end. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 74b41a910c..1e481ca718 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit.erl b/src/rabbit.erl index 7a0ec89a1f..ac883a1b45 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -51,17 +51,17 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {pre, kernel_ready}]}). + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_child, [rabbit_log]}}, - {pre, kernel_ready}]}). + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {pre, kernel_ready}]}). + {enables, kernel_ready}]}). -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}]}). @@ -69,27 +69,27 @@ -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, {mfa, {rabbit_alarm, start, []}}, - {post, kernel_ready}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {enables, core_initialized}]}). -rabbit_boot_step({rabbit_amqqueue_sup, [{description, "queue supervisor"}, {mfa, {rabbit_amqqueue, start, []}}, - {post, kernel_ready}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {enables, core_initialized}]}). -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, {mfa, {rabbit_sup, start_child, [rabbit_router]}}, - {post, kernel_ready}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {enables, core_initialized}]}). -rabbit_boot_step({rabbit_node_monitor, [{description, "node monitor"}, {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, - {post, kernel_ready}, - {post, rabbit_amqqueue_sup}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {requires, rabbit_amqqueue_sup}, + {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, [{description, "core initialized"}]}). @@ -97,27 +97,27 @@ -rabbit_boot_step({empty_db_check, [{description, "empty DB check"}, {mfa, {?MODULE, maybe_insert_default_data, []}}, - {post, core_initialized}]}). + {requires, core_initialized}]}). -rabbit_boot_step({exchange_recovery, [{description, "exchange recovery"}, {mfa, {rabbit_exchange, recover, []}}, - {post, empty_db_check}]}). + {requires, empty_db_check}]}). -rabbit_boot_step({queue_recovery, [{description, "queue recovery"}, {mfa, {rabbit_amqqueue, recover, []}}, - {post, exchange_recovery}]}). + {requires, exchange_recovery}]}). -rabbit_boot_step({persister, [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {post, queue_recovery}]}). + {requires, queue_recovery}]}). -rabbit_boot_step({guid_generator, [{description, "guid generator"}, {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, - {post, persister}, - {pre, routing_ready}]}). + {requires, persister}, + {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}]}). @@ -125,12 +125,12 @@ -rabbit_boot_step({log_relay, [{description, "error log relay"}, {mfa, {rabbit_error_logger, boot, []}}, - {post, routing_ready}]}). + {requires, routing_ready}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, - {post, log_relay}, - {pre, networking_listening}]}). + {requires, log_relay}, + {enables, networking_listening}]}). -rabbit_boot_step({networking_listening, [{description, "network listeners available"}]}). @@ -246,9 +246,9 @@ run_boot_step({StepName, Attributes}) -> end, case [MFA || {mfa, MFA} <- Attributes] of [] -> - io:format("progress -- ~s~n", [Description]); + io:format("-- ~s~n", [Description]); MFAs -> - io:format("starting ~-40s ...", [Description]), + io:format("starting ~-60s ...", [Description]), [case catch apply(M,F,A) of {'EXIT', Reason} -> boot_error("FAILED~nReason: ~p~n", [Reason]); @@ -286,9 +286,9 @@ sort_boot_steps(UnsortedSteps) -> %% Add edges, detecting cycles and missing vertices. lists:foreach(fun ({StepName, Attributes}) -> [add_boot_step_dep(G, StepName, PrecedingStepName) - || {post, PrecedingStepName} <- Attributes], + || {requires, PrecedingStepName} <- Attributes], [add_boot_step_dep(G, SucceedingStepName, StepName) - || {pre, SucceedingStepName} <- Attributes] + || {enables, SucceedingStepName} <- Attributes] end, UnsortedSteps), %% Use topological sort to find a consistent ordering (if there is diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index eda747b287..23b84afb82 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 534409aaea..3b9eeec18a 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 58312253ea..2611d189ba 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -36,7 +36,8 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). --export([list/1, info/1, info/2, info_all/1, info_all/2]). +-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). @@ -68,10 +69,14 @@ -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). -spec(list/1 :: (vhost()) -> [amqqueue()]). +-spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (amqqueue()) -> [info()]). -spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). +-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]). +-spec(consumers_all/1 :: + (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]). -spec(stat/1 :: (amqqueue()) -> qstats()). -spec(stat_all/0 :: () -> [qstats()]). -spec(delete/3 :: @@ -93,7 +98,8 @@ -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) -> + (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), + boolean(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). @@ -236,6 +242,8 @@ list(VHostPath) -> rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). +info_keys() -> rabbit_amqqueue_process:info_keys(). + map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> @@ -251,6 +259,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +consumers(#amqqueue{ pid = QPid }) -> + gen_server2:pcall(QPid, 9, consumers, infinity). + +consumers_all(VHostPath) -> + lists:concat( + map(VHostPath, + fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} || + {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] + end)). + stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> @@ -280,7 +298,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( @@ -307,13 +325,13 @@ limit_all(QPids, ChPid, LimiterPid) -> fun (_) -> ok end, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, QPids). - + basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 15b9d05739..6c5f4757dc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -39,7 +39,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --export([start_link/1]). +-export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). @@ -76,6 +76,9 @@ auto_delete, arguments, pid, + owner_pid, + exclusive_consumer_pid, + exclusive_consumer_tag, messages_ready, messages_unacknowledged, messages_uncommitted, @@ -84,12 +87,13 @@ consumers, transactions, memory]). - + %%---------------------------------------------------------------------------- -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). +info_keys() -> ?INFO_KEYS. + %%---------------------------------------------------------------------------- init(Q) -> @@ -168,13 +172,12 @@ record_current_channel_tx(ChPid, Txn) -> %% as a side effect this also starts monitoring the channel (if %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). - -deliver_immediately(Message, Delivered, + +deliver_immediately(Message, IsDelivered, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers, next_msg_id = NextId}) -> - ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -186,7 +189,7 @@ deliver_immediately(Message, Delivered, true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), + {QName, self(), NextId, IsDelivered, Message}), NewUAM = case AckRequired of true -> dict:store(NextId, Message, UAM); false -> UAM @@ -217,7 +220,7 @@ deliver_immediately(Message, Delivered, ActiveConsumers, BlockedConsumers), deliver_immediately( - Message, Delivered, + Message, IsDelivered, State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}) end; @@ -225,6 +228,26 @@ deliver_immediately(Message, Delivered, {not_offered, State} end. +run_message_queue(State = #q{message_buffer = MessageBuffer}) -> + run_message_queue(MessageBuffer, State). + +run_message_queue(MessageBuffer, State) -> + case queue:out(MessageBuffer) of + {{value, {Message, IsDelivered}}, BufferTail} -> + case deliver_immediately(Message, IsDelivered, State) of + {offered, true, NewState} -> + persist_delivery(qname(State), Message, IsDelivered), + run_message_queue(BufferTail, NewState); + {offered, false, NewState} -> + persist_auto_ack(qname(State), Message), + run_message_queue(BufferTail, NewState); + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} + end; + {empty, _} -> + State#q{message_buffer = MessageBuffer} + end. + attempt_delivery(none, _ChPid, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -252,8 +275,8 @@ deliver_or_enqueue(Txn, ChPid, Message, State) -> end. deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), - State). + run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)), + State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -287,44 +310,44 @@ possibly_unblock(State, ChPid, Update) -> move_consumers(ChPid, State#q.blocked_consumers, State#q.active_consumers), - run_poke_burst( + run_message_queue( State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}) end end. - + should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of - not_found -> {ok, State}; + not_found -> + {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, unacked_messages = UAM} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), - case Txn of - none -> ok; - _ -> ok = rollback_work(Txn, qname(State)), - erase_tx(Txn) - end, - NewState = - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], - State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - active_consumers = remove_consumers( - ChPid, State#q.active_consumers), - blocked_consumers = remove_consumers( - ChPid, State#q.blocked_consumers)}), - case should_auto_delete(NewState) of - false -> {ok, NewState}; - true -> {stop, NewState} + State1 = State#q{ + exclusive_consumer = case Holder of + {ChPid, _} -> none; + Other -> Other + end, + active_consumers = remove_consumers( + ChPid, State#q.active_consumers), + blocked_consumers = remove_consumers( + ChPid, State#q.blocked_consumers)}, + case should_auto_delete(State1) of + true -> {stop, State1}; + false -> case Txn of + none -> ok; + _ -> ok = rollback_work(Txn, qname(State1)), + erase_tx(Txn) + end, + {ok, deliver_or_enqueue_n( + [{Message, true} || + {_MsgId, Message} <- dict:to_list(UAM)], + State1)} end end. @@ -343,26 +366,6 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(MessageBuffer, State). - -run_poke_burst(MessageBuffer, State) -> - case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> - case deliver_immediately(Message, Delivered, State) of - {offered, true, NewState} -> - persist_delivery(qname(State), Message, Delivered), - run_poke_burst(BufferTail, NewState); - {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), - run_poke_burst(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} - end; - {empty, _} -> - State#q{message_buffer = MessageBuffer} - end. - is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso queue:is_empty(State#q.blocked_consumers). @@ -385,10 +388,10 @@ persist_delivery(_QName, _Message, true) -> ok; persist_delivery(_QName, #basic_message{persistent_key = none}, - _Delivered) -> + _IsDelivered) -> ok; persist_delivery(QName, #basic_message{persistent_key = PKey}, - _Delivered) -> + _IsDelivered) -> persist_work(none, QName, [{deliver, {QName, PKey}}]). persist_acks(Txn, QName, Messages) -> @@ -451,7 +454,7 @@ all_tx() -> mark_tx_persistent(Txn) -> Tx = lookup_tx(Txn), store_tx(Txn, Tx#tx{is_persistent = true}). - + is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. @@ -488,11 +491,11 @@ collect_messages(MsgIds, UAM) -> purge_message_buffer(QName, MessageBuffer) -> Messages = - [[Message || {Message, _Delivered} <- + [[Message || {Message, _IsDelivered} <- queue:to_list(MessageBuffer)] | lists:map( fun (#cr{unacked_messages = UAM}) -> - [Message || {_MessageId, Message} <- dict:to_list(UAM)] + [Message || {_MsgId, Message} <- dict:to_list(UAM)] end, all_ch_record())], %% the simplest, though certainly not the most obvious or @@ -508,6 +511,18 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); +i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> + ''; +i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) -> + ReaderPid; +i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> + ''; +i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> + ChPid; +i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> + ''; +i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> + ConsumerTag; i(messages_ready, #q{message_buffer = MessageBuffer}) -> queue:len(MessageBuffer); i(messages_unacknowledged, _) -> @@ -544,6 +559,15 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; +handle_call(consumers, _From, + State = #q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + reply(rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); + handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% @@ -590,18 +614,18 @@ handle_call({basic_get, ChPid, NoAck}, _From, next_msg_id = NextId, message_buffer = MessageBuffer}) -> case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> + {{value, {Message, IsDelivered}}, BufferTail} -> AckRequired = not(NoAck), case AckRequired of true -> - persist_delivery(QName, Message, Delivered), + persist_delivery(QName, Message, IsDelivered), C = #cr{unacked_messages = UAM} = ch_record(ChPid), NewUAM = dict:store(NextId, Message, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}); false -> persist_auto_ack(QName, Message) end, - Msg = {QName, self(), NextId, Delivered, Message}, + Msg = {QName, self(), NextId, IsDelivered, Message}, reply({ok, queue:len(BufferTail), Msg}, State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}); @@ -611,8 +635,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{q = #amqqueue{exclusive_owner = Owner}, - exclusive_consumer = ExistingHolder}) -> + _From, State = #q{exclusive_consumer = ExistingHolder}) -> case check_exclusive_access(ExistingHolder, ExclusiveConsume, State) of in_use -> @@ -623,15 +646,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ack_required = not(NoAck)}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - if ConsumerCount == 0 -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok + case ConsumerCount of + 0 -> rabbit_limiter:register(LimiterPid, self()); + _ -> ok end, - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, + ExclusiveConsumer = case ExclusiveConsume of + true -> {ChPid, ConsumerTag}; + false -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), @@ -642,7 +664,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, add_consumer( ChPid, Consumer, State1#q.blocked_consumers)}; - false -> run_poke_burst( + false -> run_message_queue( State1#q{ active_consumers = add_consumer( @@ -660,10 +682,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, reply(ok, State); C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), - if ConsumerCount == 1 -> - ok = rabbit_limiter:unregister(LimiterPid, self()); - true -> - ok + case ConsumerCount of + 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); + _ -> ok end, ok = maybe_send_reply(ChPid, OkMsg), NewState = @@ -685,8 +706,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, active_consumers = ActiveConsumers}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, - State); + Length = queue:len(MessageBuffer), + reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 46d23a4075..0f3a86646a 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index bec2cd0845..9ebb6e72e0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -46,7 +46,7 @@ -spec(publish/1 :: (delivery()) -> publish_result()). -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> - delivery()). + delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> message()). -spec(properties/1 :: (properties_input()) -> amqp_properties()). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 01ac4f027f..1d47d7640e 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -46,6 +46,7 @@ build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). +-export([ensure_content_encoded/1, clear_encoded_content/1]). -import(lists). @@ -60,9 +61,11 @@ -spec(build_simple_content_frames/3 :: (channel_number(), content(), non_neg_integer()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). --spec(generate_table/1 :: (amqp_table()) -> binary()). +-spec(generate_table/1 :: (amqp_table()) -> binary()). -spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). +-spec(ensure_content_encoded/1 :: (content()) -> encoded_content()). +-spec(clear_encoded_content/1 :: (content()) -> unencoded_content()). -endif. @@ -193,12 +196,16 @@ generate_array(Array) when is_list(Array) -> fun ({Type, Value}) -> field_value_to_binary(Type, Value) end, Array)). -short_string_to_binary(String) when is_binary(String) and (size(String) < 256) -> - [<<(size(String)):8>>, String]; +short_string_to_binary(String) when is_binary(String) -> + Len = size(String), + if Len < 256 -> [<<(size(String)):8>>, String]; + true -> exit(content_properties_shortstr_overflow) + end; short_string_to_binary(String) -> StringLength = length(String), - true = (StringLength < 256), % assertion - [<<StringLength:8>>, String]. + if StringLength < 256 -> [<<StringLength:8>>, String]; + true -> exit(content_properties_shortstr_overflow) + end. long_string_to_binary(String) when is_binary(String) -> [<<(size(String)):32>>, String]; @@ -236,7 +243,10 @@ encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, Flags end. encode_property(shortstr, String) -> - Len = size(String), <<Len:8/unsigned, String:Len/binary>>; + Len = size(String), + if Len < 256 -> <<Len:8/unsigned, String:Len/binary>>; + true -> exit(content_properties_shortstr_overflow) + end; encode_property(longstr, String) -> Len = size(String), <<Len:32/unsigned, String:Len/binary>>; encode_property(octet, Int) -> @@ -262,3 +272,19 @@ check_empty_content_body_frame_size() -> exit({incorrect_empty_content_body_frame_size, ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) end. + +ensure_content_encoded(Content = #content{properties_bin = PropsBin}) + when PropsBin =/= 'none' -> + Content; +ensure_content_encoded(Content = #content{properties = Props}) -> + Content #content{properties_bin = rabbit_framing:encode_properties(Props)}. + +clear_encoded_content(Content = #content{properties_bin = none}) -> + Content; +clear_encoded_content(Content = #content{properties = none}) -> + %% Only clear when we can rebuild the properties_bin later in + %% accordance to the content record definition comment - maximum + %% one of properties and properties_bin can be 'none' + Content; +clear_encoded_content(Content = #content{}) -> + Content#content{properties_bin = none}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 506e87ecb1..e022a1fafe 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -139,7 +139,7 @@ parse_properties(Bit, [Type | TypeListRest], Acc, FirstShort, end, parse_properties(Bit + 1, TypeListRest, [Value | Acc], FirstShort, Remainder, Rest). - + parse_property(shortstr, <<Len:8/unsigned, String:Len/binary, Rest/binary>>) -> {String, Rest}; parse_property(longstr, <<Len:32/unsigned, String:Len/binary, Rest/binary>>) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e16be941a4..ddb941cffe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -37,8 +37,10 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). +-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, @@ -46,10 +48,23 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping}). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(INFO_KEYS, + [pid, + connection, + number, + user, + vhost, + transactional, + consumer_count, + messages_unacknowledged, + acks_uncommitted, + prefetch_count]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -62,6 +77,12 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(list/0 :: () -> [pid()]). +-spec(info_keys/0 :: () -> [info_key()]). +-spec(info/1 :: (pid()) -> [info()]). +-spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(info_all/0 :: () -> [[info()]]). +-spec(info_all/1 :: ([info_key()]) -> [[info()]]). -endif. @@ -91,12 +112,33 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> conserve_memory(Pid, Conserve) -> gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). +list() -> + pg_local:get_members(rabbit_channels). + +info_keys() -> ?INFO_KEYS. + +info(Pid) -> + gen_server2:pcall(Pid, 9, info, infinity). + +info(Pid, Items) -> + case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_all() -> + rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). + +info_all(Items) -> + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -110,7 +152,18 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}}. + consumer_mapping = dict:new()}, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Items}, _From, State) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; handle_call(_Request, _From, State) -> noreply(State). @@ -162,33 +215,31 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; + {stop, Reason, State}. -handle_info(timeout, State) -> +handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {noreply, State, hibernate}. + {hibernate, State}. -terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, - state = terminating}) -> - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid); +terminate(_Reason, State = #ch{state = terminating}) -> + terminate(State); -terminate(Reason, State = #ch{writer_pid = WriterPid, - limiter_pid = LimiterPid}) -> +terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok end, - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + terminate(State). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%--------------------------------------------------------------------------- -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. + +noreply(NewState) -> {noreply, NewState, hibernate}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -488,16 +539,18 @@ handle_method(#'basic.qos'{global = true}, _, _State) -> rabbit_misc:protocol_error(not_implemented, "global=true", []); handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> - rabbit_misc:protocol_error(not_implemented, + rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid }) -> + _, State = #ch{ limiter_pid = LimiterPid, + unacked_message_q = UAMQ }) -> NewLimiterPid = case {LimiterPid, PrefetchCount} of {undefined, 0} -> undefined; {undefined, _} -> - LPid = rabbit_limiter:start_link(self()), + LPid = rabbit_limiter:start_link(self(), + queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid; {_, 0} -> @@ -530,24 +583,24 @@ handle_method(#'basic.recover_async'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> - lists:foreach( - fun ({_DeliveryTag, none, _Msg}) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}) -> - %% Was sent as a proper consumer delivery. Resend it as - %% before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - ok = internal_deliver( + ok = rabbit_misc:queue_fold( + fun ({_DeliveryTag, none, _Msg}, ok) -> + %% Was sent as a basic.get_ok. Don't redeliver + %% it. FIXME: appropriate? + ok; + ({DeliveryTag, ConsumerTag, + {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> + %% Was sent as a proper consumer delivery. Resend + %% it as before. + %% + %% FIXME: What should happen if the consumer's been + %% cancelled since? + %% + %% FIXME: should we allocate a fresh DeliveryTag? + internal_deliver( WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) - end, queue:to_list(UAMQ)), + end, queue:to_list(UAMQ)), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State}; @@ -785,9 +838,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{ virtual_host = VHostPath, reader_pid = ReaderPid }) -> - %% FIXME: connection exception (!) on failure?? + %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - + %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_write_permitted(QueueName, State), @@ -902,7 +955,7 @@ rollback_and_notify(State) -> notify_queues(internal_rollback(State)). fold_per_queue(F, Acc0, UAQ) -> - D = lists:foldl( + D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> %% dict:append would be simpler and avoid the @@ -913,7 +966,7 @@ fold_per_queue(F, Acc0, UAQ) -> fun (MsgIds) -> [MsgId | MsgIds] end, [MsgId], D) - end, dict:new(), queue:to_list(UAQ)), + end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -924,7 +977,7 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). consumer_queues(Consumers) -> - [QPid || QueueName <- + [QPid || QueueName <- sets:to_list( dict:fold(fun (_ConsumerTag, QueueName, S) -> sets:add_element(QueueName, S) @@ -942,9 +995,9 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, queue:to_list(Acked)) of + case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(LimiterPid, Count) end. @@ -981,3 +1034,28 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. + +terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> + pg_local:leave(rabbit_channels, self()), + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid). + +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _) -> self(); +i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; +i(number, #ch{channel = Channel}) -> Channel; +i(user, #ch{username = Username}) -> Username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> + dict:size(ConsumerMapping); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, + uncommitted_ack_q = UAQ}) -> + queue:len(UAMQ) + queue:len(UAQ); +i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) -> + queue:len(UAQ); +i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> + rabbit_limiter:get_limit(LimiterPid); +i(Item, _) -> + throw({bad_argument, Item}). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 652bb8968d..2c6bd5ae64 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -54,7 +54,7 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), FullCommand = init:get_plain_arguments(), - #params{quiet = Quiet, node = Node, command = Command, args = Args} = + #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, node = rabbit_misc:makenode(NodeStr)}), Inform = case Quiet of @@ -81,6 +81,9 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, {'EXIT', Reason}} -> + error("~p", [Reason]), + halt(2); {badrpc, Reason} -> error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), @@ -139,6 +142,7 @@ Available commands: cluster <ClusterNode> ... status rotate_logs [Suffix] + close_connection <ConnectionPid> <ExplanationString> add_user <UserName> <Password> delete_user <UserName> @@ -156,11 +160,13 @@ Available commands: list_queues [-p <VHostPath>] [<QueueInfoItem> ...] list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] - list_bindings [-p <VHostPath>] + list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] + list_channels [<ChannelInfoItem> ...] + list_consumers [-p <VHostPath>] -Quiet output mode is selected with the \"-q\" flag. Informational messages -are suppressed when quiet mode is in effect. +Quiet output mode is selected with the \"-q\" flag. Informational +messages are suppressed when quiet mode is in effect. <node> should be the name of the master node of the RabbitMQ cluster. It defaults to the node named \"rabbit\" on the local @@ -169,24 +175,39 @@ usually be rabbit@server (unless RABBITMQ_NODENAME has been set to some non-default value at broker startup time). The output of hostname -s is usually the correct suffix to use after the \"@\" sign. -The list_queues, list_exchanges and list_bindings commands accept an optional -virtual host parameter for which to display results. The default value is \"/\". +The list_queues, list_exchanges and list_bindings commands accept an +optional virtual host parameter for which to display results. The +default value is \"/\". -<QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, -messages, acks_uncommitted, consumers, transactions, memory]. The default is - to display name and (number of) messages. +<QueueInfoItem> must be a member of the list [name, durable, +auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid, +exclusive_consumer_tag, messages_ready, messages_unacknowledged, +messages_uncommitted, messages, acks_uncommitted, consumers, +transactions, memory]. The default is to display name and (number of) +messages. <ExchangeInfoItem> must be a member of the list [name, type, durable, arguments]. The default is to display name and type. -The output format for \"list_bindings\" is a list of rows containing +The output format for \"list_bindings\" is a list of rows containing exchange name, queue name, routing key and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [pid, address, port, -peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, -client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. -The default is to display user, peer_address, peer_port and state. +<ConnectionInfoItem> must be a member of the list [pid, address, port, +peer_address, peer_port, state, channels, user, vhost, timeout, +frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, +send_pend]. The default is to display user, peer_address, peer_port +and state. + +<ChannelInfoItem> must be a member of the list [pid, connection, +number, user, vhost, transactional, consumer_count, +messages_unacknowledged, acks_uncommitted, prefetch_count]. The +default is to display pid, user, transactional, consumer_count, +messages_unacknowledged. + +The output format for \"list_consumers\" is a list of rows containing, +in order, the queue name, channel process id, consumer tag, and a +boolean indicating whether acknowledgements are expected from the +consumer. "), halt(1). @@ -232,6 +253,11 @@ action(rotate_logs, Node, Args = [Suffix], Inform) -> Inform("Rotating logs to files with suffix ~p", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); +action(close_connection, Node, [PidStr, Explanation], Inform) -> + Inform("Closing connection ~s", [PidStr]), + rpc_call(Node, rabbit_networking, close_connection, + [rabbit_misc:string_to_pid(PidStr), Explanation]); + action(add_user, Node, Args = [Username, _Password], Inform) -> Inform("Creating user ~p", [Username]), call(Node, {rabbit_access_control, add_user, Args}); @@ -287,9 +313,8 @@ action(list_bindings, Node, Args, Inform) -> InfoKeys = [exchange_name, queue_name, routing_key, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], - InfoKeys), - ok; + X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], + InfoKeys); action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), @@ -298,6 +323,22 @@ action(list_connections, Node, Args, Inform) -> [ArgAtoms]), ArgAtoms); +action(list_channels, Node, Args, Inform) -> + Inform("Listing channels", []), + ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, + messages_unacknowledged]), + display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), + ArgAtoms); + +action(list_consumers, Node, Args, Inform) -> + Inform("Listing consumers", []), + {VHostArg, _} = parse_vhost_flag_bin(Args), + InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], + display_info_list( + [lists:zip(InfoKeys, tuple_to_list(X)) || + X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], + InfoKeys); + action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). @@ -317,9 +358,9 @@ action(list_permissions, Node, VHost, [], Inform) -> [VHost]})). parse_vhost_flag(Args) when is_list(Args) -> - case Args of + case Args of ["-p", VHost | RemainingArgs] -> - {VHost, RemainingArgs}; + {VHost, RemainingArgs}; RemainingArgs -> {"/", RemainingArgs} end. @@ -329,9 +370,9 @@ parse_vhost_flag_bin(Args) -> {list_to_binary(VHost), RemainingArgs}. default_if_empty(List, Default) when is_list(List) -> - if List == [] -> - Default; - true -> + if List == [] -> + Default; + true -> [list_to_atom(X) || X <- List] end. @@ -355,8 +396,8 @@ format_info_item(Key, Items) -> is_tuple(Value) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> - pid_to_string(Value); - Value when is_binary(Value) -> + rabbit_misc:pid_to_string(Value); + Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> escape(atom_to_list(Value)); @@ -413,10 +454,3 @@ prettify_typed_amqp_value(Type, Value) -> array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; _ -> Value end. - -%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7) -pid_to_string(Pid) -> - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> - = term_to_binary(Pid), - Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 23e6fc4432..078cf620f4 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 10ea84aacb..face0a1a21 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 183b69844c..45b66712b8 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -56,7 +56,7 @@ init({{File, Suffix}, []}) -> init({{File, _}, error}) -> init(File); %% Used only when swapping handlers without performing -%% log rotation +%% log rotation init({File, []}) -> init(File); init({File, _Type} = FileInfo) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c16396d357..bcba7d0b4c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -35,7 +35,7 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/4, lookup/1, lookup_or_die/1, - list/1, info/1, info/2, info_all/1, info_all/2, + list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2]). -export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). @@ -70,6 +70,7 @@ -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list/1 :: (vhost()) -> [exchange()]). +-spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (exchange()) -> [info()]). -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). @@ -81,7 +82,7 @@ -spec(delete_binding/5 :: (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). --spec(list_bindings/1 :: (vhost()) -> +-spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). @@ -89,9 +90,9 @@ -spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). --spec(list_queue_bindings/1 :: (queue_name()) -> +-spec(list_queue_bindings/1 :: (queue_name()) -> [{exchange_name(), routing_key(), amqp_table()}]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> +-spec(list_exchange_bindings/1 :: (exchange_name()) -> [{queue_name(), routing_key(), amqp_table()}]). -endif. @@ -194,6 +195,8 @@ list(VHostPath) -> rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). +info_keys() -> ?INFO_KEYS. + map(VHostPath, F) -> %% TODO: there is scope for optimisation here, e.g. using a %% cursor, parallelising the function invocation @@ -349,7 +352,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> end || Route <- mnesia:match_object( rabbit_reverse_route, reverse_route( - #route{binding = #binding{queue_name = QueueName, + #route{binding = #binding{queue_name = QueueName, _ = '_'}}), write)], ok. @@ -453,7 +456,7 @@ list_bindings(VHostPath) -> [{ExchangeName, QueueName, RoutingKey, Arguments} || #route{binding = #binding{ exchange_name = ExchangeName, - key = RoutingKey, + key = RoutingKey, queue_name = QueueName, args = Arguments}} <- mnesia:dirty_match_object( @@ -614,7 +617,7 @@ list_exchange_bindings(ExchangeName) -> [{QueueName, RoutingKey, Arguments} || #route{binding = #binding{queue_name = QueueName, key = RoutingKey, - args = Arguments}} + args = Arguments}} <- mnesia:dirty_match_object(rabbit_route, Route)]. % Refactoring is left as an exercise for the reader @@ -624,5 +627,5 @@ list_queue_bindings(QueueName) -> [{ExchangeName, RoutingKey, Arguments} || #route{binding = #binding{exchange_name = ExchangeName, key = RoutingKey, - args = Arguments}} + args = Arguments}} <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 5c447792a2..b7c6aa96fa 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -115,7 +115,7 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> collect_content_payload(ChannelPid, RemainingByteCount - size(FragmentBin), [FragmentBin | Acc]); - _ -> + _ -> rabbit_misc:protocol_error( command_invalid, "expected content body, got non content body frame instead", diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index ea61a679e8..2fa531a7ce 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ed0066fe07..4556570567 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl index b3d271c28d..3fc84c1e09 100644 --- a/src/rabbit_hooks.erl +++ b/src/rabbit_hooks.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -61,8 +61,8 @@ unsubscribe(Hook, HandlerName) -> trigger(Hook, Args) -> Hooks = ets:lookup(?TableName, Hook), [case catch apply(M, F, [Hook, Name, Args | A]) of - {'EXIT', Reason} -> - rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", + {'EXIT', Reason} -> + rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", [Name, Hook, Reason]); _ -> ok end || {_, Name, {M, F, A}} <- Hooks], diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 087a9f64d9..c9f8183fc9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -31,12 +31,13 @@ -module(rabbit_limiter). --behaviour(gen_server). +-behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/1, shutdown/1]). +-export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([get_limit/1]). %%---------------------------------------------------------------------------- @@ -44,13 +45,14 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/1 :: (pid()) -> pid()). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -endif. @@ -68,8 +70,8 @@ %% API %%---------------------------------------------------------------------------- -start_link(ChPid) -> - {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), +start_link(ChPid, UnackedMsgCount) -> + {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), Pid. shutdown(undefined) -> @@ -104,12 +106,19 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). unregister(undefined, _QPid) -> ok; unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +get_limit(undefined) -> + 0; +get_limit(Pid) -> + rabbit_misc:with_exit_handler( + fun () -> 0 end, + fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([ChPid]) -> - {ok, #lim{ch_pid = ChPid} }. +init([ChPid, UnackedMsgCount]) -> + {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> @@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From, false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end. + end; + +handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}. handle_cast(shutdown, State) -> {stop, normal, State}; diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl index 6ef638cb59..4f467162e4 100644 --- a/src/rabbit_load.erl +++ b/src/rabbit_load.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index dd5b498b07..cc80e360ae 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d927bfb1f9..0654f58ae7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -55,7 +55,9 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([unfold/2, ceil/1]). +-export([unfold/2, ceil/1, queue_fold/3]). +-export([pid_to_string/1, string_to_pid/1]). +-export([version_compare/2, version_compare/3]). -import(mnesia). -import(lists). @@ -126,6 +128,9 @@ -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). +-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). +-spec(pid_to_string/1 :: (pid()) -> string()). +-spec(string_to_pid/1 :: (string()) -> pid()). -endif. @@ -488,7 +493,105 @@ unfold(Fun, Acc, Init) -> ceil(N) -> T = trunc(N), - case N - T of - 0 -> N; - _ -> 1 + T + case N == T of + true -> T; + false -> 1 + T + end. + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +%% This provides a string representation of a pid that is the same +%% regardless of what node we are running on. The representation also +%% permits easy identification of the pid's node. +pid_to_string(Pid) when is_pid(Pid) -> + %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and + %% 8.7) + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). + +%% inverse of above +string_to_pid(Str) -> + ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end, + %% TODO: simplify this code by using the 're' module, once we drop + %% support for R11 + %% + %% 1) sanity check + %% The \ before the trailing $ is only there to keep emacs + %% font-lock from getting confused. + case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of + {match, _, _} -> + %% 2) strip <> + Str1 = string:substr(Str, 2, string:len(Str) - 2), + %% 3) extract three constituent parts, taking care to + %% handle dots in the node part (hence the reverse and concat) + [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")), + NodeStr = lists:concat(lists:reverse(Rest)), + %% 4) construct a triple term from the three parts + TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.", + [NodeStr, IdStr, SerStr])), + %% 5) parse the triple + Tokens = case erl_scan:string(TripleStr) of + {ok, Tokens1, _} -> Tokens1; + {error, _, _} -> ErrorFun() + end, + Term = case erl_parse:parse_term(Tokens) of + {ok, Term1} -> Term1; + {error, _} -> ErrorFun() + end, + {Node, Id, Ser} = + case Term of + {Node1, Id1, Ser1} when is_atom(Node1) andalso + is_integer(Id1) andalso + is_integer(Ser1) -> + Term; + _ -> + ErrorFun() + end, + %% 6) turn the triple into a pid - see pid_to_string + <<131,NodeEnc/binary>> = term_to_binary(Node), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); + nomatch -> + ErrorFun(); + Error -> + %% invalid regexp - shouldn't happen + throw(Error) + end. + +version_compare(A, B, lte) -> + case version_compare(A, B) of + eq -> true; + lt -> true; + gt -> false + end; +version_compare(A, B, gte) -> + case version_compare(A, B) of + eq -> true; + gt -> true; + lt -> false + end; +version_compare(A, B, Result) -> + Result =:= version_compare(A, B). + +version_compare([], []) -> + eq; +version_compare([], _ ) -> + lt; %% 2.3 < 2.3.1 +version_compare(_ , []) -> + gt; %% 2.3.1 > 2.3 +version_compare(A, B) -> + {AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A), + {BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B), + ANum = list_to_integer(AStr), + BNum = list_to_integer(BStr), + if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl), + BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl), + version_compare(ATl1, BTl1); + ANum < BNum -> lt; + ANum > BNum -> gt end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 749038dbb1..6ec3cf74b3 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -55,8 +55,8 @@ -spec(cluster/1 :: ([erlang_node()]) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). --spec(is_clustered/0 :: () -> boolean()). --spec(empty_ram_only_tables/0 :: () -> 'ok'). +-spec(is_clustered/0 :: () -> boolean()). +-spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -endif. @@ -173,7 +173,7 @@ replicated_table_names() -> ]. dir() -> mnesia:system_info(directory). - + ensure_mnesia_dir() -> MnesiaDir = dir() ++ "/", case filelib:ensure_dir(MnesiaDir) of @@ -389,7 +389,7 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). wait_for_tables() -> wait_for_tables(table_names()). -wait_for_tables(TableNames) -> +wait_for_tables(TableNames) -> case check_schema_integrity() of ok -> case mnesia:wait_for_tables(TableNames, 30000) of diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index dc642df403..bfafa6ff05 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -187,7 +187,7 @@ start_node(Node, RpcTimeout) -> io:format("Starting node ~s...~n", [Node]), case rpc:call(Node, os, getpid, []) of {badrpc, _} -> - Port = run_cmd(script_filename()), + Port = run_rabbitmq_server(), Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port), Pid = case rpc:call(Node, os, getpid, []) of {badrpc, _} -> throw(cannot_get_pid); @@ -217,8 +217,22 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> end end. -run_cmd(FullPath) -> - erlang:open_port({spawn, FullPath}, [nouse_stdio]). +run_rabbitmq_server() -> + with_os([{unix, fun run_rabbitmq_server_unix/0}, + {win32, fun run_rabbitmq_server_win32/0}]). + +run_rabbitmq_server_unix() -> + FullPath = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server", + erlang:open_port({spawn_executable, FullPath}, + [{arg0, FullPath}, {args, ["-noinput"]}, nouse_stdio]). + +run_rabbitmq_server_win32() -> + Cmd = filename:nativename(os:find_executable("cmd")), + CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") + ++ "\\rabbitmq-server.bat\" -noinput", + erlang:open_port({spawn_executable, Cmd}, + [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]}, + nouse_stdio, hide]). is_rabbit_running(Node, RpcTimeout) -> case rpc:call(Node, rabbit, status, [], RpcTimeout) of @@ -236,13 +250,6 @@ with_os(Handlers) -> Handler -> Handler() end. -script_filename() -> - ScriptHome = getenv("RABBITMQ_SCRIPT_HOME"), - ScriptName = with_os( - [{unix , fun () -> "rabbitmq-server" end}, - {win32, fun () -> "rabbitmq-server.bat" end}]), - ScriptHome ++ "/" ++ ScriptName ++ " -noinput". - pids_file() -> getenv("RABBITMQ_PIDS_FILE"). write_pids_file(Pids) -> diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index a5ccc8e9ae..406977b42a 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -40,7 +40,7 @@ -ifdef(use_specs). --type(stat_option() :: +-type(stat_option() :: 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). -type(error() :: {'error', any()}). @@ -50,11 +50,11 @@ -spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). -spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()). --spec(peername/1 :: (socket()) -> +-spec(peername/1 :: (socket()) -> {'ok', {ip_address(), non_neg_integer()}} | error()). --spec(sockname/1 :: (socket()) -> +-spec(sockname/1 :: (socket()) -> {'ok', {ip_address(), non_neg_integer()}} | error()). --spec(getstat/2 :: (socket(), [stat_option()]) -> +-spec(getstat/2 :: (socket(), [stat_option()]) -> {'ok', [{stat_option(), integer()}]} | error()). -endif. @@ -66,8 +66,8 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> Pid = self(), Ref = make_ref(), - spawn(fun() -> Pid ! {inet_async, Sock, Ref, - ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + spawn(fun() -> Pid ! {inet_async, Sock, Ref, + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} end), {ok, Ref}; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 84658a85c6..cf04f05b7e 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -31,11 +31,13 @@ -module(rabbit_networking). --export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, - stop_tcp_listener/2, on_node_down/1, active_listeners/0, - node_listeners/1, connections/0, connection_info/1, - connection_info/2, connection_info_all/0, - connection_info_all/1]). +-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, + stop_tcp_listener/2, on_node_down/1, active_listeners/0, + node_listeners/1, connections/0, connection_info_keys/0, + connection_info/1, connection_info/2, + connection_info_all/0, connection_info_all/1, + close_connection/2]). + %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). @@ -46,11 +48,11 @@ -include_lib("kernel/include/inet.hrl"). -define(RABBIT_TCP_OPTS, [ - binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, + binary, + {packet, raw}, % no packaging + {reuseaddr, true}, % allow rebind without waiting + %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. + %% {delay_send, true}, {exit_on_close, false} ]). @@ -70,10 +72,12 @@ -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (erlang_node()) -> [listener()]). -spec(connections/0 :: () -> [connection()]). +-spec(connection_info_keys/0 :: () -> [info_key()]). -spec(connection_info/1 :: (connection()) -> [info()]). -spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]). -spec(connection_info_all/0 :: () -> [[info()]]). -spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]). +-spec(close_connection/2 :: (pid(), string()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) -> {ip_address(), atom()}). @@ -206,7 +210,7 @@ start_ssl_client(SslOpts, Sock) -> {error, {ssl_upgrade_error, Reason}}; {'EXIT', Reason} -> {error, {ssl_upgrade_failure, Reason}} - + end end). @@ -214,12 +218,21 @@ connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. +connection_info_keys() -> rabbit_reader:info_keys(). + connection_info(Pid) -> rabbit_reader:info(Pid). connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). +close_connection(Pid, Explanation) -> + case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end, + supervisor:which_children(rabbit_tcp_client_sup)) of + true -> rabbit_reader:shutdown(Pid, Explanation); + false -> throw({error, {not_a_connection_pid, Pid}}) + end. + %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 14a69a472e..f3013a16cf 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index d0d60ddf3d..019d2a269d 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -57,7 +57,7 @@ -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, - snapshot}). + snapshot}). %% two tables for efficient persistency %% one maps a key to a message @@ -166,7 +166,7 @@ handle_call({transaction, Key, MessageList}, From, State) -> do_noreply(internal_commit(From, Key, NewState)); handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State) -> +handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); handle_call(serial, _From, State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> @@ -211,7 +211,7 @@ internal_dirty_work(MessageList, State) -> log_work(fun (ML) -> {dirty_work, ML} end, MessageList, State). -internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> +internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> Unit = {commit_transaction, Key}, NewSnapshot = internal_integrate1(Unit, Snapshot), complete(From, Unit, State#pstate{snapshot = NewSnapshot}). @@ -243,7 +243,7 @@ log_work(CreateWorkUnit, MessageList, fun(M = {publish, Message, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of [_] -> {tied, QK}; - [] -> ets:insert(Messages, {PKey, Message}), + [] -> ets:insert(Messages, {PKey, Message}), M end; (M) -> M @@ -252,7 +252,7 @@ log_work(CreateWorkUnit, MessageList, NewSnapshot = internal_integrate1(Unit, Snapshot), log(State#pstate{snapshot = NewSnapshot}, Unit). -log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, +log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, Message) -> State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, ExistingDeadline), @@ -365,7 +365,7 @@ prune_table(Tab, Keys) -> true = ets:safe_fixtable(Tab, true), ok = prune_table(Tab, Keys, ets:first(Tab)), true = ets:safe_fixtable(Tab, false). - + prune_table(_Tab, _Keys, '$end_of_table') -> ok; prune_table(Tab, Keys, Key) -> case sets:is_element(Key, Keys) of @@ -374,7 +374,7 @@ prune_table(Tab, Keys, Key) -> end, prune_table(Tab, Keys, ets:next(Tab, Key)). -internal_load_snapshot(LogHandle, +internal_load_snapshot(LogHandle, Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), @@ -435,9 +435,9 @@ accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = + RequeueMessages = [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, + {PKey, Delivered} <- Requeues, {_, Message} <- ets:lookup(Messages, PKey)], rabbit_amqqueue:redeliver( QPid, @@ -459,7 +459,7 @@ replay([], LogHandle, K, Snapshot) -> {K1, Items} -> replay(Items, LogHandle, K1, Snapshot); {K1, Items, Badbytes} -> - rabbit_log:warning("~p bad bytes recovering persister log~n", + rabbit_log:warning("~p bad bytes recovering persister log~n", [Badbytes]), replay(Items, LogHandle, K1, Snapshot); eof -> Snapshot diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 4fcfab7895..274981efed 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 77dce1fab3..738f7e3f9e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -33,7 +33,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info/1, info/2]). +-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -132,8 +132,10 @@ -ifdef(use_specs). +-spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(shutdown/2 :: (pid(), string()) -> 'ok'). -endif. @@ -142,6 +144,9 @@ start_link() -> {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +shutdown(Pid, Explanation) -> + gen_server:call(Pid, {shutdown, Explanation}, infinity). + init(Parent) -> Deb = sys:debug_options([]), receive @@ -158,6 +163,8 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +info_keys() -> ?INFO_KEYS. + info(Pid) -> gen_server:call(Pid, info, infinity). @@ -196,7 +203,7 @@ teardown_profiling(Value) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). -socket_op(Sock, Fun) -> +socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", @@ -216,7 +223,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - try + try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, connection = #connection{ @@ -267,14 +274,9 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> - if State#v1.connection_state =:= running -> - send_exception(State, 0, - rabbit_misc:amqp_error(connection_forced, - "broker forced connection closure with reason '~w'", - [Reason], none)); - true -> ok - end, - %% this is what we are expected to do according to + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), + %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% %% If we wanted to be *really* nice we should wait for a @@ -301,6 +303,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; timeout -> throw({timeout, State#v1.connection_state}); + {'$gen_call', From, {shutdown, Explanation}} -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Parent, Deb, NewState) + end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), mainloop(Parent, Deb, State); @@ -323,6 +332,13 @@ switch_callback(OldState, NewCallback, Length) -> OldState#v1{callback = NewCallback, recv_ref = Ref}. +terminate(Explanation, State = #v1{connection_state = running}) -> + {normal, send_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; +terminate(_Explanation, State) -> + {force, State}. + close_connection(State = #v1{connection = #connection{ timeout_sec = TimeoutSec}}) -> %% We terminate the connection after the specified interval, but @@ -648,7 +664,7 @@ i(peer_port, #v1{sock = Sock}) -> {ok, {_, P}} = rabbit_net:peername(Sock), P; i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; - SockStat =:= recv_cnt; + SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 10f80cc301..ee2b82c5bd 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 2a365ce10e..434cdae050 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index ef32544cc4..a1b8948155 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 416827cb80..8602a2099d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), passed = test_content_framing(), @@ -106,7 +107,7 @@ test_priority_queue() -> {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = test_priority_queue(Q6), - %% merge 1-element priority Q with 1-element no-priority Q + %% merge 1-element priority Q with 1-element no-priority Q Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, Q)), {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = @@ -184,6 +185,28 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_pg_local() -> + [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]], + check_pg_local(ok, [], []), + check_pg_local(pg_local:join(a, P), [P], []), + check_pg_local(pg_local:join(b, P), [P], [P]), + check_pg_local(pg_local:join(a, P), [P, P], [P]), + check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]), + check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]), + check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]), + check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]), + check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]), + check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), + check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), + [X ! done || X <- [P, Q]], + check_pg_local(ok, [], []), + passed. + +check_pg_local(ok, APids, BPids) -> + ok = pg_local:sync(), + [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) || + {Key, Pids} <- [{a, APids}, {b, BPids}]]. + test_unfold() -> {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), List = lists:seq(2,20,2), @@ -291,7 +314,7 @@ test_field_values() -> 4,"long", "l", 1234567890:64, % + 14 = 145 5,"short", "s", 655:16, % + 9 = 154 4,"bool", "t", 1, % + 7 = 161 - 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 + 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 4,"void", "V", % + 6 = 194 5,"array", "A", 23:32, % + 11 = 205 "I", 54321:32, % + 5 = 210 @@ -463,7 +486,7 @@ test_log_management_during_startup() -> {sasl_report_tty_h, []}]), ok = control_action(start_app, []), - %% start application with tty logging and + %% start application with tty logging and %% proper handlers not installed ok = control_action(stop_app, []), ok = error_logger:tty(false), @@ -495,7 +518,7 @@ test_log_management_during_startup() -> ok = add_log_handlers([{error_logger_file_h, MainLog}]), ok = case control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, - log_rotation_no_write_permission_dir_test}); + log_rotation_no_write_permission_dir_test}); {error, {cannot_log_to_file, _, _}} -> ok end, @@ -516,7 +539,7 @@ test_log_management_during_startup() -> ok = file:del_dir(TmpDir), %% start application with standard error_logger_file_h - %% handler not installed + %% handler not installed ok = application:set_env(kernel, error_logger, {file, MainLog}), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -624,7 +647,7 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - + %% convert a disk node into a ram node ok = control_action(cluster, ["invalid1@invalid", "invalid2@invalid"]), @@ -728,46 +751,48 @@ test_user_management() -> passed. test_server_status() -> - - %% create a queue so we have something to list - Q = #amqqueue{} = rabbit_amqqueue:declare( - rabbit_misc:r(<<"/">>, queue, <<"foo">>), - false, false, [], none), - + %% create a few things so there is some useful information to list + Writer = spawn(fun () -> receive shutdown -> ok end end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), + [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/">>, queue, Name), + false, false, [], none) || + Name <- [<<"foo">>, <<"bar">>]], + + ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined, + <<"ctag">>, true, undefined), %% list queues - ok = info_action( - list_queues, - [name, durable, auto_delete, arguments, pid, - messages_ready, messages_unacknowledged, messages_uncommitted, - messages, acks_uncommitted, consumers, transactions, memory], - true), + ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), %% list exchanges - ok = info_action( - list_exchanges, - [name, type, durable, arguments], - true), + ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true), %% list bindings ok = control_action(list_bindings, []), - %% cleanup - {ok, _} = rabbit_amqqueue:delete(Q, false, false), - %% list connections [#listener{host = H, port = P} | _] = [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), N =:= node()], - {ok, C} = gen_tcp:connect(H, P, []), + {ok, _C} = gen_tcp:connect(H, P, []), timer:sleep(100), - ok = info_action( - list_connections, - [pid, address, port, peer_address, peer_port, state, - channels, user, vhost, timeout, frame_max, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend], - false), - ok = gen_tcp:close(C), + ok = info_action(list_connections, + rabbit_networking:connection_info_keys(), false), + %% close_connection + [ConnPid] = rabbit_networking:connections(), + ok = control_action(close_connection, [rabbit_misc:pid_to_string(ConnPid), + "go away"]), + + %% list channels + ok = info_action(list_channels, rabbit_channel:info_keys(), false), + + %% list consumers + ok = control_action(list_consumers, []), + + %% cleanup + [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + ok = rabbit_channel:shutdown(Ch), passed. @@ -800,11 +825,11 @@ test_hooks() -> {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), %% Invoking Pids - Remote = fun() -> - receive - {rabbitmq_hook,[remote_test,test,[],Target]} -> + Remote = fun() -> + receive + {rabbitmq_hook,[remote_test,test,[],Target]} -> Target ! invoked - end + end end, P = spawn(Remote), rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), @@ -830,7 +855,7 @@ control_action(Command, Node, Args) -> ok -> io:format("done.~n"), ok; - Other -> + Other -> io:format("failed.~n"), Other end. diff --git a/src/rabbit_tracer.erl b/src/rabbit_tracer.erl index 0c023f2aab..484249b1df 100644 --- a/src/rabbit_tracer.erl +++ b/src/rabbit_tracer.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 1679ce7c15..54c60f5be0 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -59,7 +59,7 @@ (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). -spec(internal_send_command/3 :: (socket(), channel_number(), amqp_method()) -> 'ok'). --spec(internal_send_command/5 :: +-spec(internal_send_command/5 :: (socket(), channel_number(), amqp_method(), content(), non_neg_integer()) -> 'ok'). diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index bc7425613f..68efc27f97 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -48,14 +48,15 @@ start_link(Callback, LSock) -> %%-------------------------------------------------------------------- init({Callback, LSock}) -> - case prim_inet:async_accept(LSock, -1) of - {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}}; - Error -> {stop, {cannot_accept, Error}} - end. + gen_server:cast(self(), accept), + {ok, #state{callback=Callback, sock=LSock}}. handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast(accept, State) -> + accept(State); + handle_cast(_Msg, State) -> {noreply, State}. @@ -63,7 +64,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) -> %% patch up the socket so it looks like one we got from - %% gen_tcp:accept/1 + %% gen_tcp:accept/1 {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), @@ -83,10 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, end, %% accept more - case prim_inet:async_accept(LSock, -1) of - {ok, NRef} -> {noreply, State#state{ref=NRef}}; - Error -> {stop, {cannot_accept, Error}, none} - end; + accept(State); handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we @@ -104,3 +102,9 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +accept(State = #state{sock=LSock}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> {noreply, State#state{ref=Ref}}; + Error -> {stop, {cannot_accept, Error}, State} + end. diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index f2bad5bc2e..6e3bc4c966 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl index d92066a6c3..1b78584384 100644 --- a/src/tcp_client_sup.erl +++ b/src/tcp_client_sup.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 4a2e149bb8..73ef9586bf 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -69,7 +69,7 @@ init({IPAddress, Port, SocketOpts, [Label, inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), {ok, #state{sock = LSock, - on_startup = OnStartup, on_shutdown = OnShutdown, + on_startup = OnStartup, on_shutdown = OnShutdown, label = Label}}; {error, Reason} -> error_logger:error_msg( diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index d6bbac080f..0fe1542616 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 8be28f523d..cd03fcc6e6 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -37,8 +37,6 @@ %% %% This module tries to warn Rabbit before such situations occur, so %% that it has a higher chance to avoid running out of memory. -%% -%% This code depends on Erlang os_mon application. -module(vm_memory_monitor). @@ -51,7 +49,8 @@ -export([update/0, get_total_memory/0, get_check_interval/0, set_check_interval/1, - get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]). + get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1, + get_memory_limit/0]). -define(SERVER, ?MODULE). @@ -77,6 +76,8 @@ ('ignore' | {'error', any()} | {'ok', pid()})). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). @@ -95,17 +96,24 @@ update() -> get_total_memory() -> get_total_memory(os:type()). +get_vm_limit() -> + get_vm_limit(os:type()). + get_check_interval() -> - gen_server:call(?MODULE, get_check_interval). + gen_server:call(?MODULE, get_check_interval, infinity). set_check_interval(Fraction) -> - gen_server:call(?MODULE, {set_check_interval, Fraction}). + gen_server:call(?MODULE, {set_check_interval, Fraction}, infinity). get_vm_memory_high_watermark() -> - gen_server:call(?MODULE, get_vm_memory_high_watermark). + gen_server:call(?MODULE, get_vm_memory_high_watermark, infinity). set_vm_memory_high_watermark(Fraction) -> - gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}). + gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}, + infinity). + +get_memory_limit() -> + gen_server:call(?MODULE, get_memory_limit, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -152,6 +160,9 @@ handle_call({set_check_interval, Timeout}, _From, State) -> {ok, cancel} = timer:cancel(State#state.timer), {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; +handle_call(get_memory_limit, _From, State) -> + {reply, State#state.memory_limit, State}; + handle_call(_Request, _From, State) -> {noreply, State}. @@ -199,17 +210,26 @@ start_timer(Timeout) -> {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), TRef. +%% According to http://msdn.microsoft.com/en-us/library/aa366778(VS.85).aspx +%% Windows has 2GB and 8TB of address space for 32 and 64 bit accordingly. +get_vm_limit({win32,_OSname}) -> + case erlang:system_info(wordsize) of + 4 -> 2*1024*1024*1024; %% 2 GB for 32 bits 2^31 + 8 -> 8*1024*1024*1024*1024 %% 8 TB for 64 bits 2^42 + end; + %% On a 32-bit machine, if you're using more than 2 gigs of RAM you're %% in big trouble anyway. -get_vm_limit() -> +get_vm_limit(_OsType) -> case erlang:system_info(wordsize) of - 4 -> 4294967296; %% 4 GB for 32 bits 2^32 - 8 -> 281474976710656 %% 256 TB for 64 bits 2^48 + 4 -> 4*1024*1024*1024; %% 4 GB for 32 bits 2^32 + 8 -> 256*1024*1024*1024*1024 %% 256 TB for 64 bits 2^48 %%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details end. get_mem_limit(MemFraction, TotalMemory) -> - lists:min([trunc(TotalMemory * MemFraction), get_vm_limit()]). + AvMem = lists:min([TotalMemory, get_vm_limit()]), + trunc(AvMem * MemFraction). %%---------------------------------------------------------------------------- %% Internal Helpers @@ -241,11 +261,29 @@ get_total_memory({unix,freebsd}) -> PageCount * PageSize; get_total_memory({win32,_OSname}) -> - [Result|_] = os_mon_sysinfo:get_mem_info(), - {ok, [_MemLoad, TotPhys, _AvailPhys, - _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} = - io_lib:fread("~d~d~d~d~d~d~d", Result), - TotPhys; + %% Due to the Erlang print format bug, on Windows boxes the memory + %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM + %% we get negative memory size: + %% > os_mon_sysinfo:get_mem_info(). + %% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"] + %% Due to this bug, we don't actually know anything. Even if the + %% number is postive we can't be sure if it's correct. This only + %% affects us on os_mon versions prior to 2.2.1. + case application:get_key(os_mon, vsn) of + undefined -> + unknown; + {ok, Version} -> + case rabbit_misc:version_compare(Version, "2.2.1", lt) of + true -> %% os_mon is < 2.2.1, so we know nothing + unknown; + false -> + [Result|_] = os_mon_sysinfo:get_mem_info(), + {ok, [_MemLoad, TotPhys, _AvailPhys, + _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} = + io_lib:fread("~d~d~d~d~d~d~d", Result), + TotPhys + end + end; get_total_memory({unix, linux}) -> File = read_proc_file("/proc/meminfo"), |
