summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.hgignore2
-rw-r--r--LICENSE-MPL-RabbitMQ6
-rw-r--r--Makefile21
-rw-r--r--codegen.py49
-rw-r--r--docs/rabbitmqctl.1.pod112
-rw-r--r--generate_deps52
-rw-r--r--include/rabbit.hrl14
-rw-r--r--include/rabbit_framing_spec.hrl6
-rw-r--r--packaging/RPMS/Fedora/Makefile2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/common/rabbitmq-asroot-script-wrapper6
-rw-r--r--packaging/common/rabbitmq-script-wrapper8
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xpackaging/debs/Debian/debian/copyright16
-rw-r--r--packaging/macports/Makefile55
-rw-r--r--packaging/macports/Portfile.in (renamed from packaging/macports/net/rabbitmq-server/Portfile)16
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper12
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper15
-rw-r--r--packaging/macports/patch-org.macports.rabbitmq-server.plist.diff (renamed from packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff)0
-rw-r--r--packaging/windows/Makefile2
-rwxr-xr-xscripts/rabbitmq-activate-plugins6
-rw-r--r--scripts/rabbitmq-activate-plugins.bat33
-rwxr-xr-xscripts/rabbitmq-deactivate-plugins6
-rw-r--r--scripts/rabbitmq-deactivate-plugins.bat16
-rwxr-xr-xscripts/rabbitmq-env6
-rwxr-xr-xscripts/rabbitmq-multi16
-rw-r--r--[-rwxr-xr-x]scripts/rabbitmq-multi.bat51
-rwxr-xr-xscripts/rabbitmq-server18
-rw-r--r--[-rwxr-xr-x]scripts/rabbitmq-server.bat117
-rw-r--r--[-rwxr-xr-x]scripts/rabbitmq-service.bat206
-rwxr-xr-xscripts/rabbitmqctl6
-rw-r--r--[-rwxr-xr-x]scripts/rabbitmqctl.bat19
-rw-r--r--src/gen_server2.erl4
-rw-r--r--src/pg_local.erl213
-rw-r--r--src/priority_queue.erl6
-rw-r--r--src/rabbit.erl56
-rw-r--r--src/rabbit_access_control.erl6
-rw-r--r--src/rabbit_alarm.erl6
-rw-r--r--src/rabbit_amqqueue.erl34
-rw-r--r--src/rabbit_amqqueue_process.erl187
-rw-r--r--src/rabbit_amqqueue_sup.erl6
-rw-r--r--src/rabbit_basic.erl8
-rw-r--r--src/rabbit_binary_generator.erl44
-rw-r--r--src/rabbit_binary_parser.erl8
-rw-r--r--src/rabbit_channel.erl168
-rw-r--r--src/rabbit_control.erl104
-rw-r--r--src/rabbit_dialyzer.erl6
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_error_logger_file_h.erl8
-rw-r--r--src/rabbit_exchange.erl25
-rw-r--r--src/rabbit_framing_channel.erl8
-rw-r--r--src/rabbit_guid.erl6
-rw-r--r--src/rabbit_heartbeat.erl6
-rw-r--r--src/rabbit_hooks.erl10
-rw-r--r--src/rabbit_limiter.erl34
-rw-r--r--src/rabbit_load.erl6
-rw-r--r--src/rabbit_log.erl6
-rw-r--r--src/rabbit_misc.erl117
-rw-r--r--src/rabbit_mnesia.erl14
-rw-r--r--src/rabbit_multi.erl33
-rw-r--r--src/rabbit_net.erl18
-rw-r--r--src/rabbit_networking.erl41
-rw-r--r--src/rabbit_node_monitor.erl6
-rw-r--r--src/rabbit_persister.erl26
-rw-r--r--src/rabbit_plugin_activator.erl6
-rw-r--r--src/rabbit_reader.erl46
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_sasl_report_file_h.erl6
-rw-r--r--src/rabbit_sup.erl6
-rw-r--r--src/rabbit_tests.erl107
-rw-r--r--src/rabbit_tracer.erl6
-rw-r--r--src/rabbit_writer.erl8
-rw-r--r--src/tcp_acceptor.erl28
-rw-r--r--src/tcp_acceptor_sup.erl6
-rw-r--r--src/tcp_client_sup.erl6
-rw-r--r--src/tcp_listener.erl8
-rw-r--r--src/tcp_listener_sup.erl6
-rw-r--r--src/vm_memory_monitor.erl76
79 files changed, 1685 insertions, 767 deletions
diff --git a/.hgignore b/.hgignore
index ccd0b09f08..442425f62c 100644
--- a/.hgignore
+++ b/.hgignore
@@ -4,6 +4,7 @@ syntax: glob
*.swp
*.patch
erl_crash.dump
+deps.mk
syntax: regexp
^cover/
@@ -19,6 +20,7 @@ syntax: regexp
^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$
^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$
^packaging/debs/apt-repository/debian$
+^packaging/macports/macports$
^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$
^packaging/windows/rabbitmq-server-windows-.*\.zip$
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ
index 2d0a7b1db2..221c93501d 100644
--- a/LICENSE-MPL-RabbitMQ
+++ b/LICENSE-MPL-RabbitMQ
@@ -454,11 +454,11 @@ EXHIBIT A -Mozilla Public License.
are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
Technologies LLC, and Rabbit Technologies Ltd.
- Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
Ltd. Portions created by Cohesive Financial Technologies LLC are
- Copyright (C) 2007-2009 Cohesive Financial Technologies
+ Copyright (C) 2007-2010 Cohesive Financial Technologies
LLC. Portions created by Rabbit Technologies Ltd are Copyright
- (C) 2007-2009 Rabbit Technologies Ltd.
+ (C) 2007-2010 Rabbit Technologies Ltd.
All Rights Reserved.
diff --git a/Makefile b/Makefile
index 988aaab07c..7ed4611c63 100644
--- a/Makefile
+++ b/Makefile
@@ -6,12 +6,14 @@ RABBITMQ_SERVER_START_ARGS ?=
RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
RABBITMQ_LOG_BASE ?= $(TMPDIR)
+DEPS_FILE=deps.mk
SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
-SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
-TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
+INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
+SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl
+BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
+TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
@@ -58,13 +60,13 @@ ERL_EBIN=erl -noinput -pa $(EBIN_DIR)
all: $(TARGETS)
+$(DEPS_FILE): $(SOURCES) $(INCLUDES)
+ escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@
+
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
-$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
- erlc $(ERLC_OPTS) $<
-
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
@@ -100,6 +102,7 @@ clean:
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
rm -f $(RABBIT_PLT)
+ rm -f $(DEPS_FILE)
cleandb:
rm -rf $(RABBITMQ_MNESIA_DIR)/*
@@ -170,7 +173,7 @@ srcdist: distclean
sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
- cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR)
+ cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
cp -r docs $(TARGET_SRC_DIR)
@@ -220,3 +223,5 @@ install: all docs_all install_dirs
install_dirs:
mkdir -p $(SBIN_DIR)
mkdir -p $(TARGET_DIR)/sbin
+
+-include $(DEPS_FILE)
diff --git a/codegen.py b/codegen.py
index 3608f4c200..b5c91cbd3f 100644
--- a/codegen.py
+++ b/codegen.py
@@ -18,11 +18,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
@@ -92,6 +92,40 @@ class PackedMethodBitField:
def full(self):
return self.count() == 8
+
+def printFileHeader():
+ print """%% Autogenerated code. Do not edit.
+%%
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%"""
def genErl(spec):
def erlType(domain):
@@ -180,6 +214,8 @@ def genErl(spec):
elif type == 'table':
print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \
(f.index, f.index)
+ elif type == 'shortstr':
+ print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index)
else:
pass
@@ -212,7 +248,10 @@ def genErl(spec):
elif type == 'table':
print " F%dTab = rabbit_binary_generator:generate_table(F%d)," % (f.index, f.index)
print " F%dLen = size(F%dTab)," % (f.index, f.index)
- elif type in ['shortstr', 'longstr']:
+ elif type == 'shortstr':
+ print " F%dLen = size(F%d)," % (f.index, f.index)
+ print " if F%dLen > 255 -> exit(method_field_shortstr_overflow); true -> ok end," % (f.index)
+ elif type == 'longstr':
print " F%dLen = size(F%d)," % (f.index, f.index)
else:
pass
@@ -251,6 +290,7 @@ def genErl(spec):
methods = spec.allMethods()
+ printFileHeader()
print """-module(rabbit_framing).
-include("rabbit_framing.hrl").
@@ -325,6 +365,7 @@ def genHrl(spec):
methods = spec.allMethods()
+ printFileHeader()
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision)
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 5255be28a0..e26767ab4f 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -98,6 +98,13 @@ nodes determined by I<clusternode> option(s). See
L<http://www.rabbitmq.com/clustering.html> for more information about
clustering.
+=item close_connection I<connectionpid> I<explanation>
+
+Instruct the broker to close the connection associated with the Erlang
+process id I<connectionpid> (see also the I<list_connections>
+command), passing the I<explanation> string to the connected client as
+part of the AMQP connection shutdown protocol.
+
=back
=head2 USER MANAGEMENT
@@ -202,6 +209,22 @@ queue arguments
id of the Erlang process associated with the queue
+=item owner_pid
+
+id of the Erlang process representing the connection which is the
+exclusive owner of the queue, or empty if the queue is non-exclusive
+
+=item exclusive_consumer_pid
+
+id of the Erlang process representing the channel of the exclusive
+consumer subscribed to this queue, or empty if there is no exclusive
+consumer
+
+=item exclusive_consumer_tag
+
+consumer tag of the exclusive consumer subscribed to this queue, or
+empty if there is no exclusive consumer
+
=item messages_ready
number of messages ready to be delivered to clients
@@ -284,7 +307,7 @@ separated by tab characters.
=item list_connections [I<connectioninfoitem> ...]
-List queue information by virtual host. Each line printed describes an
+List current AMQP connections. Each line printed describes a
connection, with the requested I<connectioninfoitem> values separated
by tab characters. If no I<connectioninfoitem>s are specified then
I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
@@ -295,7 +318,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
=over
-=item node
+=item pid
id of the Erlang process associated with the connection
@@ -367,10 +390,87 @@ send queue size
=back
-The list_queues, list_exchanges and list_bindings commands accept an
-optional virtual host parameter for which to display results,
-defaulting to I<"/">. The default can be overridden with the B<-p>
-flag.
+=over
+
+=item list_channels [I<channelinfoitem> ...]
+
+List channel information. Each line printed describes a channel, with
+the requested I<channelinfoitem> values separated by tab characters.
+If no I<channelinfoitem>s are specified then I<pid>, I<user>,
+I<transactional>, I<consumer_count>, and I<messages_unacknowledged>
+are assumed.
+
+The list includes channels which are part of ordinary AMQP connections
+(as listed by list_connections) and channels created by various
+plug-ins and other extensions.
+
+=back
+
+=head3 Channel information items
+
+=over
+
+=item pid
+
+id of the Erlang process associated with the channel
+
+=item connection
+
+id of the Erlang process associated with the connection to which the
+channel belongs
+
+=item number
+
+the number of the channel, which uniquely identifies it within a
+connection
+
+=item user
+
+username associated with the channel
+
+=item vhost
+
+virtual host in which the channel operates
+
+=item transactional
+
+true if the channel is in transactional mode, false otherwise
+
+=item consumer_count
+
+number of logical AMQP consumers retrieving messages via the channel
+
+=item messages_unacknowledged
+
+number of messages delivered via this channel but not yet acknowledged
+
+=item acks_uncommitted
+
+number of acknowledgements received in an as yet uncommitted
+transaction
+
+=item prefetch_count
+
+QoS prefetch count limit in force, 0 if unlimited
+
+=back
+
+=item list_consumers
+
+List consumers, i.e. subscriptions to a queue's message stream. Each
+line printed shows, separated by tab characters, the name of the queue
+subscribed to, the id of the channel process via which the
+subscription was created and is managed, the consumer tag which
+uniquely identifies the subscription within a channel, and a boolean
+indicating whether acknowledgements are expected for messages
+delivered to this consumer.
+
+=back
+
+The list_queues, list_exchanges, list_bindings and list_consumers
+commands accept an optional virtual host parameter for which to
+display results, defaulting to I<"/">. The default can be overridden
+with the B<-p> flag.
=head1 OUTPUT ESCAPING
diff --git a/generate_deps b/generate_deps
new file mode 100644
index 0000000000..916006d101
--- /dev/null
+++ b/generate_deps
@@ -0,0 +1,52 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+-mode(compile).
+
+main([IncludeDir, ErlDir, EbinDir, TargetFile]) ->
+ ErlDirContents = filelib:wildcard("*.erl", ErlDir),
+ ErlFiles = [filename:join(ErlDir, FileName) || FileName <- ErlDirContents],
+ Modules = sets:from_list(
+ [list_to_atom(filename:basename(FileName, ".erl")) ||
+ FileName <- ErlDirContents]),
+ Headers = sets:from_list(
+ [filename:join(IncludeDir, FileName) ||
+ FileName <- filelib:wildcard("*.hrl", IncludeDir)]),
+ Deps = lists:foldl(
+ fun (Path, Deps1) ->
+ dict:store(Path, detect_deps(IncludeDir, EbinDir,
+ Modules, Headers, Path),
+ Deps1)
+ end, dict:new(), ErlFiles),
+ {ok, Hdl} = file:open(TargetFile, [write, delayed_write]),
+ dict:fold(
+ fun (_Path, [], ok) ->
+ ok;
+ (Path, Dep, ok) ->
+ Module = filename:basename(Path, ".erl"),
+ ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]),
+ ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end,
+ ok, Dep),
+ file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"])
+ end, ok, Deps),
+ ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]),
+ ok = file:sync(Hdl),
+ ok = file:close(Hdl).
+
+detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) ->
+ {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]),
+ lists:foldl(
+ fun ({attribute, _LineNumber, behaviour, Behaviour}, Deps) ->
+ case sets:is_element(Behaviour, Modules) of
+ true -> sets:add_element(
+ [EbinDir, "/", atom_to_list(Behaviour), ".beam"],
+ Deps);
+ false -> Deps
+ end;
+ ({attribute, _LineNumber, file, {FileName, _LineNumber1}}, Deps) ->
+ case sets:is_element(FileName, Headers) of
+ true -> sets:add_element(FileName, Deps);
+ false -> Deps
+ end;
+ (_Form, Deps) ->
+ Deps
+ end, sets:new(), Forms).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 9db497fd28..3baf8c1a67 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -128,11 +128,17 @@
properties :: amqp_properties(),
properties_bin :: 'none',
payload_fragments_rev :: [binary()]}).
+-type(unencoded_content() :: undecoded_content()).
-type(decoded_content() ::
#content{class_id :: amqp_class_id(),
properties :: amqp_properties(),
properties_bin :: maybe(binary()),
payload_fragments_rev :: [binary()]}).
+-type(encoded_content() ::
+ #content{class_id :: amqp_class_id(),
+ properties :: maybe(amqp_properties()),
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]}).
-type(content() :: undecoded_content() | decoded_content()).
-type(basic_message() ::
#basic_message{exchange_name :: exchange_name(),
@@ -164,7 +170,7 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-ifdef(debug).
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index a78c230167..199a0f89c8 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index fa2844fddf..bc5b58cad8 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -34,6 +34,8 @@ prepare:
-e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
SOURCES/rabbitmq-server.init
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 62fb1dfbc5..4dd223086f 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -118,6 +118,9 @@ fi
rm -rf %{buildroot}
%changelog
+* Fri Jan 22 2010 Matthew Sackman <matthew@lshift.net> 1.7.1-1
+- New Upstream Release
+
* Mon Oct 5 2009 David Wragg <dpw@lshift.net> 1.7.0-1
- New upstream release
diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper
index ee5947b66c..693a6f0b83 100644
--- a/packaging/common/rabbitmq-asroot-script-wrapper
+++ b/packaging/common/rabbitmq-asroot-script-wrapper
@@ -19,11 +19,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index dfb714f16e..79096a4e92 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -19,11 +19,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
@@ -45,7 +45,7 @@ cd /var/lib/rabbitmq
SCRIPT=`basename $0`
if [ `id -u` = 0 ] ; then
- su rabbitmq -s /bin/sh -c "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
+ @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
elif [ `id -u` = `id -u rabbitmq` ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} "$@"
else
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index dafaf9cef4..ab05f73225 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -26,6 +26,8 @@ package: clean
-e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
$(UNPACKED_DIR)/debian/rabbitmq-server.init
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index e4cfe7b547..796a301a29 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.7.1-1) intrepid; urgency=low
+
+ * New Upstream Release
+
+ -- Matthew Sackman <matthew@lshift.net> Fri, 22 Jan 2010 14:14:29 +0000
+
rabbitmq-server (1.7.0-1) intrepid; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/copyright b/packaging/debs/Debian/debian/copyright
index 69867220f0..a569f31aaa 100755
--- a/packaging/debs/Debian/debian/copyright
+++ b/packaging/debs/Debian/debian/copyright
@@ -5,7 +5,7 @@ It was downloaded from http://www.rabbitmq.com/
The file codegen/amqp-0.8.json is covered by the following terms:
- "Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies LLC,
+ "Copyright (C) 2008-2010 LShift Ltd, Cohesive Financial Technologies LLC,
and Rabbit Technologies Ltd
Permission is hereby granted, free of charge, to any person
@@ -39,11 +39,11 @@ Authors and Copyright are as described below:
are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
Technologies LLC, and Rabbit Technologies Ltd.
- Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
Ltd. Portions created by Cohesive Financial Technologies LLC are
- Copyright (C) 2007-2009 Cohesive Financial Technologies
+ Copyright (C) 2007-2010 Cohesive Financial Technologies
LLC. Portions created by Rabbit Technologies Ltd are Copyright
- (C) 2007-2009 Rabbit Technologies Ltd.
+ (C) 2007-2010 Rabbit Technologies Ltd.
MOZILLA PUBLIC LICENSE
@@ -502,11 +502,11 @@ EXHIBIT A -Mozilla Public License.
are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
Technologies LLC, and Rabbit Technologies Ltd.
- Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
Ltd. Portions created by Cohesive Financial Technologies LLC are
- Copyright (C) 2007-2009 Cohesive Financial Technologies
+ Copyright (C) 2007-2010 Cohesive Financial Technologies
LLC. Portions created by Rabbit Technologies Ltd are Copyright
- (C) 2007-2009 Rabbit Technologies Ltd.
+ (C) 2007-2010 Rabbit Technologies Ltd.
All Rights Reserved.
@@ -524,7 +524,7 @@ EXHIBIT A -Mozilla Public License.
If you have any questions regarding licensing, please contact us at
info@rabbitmq.com.
-The Debian packaging is (C) 2007-2009, Rabbit Technologies Ltd. <info@rabbitmq.com>
+The Debian packaging is (C) 2007-2010, Rabbit Technologies Ltd. <info@rabbitmq.com>
and is licensed under the MPL 1.1, see above.
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
new file mode 100644
index 0000000000..d5633955b9
--- /dev/null
+++ b/packaging/macports/Makefile
@@ -0,0 +1,55 @@
+TARBALL_DIR=../../dist
+TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
+COMMON_DIR=../common
+VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
+
+# The URL at which things really get deployed
+REAL_WEB_URL=http://www.rabbitmq.com/
+
+# The user@host for an OSX machine with macports installed, which is
+# used to generate the macports index files. That step will be
+# skipped if this variable is not set. If you do set it, you might
+# also want to set SSH_OPTS, which allows adding ssh options, e.g. to
+# specify a key that will get into the OSX machine without a
+# passphrase.
+MACPORTS_USERHOST=
+
+MACPORTS_DIR=macports
+DEST=$(MACPORTS_DIR)/net/rabbitmq-server
+
+all: macports
+
+dirs:
+ mkdir -p $(DEST)/files
+
+$(DEST)/Portfile: Portfile.in
+ for algo in md5 sha1 rmd160 ; do \
+ checksum=$$(openssl $$algo $(TARBALL_DIR)/$(TARBALL) | awk '{print $$NF}') ; \
+ echo "s|@$$algo@|$$checksum|g" ; \
+ done >checksums.sed
+ sed -e "s|@VERSION@|$(VERSION)|g;s|@BASE_URL@|$(REAL_WEB_URL)|g" \
+ -f checksums.sed <$^ >$@
+ rm checksums.sed
+
+macports: dirs $(DEST)/Portfile
+ for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \
+ cp $(COMMON_DIR)/$$f $(DEST)/files ; \
+ done
+ sed -i -e 's|@SU_RABBITMQ_SH_C@|sudo -E -u rabbitmq -H /bin/sh -c|' \
+ $(DEST)/files/rabbitmq-script-wrapper
+ cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
+ if [ -n "$(MACPORTS_USERHOST)" ] ; then \
+ tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \
+ d="/tmp/mkportindex.$$$$" ; \
+ mkdir $$d \
+ && cd $$d \
+ && tar xf - \
+ && /opt/local/bin/portindex -a -o . >/dev/null \
+ && tar cf - . \
+ && cd \
+ && rm -rf $$d' \
+ | tar xf - -C $(MACPORTS_DIR) ; \
+ fi
+
+clean:
+ rm -rf $(DEST) checksums.sed
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/Portfile.in
index 739f99d0e7..e1f582124a 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/Portfile.in
@@ -3,10 +3,10 @@
PortSystem 1.0
name rabbitmq-server
-version 1.7.0
-revision 0
+version @VERSION@
+revision 1
categories net
-maintainers tonyg@rabbitmq.com
+maintainers rabbitmq.com:tonyg
platforms darwin
description The RabbitMQ AMQP Server
long_description \
@@ -15,13 +15,13 @@ long_description \
robust and scalable implementation of an AMQP broker.
-homepage http://www.rabbitmq.com/
-master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/
+homepage @BASE_URL@
+master_sites @BASE_URL@releases/rabbitmq-server/v${version}/
checksums \
- md5 4505ca0fd8718439bd6f5e2af2379e56 \
- sha1 84fb86d403057bb808c1b51deee0c1fca3bf7bef \
- rmd160 092f90946825cc3eb277019805e24db637a559f4
+ md5 @md5@ \
+ sha1 @sha1@ \
+ rmd160 @rmd160@
depends_build port:erlang
depends_run port:erlang
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper
deleted file mode 100644
index c4488dcbe5..0000000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash
-cd /var/lib/rabbitmq
-
-SCRIPT=`basename $0`
-
-if [ `id -u` = 0 ] ; then
- /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
-else
- echo -e "\nOnly root should run ${SCRIPT}\n"
- exit 1
-fi
-
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
deleted file mode 100644
index 80cb7bd53c..0000000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/bash
-cd /var/lib/rabbitmq
-
-SCRIPT=`basename $0`
-
-if [ `id -u` = 0 ] ; then
- sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
-elif [ `id -u` = `id -u rabbitmq` ] ; then
- /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
-else
- /usr/lib/rabbitmq/bin/${SCRIPT}
- echo -e "\nOnly root or rabbitmq should run ${SCRIPT}\n"
- exit 1
-fi
-
diff --git a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff
index 45b4949616..45b4949616 100644
--- a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff
+++ b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index f17fe77742..c9e818ac8b 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -22,7 +22,7 @@ dist:
mv $(SOURCE_DIR) $(TARGET_DIR)
pod2text --loose rabbitmq-service.pod $(TARGET_DIR)/readme-service.txt
- unix2dos $(TARGET_DIR)/readme-service.txt
+ todos $(TARGET_DIR)/readme-service.txt
zip -r $(TARGET_ZIP).zip $(TARGET_DIR)
rm -rf $(TARGET_DIR)
diff --git a/scripts/rabbitmq-activate-plugins b/scripts/rabbitmq-activate-plugins
index 5ce64c686c..00ee6c61c8 100755
--- a/scripts/rabbitmq-activate-plugins
+++ b/scripts/rabbitmq-activate-plugins
@@ -19,11 +19,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat
index e7aa709544..3c9a057c95 100644
--- a/scripts/rabbitmq-activate-plugins.bat
+++ b/scripts/rabbitmq-activate-plugins.bat
@@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,7 +32,13 @@ REM
setlocal
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -44,17 +50,18 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-set RABBITMQ_PLUGINS_DIR=%~dp0..\plugins
-set RABBITMQ_PLUGINS_EXPAND_DIR=%~dp0..\priv\plugins
-set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+set RABBITMQ_PLUGINS_EXPAND_DIR=!TDP0!..\priv\plugins
+set RABBITMQ_EBIN_DIR=!TDP0!..\ebin
-"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%RABBITMQ_EBIN_DIR%" ^
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!RABBITMQ_EBIN_DIR!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
--rabbit plugins_dir \""%RABBITMQ_PLUGINS_DIR:\=/%"\" ^
--rabbit plugins_expand_dir \""%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%"\" ^
--rabbit rabbit_ebin \""%RABBITMQ_EBIN_DIR:\=/%"\" ^
--extra %*
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit rabbit_ebin \""!RABBITMQ_EBIN_DIR:\=/!"\" ^
+-extra !STAR!
endlocal
+endlocal
diff --git a/scripts/rabbitmq-deactivate-plugins b/scripts/rabbitmq-deactivate-plugins
index 771c473496..3fd71bfacd 100755
--- a/scripts/rabbitmq-deactivate-plugins
+++ b/scripts/rabbitmq-deactivate-plugins
@@ -19,11 +19,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
diff --git a/scripts/rabbitmq-deactivate-plugins.bat b/scripts/rabbitmq-deactivate-plugins.bat
index 40155183a1..1bc3f88efd 100644
--- a/scripts/rabbitmq-deactivate-plugins.bat
+++ b/scripts/rabbitmq-deactivate-plugins.bat
@@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,8 +32,14 @@ REM
setlocal
-set RABBITMQ_EBIN_DIR=%~dp0..\ebin
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+setlocal enabledelayedexpansion
-del /f "%RABBITMQ_EBIN_DIR%"\rabbit.rel "%RABBITMQ_EBIN_DIR%"\rabbit.script "%RABBITMQ_EBIN_DIR%"\rabbit.boot
+set RABBITMQ_EBIN_DIR=!TDP0!..\ebin
+del /f "!RABBITMQ_EBIN_DIR!"\rabbit.rel "!RABBITMQ_EBIN_DIR!"\rabbit.script "!RABBITMQ_EBIN_DIR!"\rabbit.boot
+
+endlocal
endlocal
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 69ddbcfed1..36734874e7 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -19,11 +19,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 1a7eb97e08..8341d35c8c 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -19,19 +19,17 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
## Contributor(s): ______________________________________.
##
NODENAME=rabbit
-NODE_IP_ADDRESS=0.0.0.0
-NODE_PORT=5672
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
@@ -40,14 +38,18 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq
. `dirname $0`/rabbitmq-env
+DEFAULT_NODE_IP_ADDRESS=0.0.0.0
+DEFAULT_NODE_PORT=5672
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
then
if [ "x" != "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
fi
else
if [ "x" = "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_PORT=${NODE_PORT}
+ then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
fi
fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index 6dda13af37..a4b7f2e99b 100755..100644
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,38 +32,44 @@ REM
setlocal
-if "%RABBITMQ_BASE%"=="" (
- set RABBITMQ_BASE=%APPDATA%\RabbitMQ
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
-if "%RABBITMQ_NODENAME%"=="" (
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
+if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
)
) else (
- if "%RABBITMQ_NODE_PORT%"=="" (
+ if "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_PORT=5672
)
)
-set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
-set RABBITMQ_SCRIPT_HOME=%~sdp0%
+set RABBITMQ_PIDS_FILE=!RABBITMQ_BASE!\rabbitmq.pids
+set RABBITMQ_SCRIPT_HOME=!TDP0!
-if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+if "!RABBITMQ_CONFIG_FILE!"=="" (
+ set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-if exist "%RABBITMQ_CONFIG_FILE%.config" (
- set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+if exist "!RABBITMQ_CONFIG_FILE!.config" (
+ set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
set RABBITMQ_CONFIG_ARG=
)
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -75,14 +81,15 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%~dp0..\ebin" ^
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!TDP0!..\ebin" ^
-noinput -hidden ^
-%RABBITMQ_MULTI_ERL_ARGS% ^
+!RABBITMQ_MULTI_ERL_ARGS! ^
-sname rabbitmq_multi ^
-%RABBITMQ_CONFIG_ARG% ^
+!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
-%RABBITMQ_MULTI_START_ARGS% ^
--extra %*
+!RABBITMQ_MULTI_START_ARGS! ^
+-extra !STAR!
endlocal
+endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 7f08cd9d75..638498c1e2 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -19,11 +19,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
@@ -31,10 +31,8 @@
##
NODENAME=rabbit
-NODE_IP_ADDRESS=0.0.0.0
-NODE_PORT=5672
SERVER_ERL_ARGS="+K true +A30 \
--kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
+-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
CONFIG_FILE=/etc/rabbitmq/rabbitmq
@@ -44,14 +42,18 @@ SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
+DEFAULT_NODE_IP_ADDRESS=0.0.0.0
+DEFAULT_NODE_PORT=5672
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
then
if [ "x" != "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
fi
else
if [ "x" = "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_PORT=${NODE_PORT}
+ then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
fi
fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 5110285128..28eb8ebb8d 100755..100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,25 +32,31 @@ REM
setlocal
-if "%RABBITMQ_BASE%"=="" (
- set RABBITMQ_BASE=%APPDATA%\RabbitMQ
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
-if "%RABBITMQ_NODENAME%"=="" (
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
+if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
)
) else (
- if "%RABBITMQ_NODE_PORT%"=="" (
+ if "!RABBITMQ_NODE_PORT!"=="" (
set RABBITMQ_NODE_PORT=5672
)
)
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -62,13 +68,13 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-set RABBITMQ_BASE_UNIX=%RABBITMQ_BASE:\=/%
+set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-if "%RABBITMQ_MNESIA_BASE%"=="" (
- set RABBITMQ_MNESIA_BASE=%RABBITMQ_BASE_UNIX%/db
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
)
-if "%RABBITMQ_LOG_BASE%"=="" (
- set RABBITMQ_LOG_BASE=%RABBITMQ_BASE_UNIX%/log
+if "!RABBITMQ_LOG_BASE!"=="" (
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
)
@@ -77,81 +83,82 @@ rem Log management (rotation, filtering based of size...) is left as an exercice
set BACKUP_EXTENSION=.1
-set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
-set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
+set LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log
+set SASL_LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log
-set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
-set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
+set LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log!BACKUP_EXTENSION!
+set SASL_LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log!BACKUP_EXTENSION!
-if exist "%LOGS%" (
- type "%LOGS%" >> "%LOGS_BACKUP%"
+if exist "!LOGS!" (
+ type "!LOGS!" >> "!LOGS_BACKUP!"
)
-if exist "%SASL_LOGS%" (
- type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
+if exist "!SASL_LOGS!" (
+ type "!SASL_LOGS!" >> "!SASL_LOGS_BACKUP!"
)
rem End of log management
-if "%RABBITMQ_CLUSTER_CONFIG_FILE%"=="" (
- set RABBITMQ_CLUSTER_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq_cluster.config
+if "!RABBITMQ_CLUSTER_CONFIG_FILE!"=="" (
+ set RABBITMQ_CLUSTER_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq_cluster.config
)
set CLUSTER_CONFIG=
-if not exist "%RABBITMQ_CLUSTER_CONFIG_FILE%" GOTO L1
-set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%"\"
+if not exist "!RABBITMQ_CLUSTER_CONFIG_FILE!" GOTO L1
+set CLUSTER_CONFIG=-rabbit cluster_config \""!RABBITMQ_CLUSTER_CONFIG_FILE:\=/!"\"
:L1
-if "%RABBITMQ_MNESIA_DIR%"=="" (
- set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
-set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
-if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
- echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
- set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
+set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
+if exist "!RABBITMQ_EBIN_ROOT!\rabbit.boot" (
+ echo Using Custom Boot File "!RABBITMQ_EBIN_ROOT!\rabbit.boot"
+ set RABBITMQ_BOOT_FILE=!RABBITMQ_EBIN_ROOT!\rabbit
set RABBITMQ_EBIN_PATH=
) else (
set RABBITMQ_BOOT_FILE=start_sasl
- set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+ set RABBITMQ_EBIN_PATH=-pa "!RABBITMQ_EBIN_ROOT!"
)
-if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+if "!RABBITMQ_CONFIG_FILE!"=="" (
+ set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-if exist "%RABBITMQ_CONFIG_FILE%.config" (
- set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+if exist "!RABBITMQ_CONFIG_FILE!.config" (
+ set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
set RABBITMQ_CONFIG_ARG=
)
set RABBITMQ_LISTEN_ARG=
-if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""!RABBITMQ_NODE_IP_ADDRESS!"\","!RABBITMQ_NODE_PORT!"}]
)
)
-"%ERLANG_HOME%\bin\erl.exe" ^
-%RABBITMQ_EBIN_PATH% ^
+"!ERLANG_HOME!\bin\erl.exe" ^
+!RABBITMQ_EBIN_PATH! ^
-noinput ^
--boot "%RABBITMQ_BOOT_FILE%" ^
-%RABBITMQ_CONFIG_ARG% ^
--sname %RABBITMQ_NODENAME% ^
+-boot "!RABBITMQ_BOOT_FILE!" ^
+!RABBITMQ_CONFIG_ARG! ^
+-sname !RABBITMQ_NODENAME! ^
-s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
+-kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
-%RABBITMQ_LISTEN_ARG% ^
--kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
-%RABBITMQ_SERVER_ERL_ARGS% ^
+!RABBITMQ_LISTEN_ARG! ^
+-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
+!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
+-sasl sasl_error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
-%CLUSTER_CONFIG% ^
-%RABBITMQ_SERVER_START_ARGS% ^
-%*
+-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+!CLUSTER_CONFIG! ^
+!RABBITMQ_SERVER_START_ARGS! ^
+!STAR!
endlocal
+endlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index d960d29dea..a4021fd6a1 100755..100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,61 +32,82 @@ REM
setlocal
-if "%RABBITMQ_SERVICENAME%"=="" (
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TN0=%~n0
+set TDP0=%~dp0
+set P1=%1
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_SERVICENAME!"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
-if "%RABBITMQ_BASE%"=="" (
- set RABBITMQ_BASE=%APPDATA%\%RABBITMQ_SERVICENAME%
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
-if "%RABBITMQ_NODENAME%"=="" (
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
- )
+if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
) else (
- if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
- )
-)
-
-if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
- set ERLANG_SERVICE_MANAGER_PATH=C:\Program Files\erl5.6.5\erts-5.6.5\bin
+ if "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
+)
+
+if "!ERLANG_SERVICE_MANAGER_PATH!"=="" (
+ if not exist "!ERLANG_HOME!\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+ )
+ for /f "delims=" %%i in ('dir /ad/b "!ERLANG_HOME!"') do if exist "!ERLANG_HOME!\%%i\bin\erlsrv.exe" (
+ set ERLANG_SERVICE_MANAGER_PATH=!ERLANG_HOME!\%%i\bin
+ )
)
set CONSOLE_FLAG=
set CONSOLE_LOG_VALID=
-for %%i in (new reuse) do if "%%i" == "%RABBITMQ_CONSOLE_LOG%" set CONSOLE_LOG_VALID=TRUE
-if "%CONSOLE_LOG_VALID%" == "TRUE" (
- set CONSOLE_FLAG=-debugtype %RABBITMQ_CONSOLE_LOG%
+for %%i in (new reuse) do if "%%i" == "!RABBITMQ_CONSOLE_LOG!" set CONSOLE_LOG_VALID=TRUE
+if "!CONSOLE_LOG_VALID!" == "TRUE" (
+ set CONSOLE_FLAG=-debugtype !RABBITMQ_CONSOLE_LOG!
)
rem *** End of configuration ***
-if not exist "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" (
+if not exist "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" (
echo.
echo **********************************************
echo ERLANG_SERVICE_MANAGER_PATH not set correctly.
echo **********************************************
echo.
- echo "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv.exe" not found!
+ echo "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv.exe" not found
echo Please set ERLANG_SERVICE_MANAGER_PATH to the folder containing "erlsrv.exe".
echo.
exit /B 1
)
rem erlang prefers forwardslash as separator in paths
-set RABBITMQ_BASE_UNIX=%RABBITMQ_BASE:\=/%
+set RABBITMQ_BASE_UNIX=!RABBITMQ_BASE:\=/!
-if "%RABBITMQ_MNESIA_BASE%"=="" (
- set RABBITMQ_MNESIA_BASE=%RABBITMQ_BASE_UNIX%/db
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE_UNIX!/db
)
-if "%RABBITMQ_LOG_BASE%"=="" (
- set RABBITMQ_LOG_BASE=%RABBITMQ_BASE_UNIX%/log
+if "!RABBITMQ_LOG_BASE!"=="" (
+ set RABBITMQ_LOG_BASE=!RABBITMQ_BASE_UNIX!/log
)
@@ -95,139 +116,140 @@ rem Log management (rotation, filtering based on size...) is left as an exercise
set BACKUP_EXTENSION=.1
-set LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log
-set SASL_LOGS=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log
+set LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log
+set SASL_LOGS=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log
-set LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%
-set SASL_LOGS_BACKUP=%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%
+set LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!.log!BACKUP_EXTENSION!
+set SASL_LOGS_BACKUP=!RABBITMQ_BASE!\log\!RABBITMQ_NODENAME!-sasl.log!BACKUP_EXTENSION!
-if exist "%LOGS%" (
- type "%LOGS%" >> "%LOGS_BACKUP%"
+if exist "!LOGS!" (
+ type "!LOGS!" >> "!LOGS_BACKUP!"
)
-if exist "%SASL_LOGS%" (
- type "%SASL_LOGS%" >> "%SASL_LOGS_BACKUP%"
+if exist "!SASL_LOGS!" (
+ type "!SASL_LOGS!" >> "!SASL_LOGS_BACKUP!"
)
rem End of log management
-if "%RABBITMQ_CLUSTER_CONFIG_FILE%"=="" (
- set RABBITMQ_CLUSTER_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq_cluster.config
+if "!RABBITMQ_CLUSTER_CONFIG_FILE!"=="" (
+ set RABBITMQ_CLUSTER_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq_cluster.config
)
set CLUSTER_CONFIG=
-if not exist "%RABBITMQ_CLUSTER_CONFIG_FILE%" GOTO L1
-set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%"\"
+if not exist "!RABBITMQ_CLUSTER_CONFIG_FILE!" GOTO L1
+set CLUSTER_CONFIG=-rabbit cluster_config \""!RABBITMQ_CLUSTER_CONFIG_FILE:\=/!"\"
:L1
-if "%RABBITMQ_MNESIA_DIR%"=="" (
- set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
-if "%1" == "install" goto INSTALL_SERVICE
-for %%i in (start stop disable enable list remove) do if "%%i" == "%1" goto MODIFY_SERVICE
+if "!P1!" == "install" goto INSTALL_SERVICE
+for %%i in (start stop disable enable list remove) do if "%%i" == "!P1!" goto MODIFY_SERVICE
echo.
echo *********************
echo Service control usage
echo *********************
echo.
-echo %~n0 help - Display this help
-echo %~n0 install - Install the %RABBITMQ_SERVICENAME% service
-echo %~n0 remove - Remove the %RABBITMQ_SERVICENAME% service
+echo !TN0! help - Display this help
+echo !TN0! install - Install the !RABBITMQ_SERVICENAME! service
+echo !TN0! remove - Remove the !RABBITMQ_SERVICENAME! service
echo.
echo The following actions can also be accomplished by using
echo Windows Services Management Console (services.msc):
echo.
-echo %~n0 start - Start the %RABBITMQ_SERVICENAME% service
-echo %~n0 stop - Stop the %RABBITMQ_SERVICENAME% service
-echo %~n0 disable - Disable the %RABBITMQ_SERVICENAME% service
-echo %~n0 enable - Enable the %RABBITMQ_SERVICENAME% service
+echo !TN0! start - Start the !RABBITMQ_SERVICENAME! service
+echo !TN0! stop - Stop the !RABBITMQ_SERVICENAME! service
+echo !TN0! disable - Disable the !RABBITMQ_SERVICENAME! service
+echo !TN0! enable - Enable the !RABBITMQ_SERVICENAME! service
echo.
exit /B
:INSTALL_SERVICE
-if not exist "%RABBITMQ_BASE%" (
- echo Creating base directory %RABBITMQ_BASE% & md "%RABBITMQ_BASE%"
+if not exist "!RABBITMQ_BASE!" (
+ echo Creating base directory !RABBITMQ_BASE! & md "!RABBITMQ_BASE!"
)
-"%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" list %RABBITMQ_SERVICENAME% 2>NUL 1>NUL
+"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" list !RABBITMQ_SERVICENAME! 2>NUL 1>NUL
if errorlevel 1 (
- "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" add %RABBITMQ_SERVICENAME%
+ "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" add !RABBITMQ_SERVICENAME!
) else (
- echo %RABBITMQ_SERVICENAME% service is already present - only updating service parameters
+ echo !RABBITMQ_SERVICENAME! service is already present - only updating service parameters
)
-set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
-if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
- echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
- set RABBITMQ_BOOT_FILE=%RABBITMQ_EBIN_ROOT%\rabbit
+set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
+if exist "!RABBITMQ_EBIN_ROOT!\rabbit.boot" (
+ echo Using Custom Boot File "!RABBITMQ_EBIN_ROOT!\rabbit.boot"
+ set RABBITMQ_BOOT_FILE=!RABBITMQ_EBIN_ROOT!\rabbit
set RABBITMQ_EBIN_PATH=
) else (
set RABBITMQ_BOOT_FILE=start_sasl
- set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+ set RABBITMQ_EBIN_PATH=-pa "!RABBITMQ_EBIN_ROOT!"
)
-if "%RABBITMQ_CONFIG_FILE%"=="" (
- set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+if "!RABBITMQ_CONFIG_FILE!"=="" (
+ set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
-if exist "%RABBITMQ_CONFIG_FILE%.config" (
- set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+if exist "!RABBITMQ_CONFIG_FILE!.config" (
+ set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
) else (
set RABBITMQ_CONFIG_ARG=
)
set RABBITMQ_LISTEN_ARG=
-if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- if not "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+ if not "!RABBITMQ_NODE_PORT!"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"!RABBITMQ_NODE_IP_ADDRESS!\", !RABBITMQ_NODE_PORT!}]"
)
)
set ERLANG_SERVICE_ARGUMENTS= ^
-%RABBITMQ_EBIN_PATH% ^
--boot "%RABBITMQ_BOOT_FILE%" ^
-%RABBITMQ_CONFIG_ARG% ^
+!RABBITMQ_EBIN_PATH! ^
+-boot "!RABBITMQ_BOOT_FILE!" ^
+!RABBITMQ_CONFIG_ARG! ^
-s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
+-kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
-%RABBITMQ_LISTEN_ARG% ^
--kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
-%RABBITMQ_SERVER_ERL_ARGS% ^
+!RABBITMQ_LISTEN_ARG! ^
+-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
+!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
+-sasl sasl_error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
--mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
-%CLUSTER_CONFIG% ^
-%RABBITMQ_SERVER_START_ARGS% ^
-%*
-
-set ERLANG_SERVICE_ARGUMENTS=%ERLANG_SERVICE_ARGUMENTS:\=\\%
-set ERLANG_SERVICE_ARGUMENTS=%ERLANG_SERVICE_ARGUMENTS:"=\"%
-
-"%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" set %RABBITMQ_SERVICENAME% ^
--machine "%ERLANG_SERVICE_MANAGER_PATH%\erl.exe" ^
--env ERL_CRASH_DUMP="%RABBITMQ_BASE_UNIX%/log" ^
--workdir "%RABBITMQ_BASE%" ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR!"\" ^
+!CLUSTER_CONFIG! ^
+!RABBITMQ_SERVER_START_ARGS! ^
+!STAR!
+
+set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:\=\\!
+set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"!
+
+"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" set !RABBITMQ_SERVICENAME! ^
+-machine "!ERLANG_SERVICE_MANAGER_PATH!\erl.exe" ^
+-env ERL_CRASH_DUMP="!RABBITMQ_BASE_UNIX!/erl_crash.dump" ^
+-workdir "!RABBITMQ_BASE!" ^
-stopaction "rabbit:stop_and_halt()." ^
--sname %RABBITMQ_NODENAME% ^
-%CONSOLE_FLAG% ^
--args "%ERLANG_SERVICE_ARGUMENTS%" > NUL
+-sname !RABBITMQ_NODENAME! ^
+!CONSOLE_FLAG! ^
+-args "!ERLANG_SERVICE_ARGUMENTS!" > NUL
goto END
:MODIFY_SERVICE
-"%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" %1 %RABBITMQ_SERVICENAME%
+"!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" !P1! !RABBITMQ_SERVICENAME!
goto END
:END
endlocal
+endlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index a332afc6ca..cfb775eb67 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -19,11 +19,11 @@
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
-## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
-## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
-## (C) 2007-2009 Rabbit Technologies Ltd.
+## (C) 2007-2010 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 512e8587dc..5557245165 100755..100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -19,11 +19,11 @@ REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
REM Technologies LLC, and Rabbit Technologies Ltd.
REM
-REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
REM Ltd. Portions created by Cohesive Financial Technologies LLC are
-REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM Copyright (C) 2007-2010 Cohesive Financial Technologies
REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
-REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM (C) 2007-2010 Rabbit Technologies Ltd.
REM
REM All Rights Reserved.
REM
@@ -32,11 +32,17 @@ REM
setlocal
-if "%RABBITMQ_NODENAME%"=="" (
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit
)
-if not exist "%ERLANG_HOME%\bin\erl.exe" (
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
echo ERLANG_HOME not set correctly.
@@ -48,6 +54,7 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -nodename %RABBITMQ_NODENAME% -extra %*
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
+endlocal
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 53edf8deef..c33582e30d 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -36,7 +36,7 @@
%% InitialTimeout supplied from init). After this timeout has
%% occurred, hibernation will occur as normal. Upon awaking, a new
%% current timeout value will be calculated.
-%%
+%%
%% The purpose is that the gen_server2 takes care of adjusting the
%% current timeout value such that the process will increase the
%% timeout value repeatedly if it is unable to sleep for the
@@ -57,7 +57,7 @@
%% being used. Instead it'll wait for the current timeout as described
%% above.
-%% All modifications are (C) 2009 LShift Ltd.
+%% All modifications are (C) 2009-2010 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
diff --git a/src/pg_local.erl b/src/pg_local.erl
new file mode 100644
index 0000000000..fa41fe46b3
--- /dev/null
+++ b/src/pg_local.erl
@@ -0,0 +1,213 @@
+%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) Process groups are node-local only.
+%%
+%% 2) Groups are created/deleted implicitly.
+%%
+%% 3) 'join' and 'leave' are asynchronous.
+%%
+%% 4) the type specs of the exported non-callback functions have been
+%% extracted into a separate, guarded section, and rewritten in
+%% old-style spec syntax, for better compatibility with older
+%% versions of Erlang/OTP. The remaining type specs have been
+%% removed.
+
+%% All modifications are (C) 2010 LShift Ltd.
+
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(pg_local).
+
+-export([join/2, leave/2, get_members/1]).
+-export([sync/0]). %% intended for testing only; not part of official API
+-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
+ terminate/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(name() :: term()).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(join/2 :: (name(), pid()) -> 'ok').
+-spec(leave/2 :: (name(), pid()) -> 'ok').
+-spec(get_members/1 :: (name()) -> [pid()]).
+
+-spec(sync/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%%% As of R13B03 monitors are used instead of links.
+
+%%%
+%%% Exported functions
+%%%
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+start() ->
+ ensure_started().
+
+join(Name, Pid) when is_pid(Pid) ->
+ ensure_started(),
+ gen_server:cast(?MODULE, {join, Name, Pid}).
+
+leave(Name, Pid) when is_pid(Pid) ->
+ ensure_started(),
+ gen_server:cast(?MODULE, {leave, Name, Pid}).
+
+get_members(Name) ->
+ ensure_started(),
+ group_members(Name).
+
+sync() ->
+ ensure_started(),
+ gen_server:call(?MODULE, sync).
+
+%%%
+%%% Callback functions from gen_server
+%%%
+
+-record(state, {}).
+
+init([]) ->
+ pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
+ {ok, #state{}}.
+
+handle_call(sync, _From, S) ->
+ {reply, ok, S};
+
+handle_call(Request, From, S) ->
+ error_logger:warning_msg("The pg_local server received an unexpected message:\n"
+ "handle_call(~p, ~p, _)\n",
+ [Request, From]),
+ {noreply, S}.
+
+handle_cast({join, Name, Pid}, S) ->
+ join_group(Name, Pid),
+ {noreply, S};
+handle_cast({leave, Name, Pid}, S) ->
+ leave_group(Name, Pid),
+ {noreply, S};
+handle_cast(_, S) ->
+ {noreply, S}.
+
+handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
+ member_died(MonitorRef),
+ {noreply, S};
+handle_info(_, S) ->
+ {noreply, S}.
+
+terminate(_Reason, _S) ->
+ true = ets:delete(pg_local_table),
+ ok.
+
+%%%
+%%% Local functions
+%%%
+
+%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the
+%%% table is ordered_set, and the fast matching of partially
+%%% instantiated keys is used extensively.
+%%%
+%%% {{ref, Pid}, MonitorRef, Counter}
+%%% {{ref, MonitorRef}, Pid}
+%%% Each process has one monitor. Counter is incremented when the
+%%% Pid joins some group.
+%%% {{member, Name, Pid}, _}
+%%% Pid is a member of group Name, GroupCounter is incremented when the
+%%% Pid joins the group Name.
+%%% {{pid, Pid, Name}}
+%%% Pid is a member of group Name.
+
+member_died(Ref) ->
+ [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
+ Names = member_groups(Pid),
+ _ = [leave_group(Name, P) ||
+ Name <- Names,
+ P <- member_in_group(Pid, Name)],
+ ok.
+
+join_group(Name, Pid) ->
+ Ref_Pid = {ref, Pid},
+ try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
+ catch _:_ ->
+ Ref = erlang:monitor(process, Pid),
+ true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
+ true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
+ end,
+ Member_Name_Pid = {member, Name, Pid},
+ try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
+ catch _:_ ->
+ true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
+ true = ets:insert(pg_local_table, {{pid, Pid, Name}})
+ end.
+
+leave_group(Name, Pid) ->
+ Member_Name_Pid = {member, Name, Pid},
+ try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
+ N ->
+ if
+ N =:= 0 ->
+ true = ets:delete(pg_local_table, {pid, Pid, Name}),
+ true = ets:delete(pg_local_table, Member_Name_Pid);
+ true ->
+ ok
+ end,
+ Ref_Pid = {ref, Pid},
+ case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
+ 0 ->
+ [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
+ true = ets:delete(pg_local_table, {ref, Ref}),
+ true = ets:delete(pg_local_table, Ref_Pid),
+ true = erlang:demonitor(Ref, [flush]),
+ ok;
+ _ ->
+ ok
+ end
+ catch _:_ ->
+ ok
+ end.
+
+group_members(Name) ->
+ [P ||
+ [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
+ _ <- lists:seq(1, N)].
+
+member_in_group(Pid, Name) ->
+ [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
+ lists:duplicate(N, Pid).
+
+member_groups(Pid) ->
+ [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
+
+ensure_started() ->
+ case whereis(?MODULE) of
+ undefined ->
+ C = {pg_local, {?MODULE, start_link, []}, permanent,
+ 1000, worker, [?MODULE]},
+ supervisor:start_child(kernel_safe_sup, C);
+ PgLocalPid ->
+ {ok, PgLocalPid}
+ end.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 74b41a910c..1e481ca718 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7a0ec89a1f..ac883a1b45 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -51,17 +51,17 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
- {pre, kernel_ready}]}).
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_child, [rabbit_log]}},
- {pre, kernel_ready}]}).
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_hooks,
[{description, "internal event notification system"},
{mfa, {rabbit_hooks, start, []}},
- {pre, kernel_ready}]}).
+ {enables, kernel_ready}]}).
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"}]}).
@@ -69,27 +69,27 @@
-rabbit_boot_step({rabbit_alarm,
[{description, "alarm handler"},
{mfa, {rabbit_alarm, start, []}},
- {post, kernel_ready},
- {pre, core_initialized}]}).
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
-rabbit_boot_step({rabbit_amqqueue_sup,
[{description, "queue supervisor"},
{mfa, {rabbit_amqqueue, start, []}},
- {post, kernel_ready},
- {pre, core_initialized}]}).
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
-rabbit_boot_step({rabbit_router,
[{description, "cluster router"},
{mfa, {rabbit_sup, start_child, [rabbit_router]}},
- {post, kernel_ready},
- {pre, core_initialized}]}).
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
-rabbit_boot_step({rabbit_node_monitor,
[{description, "node monitor"},
{mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}},
- {post, kernel_ready},
- {post, rabbit_amqqueue_sup},
- {pre, core_initialized}]}).
+ {requires, kernel_ready},
+ {requires, rabbit_amqqueue_sup},
+ {enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
[{description, "core initialized"}]}).
@@ -97,27 +97,27 @@
-rabbit_boot_step({empty_db_check,
[{description, "empty DB check"},
{mfa, {?MODULE, maybe_insert_default_data, []}},
- {post, core_initialized}]}).
+ {requires, core_initialized}]}).
-rabbit_boot_step({exchange_recovery,
[{description, "exchange recovery"},
{mfa, {rabbit_exchange, recover, []}},
- {post, empty_db_check}]}).
+ {requires, empty_db_check}]}).
-rabbit_boot_step({queue_recovery,
[{description, "queue recovery"},
{mfa, {rabbit_amqqueue, recover, []}},
- {post, exchange_recovery}]}).
+ {requires, exchange_recovery}]}).
-rabbit_boot_step({persister,
[{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
- {post, queue_recovery}]}).
+ {requires, queue_recovery}]}).
-rabbit_boot_step({guid_generator,
[{description, "guid generator"},
{mfa, {rabbit_sup, start_child, [rabbit_guid]}},
- {post, persister},
- {pre, routing_ready}]}).
+ {requires, persister},
+ {enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"}]}).
@@ -125,12 +125,12 @@
-rabbit_boot_step({log_relay,
[{description, "error log relay"},
{mfa, {rabbit_error_logger, boot, []}},
- {post, routing_ready}]}).
+ {requires, routing_ready}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
- {post, log_relay},
- {pre, networking_listening}]}).
+ {requires, log_relay},
+ {enables, networking_listening}]}).
-rabbit_boot_step({networking_listening,
[{description, "network listeners available"}]}).
@@ -246,9 +246,9 @@ run_boot_step({StepName, Attributes}) ->
end,
case [MFA || {mfa, MFA} <- Attributes] of
[] ->
- io:format("progress -- ~s~n", [Description]);
+ io:format("-- ~s~n", [Description]);
MFAs ->
- io:format("starting ~-40s ...", [Description]),
+ io:format("starting ~-60s ...", [Description]),
[case catch apply(M,F,A) of
{'EXIT', Reason} ->
boot_error("FAILED~nReason: ~p~n", [Reason]);
@@ -286,9 +286,9 @@ sort_boot_steps(UnsortedSteps) ->
%% Add edges, detecting cycles and missing vertices.
lists:foreach(fun ({StepName, Attributes}) ->
[add_boot_step_dep(G, StepName, PrecedingStepName)
- || {post, PrecedingStepName} <- Attributes],
+ || {requires, PrecedingStepName} <- Attributes],
[add_boot_step_dep(G, SucceedingStepName, StepName)
- || {pre, SucceedingStepName} <- Attributes]
+ || {enables, SucceedingStepName} <- Attributes]
end, UnsortedSteps),
%% Use topological sort to find a consistent ordering (if there is
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index eda747b287..23b84afb82 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 534409aaea..3b9eeec18a 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 58312253ea..2611d189ba 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -36,7 +36,8 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
--export([list/1, info/1, info/2, info_all/1, info_all/2]).
+-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
@@ -68,10 +69,14 @@
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
-spec(list/1 :: (vhost()) -> [amqqueue()]).
+-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (amqqueue()) -> [info()]).
-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
+-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
+-spec(consumers_all/1 ::
+ (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -93,7 +98,8 @@
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) ->
+ (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+ boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
@@ -236,6 +242,8 @@ list(VHostPath) ->
rabbit_queue,
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
+info_keys() -> rabbit_amqqueue_process:info_keys().
+
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
@@ -251,6 +259,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+consumers(#amqqueue{ pid = QPid }) ->
+ gen_server2:pcall(QPid, 9, consumers, infinity).
+
+consumers_all(VHostPath) ->
+ lists:concat(
+ map(VHostPath,
+ fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} ||
+ {ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
+ end)).
+
stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
@@ -280,7 +298,7 @@ requeue(QPid, MsgIds, ChPid) ->
gen_server2:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
safe_pmap_ok(
@@ -307,13 +325,13 @@ limit_all(QPids, ChPid, LimiterPid) ->
fun (_) -> ok end,
fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
QPids).
-
+
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 15b9d05739..6c5f4757dc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -39,7 +39,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--export([start_link/1]).
+-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
@@ -76,6 +76,9 @@
auto_delete,
arguments,
pid,
+ owner_pid,
+ exclusive_consumer_pid,
+ exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
messages_uncommitted,
@@ -84,12 +87,13 @@
consumers,
transactions,
memory]).
-
+
%%----------------------------------------------------------------------------
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
+info_keys() -> ?INFO_KEYS.
+
%%----------------------------------------------------------------------------
init(Q) ->
@@ -168,13 +172,12 @@ record_current_channel_tx(ChPid, Txn) ->
%% as a side effect this also starts monitoring the channel (if
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-
-deliver_immediately(Message, Delivered,
+
+deliver_immediately(Message, IsDelivered,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
- ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -186,7 +189,7 @@ deliver_immediately(Message, Delivered,
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
+ {QName, self(), NextId, IsDelivered, Message}),
NewUAM = case AckRequired of
true -> dict:store(NextId, Message, UAM);
false -> UAM
@@ -217,7 +220,7 @@ deliver_immediately(Message, Delivered,
ActiveConsumers,
BlockedConsumers),
deliver_immediately(
- Message, Delivered,
+ Message, IsDelivered,
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers})
end;
@@ -225,6 +228,26 @@ deliver_immediately(Message, Delivered,
{not_offered, State}
end.
+run_message_queue(State = #q{message_buffer = MessageBuffer}) ->
+ run_message_queue(MessageBuffer, State).
+
+run_message_queue(MessageBuffer, State) ->
+ case queue:out(MessageBuffer) of
+ {{value, {Message, IsDelivered}}, BufferTail} ->
+ case deliver_immediately(Message, IsDelivered, State) of
+ {offered, true, NewState} ->
+ persist_delivery(qname(State), Message, IsDelivered),
+ run_message_queue(BufferTail, NewState);
+ {offered, false, NewState} ->
+ persist_auto_ack(qname(State), Message),
+ run_message_queue(BufferTail, NewState);
+ {not_offered, NewState} ->
+ NewState#q{message_buffer = MessageBuffer}
+ end;
+ {empty, _} ->
+ State#q{message_buffer = MessageBuffer}
+ end.
+
attempt_delivery(none, _ChPid, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
@@ -252,8 +275,8 @@ deliver_or_enqueue(Txn, ChPid, Message, State) ->
end.
deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
- State).
+ run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)),
+ State).
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -287,44 +310,44 @@ possibly_unblock(State, ChPid, Update) ->
move_consumers(ChPid,
State#q.blocked_consumers,
State#q.active_consumers),
- run_poke_burst(
+ run_message_queue(
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers})
end
end.
-
+
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
- not_found -> {ok, State};
+ not_found ->
+ {ok, State};
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
- case Txn of
- none -> ok;
- _ -> ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn)
- end,
- NewState =
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
- blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)}),
- case should_auto_delete(NewState) of
- false -> {ok, NewState};
- true -> {stop, NewState}
+ State1 = State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, _} -> none;
+ Other -> Other
+ end,
+ active_consumers = remove_consumers(
+ ChPid, State#q.active_consumers),
+ blocked_consumers = remove_consumers(
+ ChPid, State#q.blocked_consumers)},
+ case should_auto_delete(State1) of
+ true -> {stop, State1};
+ false -> case Txn of
+ none -> ok;
+ _ -> ok = rollback_work(Txn, qname(State1)),
+ erase_tx(Txn)
+ end,
+ {ok, deliver_or_enqueue_n(
+ [{Message, true} ||
+ {_MsgId, Message} <- dict:to_list(UAM)],
+ State1)}
end
end.
@@ -343,26 +366,6 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-run_poke_burst(State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(MessageBuffer, State).
-
-run_poke_burst(MessageBuffer, State) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
- case deliver_immediately(Message, Delivered, State) of
- {offered, true, NewState} ->
- persist_delivery(qname(State), Message, Delivered),
- run_poke_burst(BufferTail, NewState);
- {offered, false, NewState} ->
- persist_auto_ack(qname(State), Message),
- run_poke_burst(BufferTail, NewState);
- {not_offered, NewState} ->
- NewState#q{message_buffer = MessageBuffer}
- end;
- {empty, _} ->
- State#q{message_buffer = MessageBuffer}
- end.
-
is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
queue:is_empty(State#q.blocked_consumers).
@@ -385,10 +388,10 @@ persist_delivery(_QName, _Message,
true) ->
ok;
persist_delivery(_QName, #basic_message{persistent_key = none},
- _Delivered) ->
+ _IsDelivered) ->
ok;
persist_delivery(QName, #basic_message{persistent_key = PKey},
- _Delivered) ->
+ _IsDelivered) ->
persist_work(none, QName, [{deliver, {QName, PKey}}]).
persist_acks(Txn, QName, Messages) ->
@@ -451,7 +454,7 @@ all_tx() ->
mark_tx_persistent(Txn) ->
Tx = lookup_tx(Txn),
store_tx(Txn, Tx#tx{is_persistent = true}).
-
+
is_tx_persistent(Txn) ->
#tx{is_persistent = Res} = lookup_tx(Txn),
Res.
@@ -488,11 +491,11 @@ collect_messages(MsgIds, UAM) ->
purge_message_buffer(QName, MessageBuffer) ->
Messages =
- [[Message || {Message, _Delivered} <-
+ [[Message || {Message, _IsDelivered} <-
queue:to_list(MessageBuffer)] |
lists:map(
fun (#cr{unacked_messages = UAM}) ->
- [Message || {_MessageId, Message} <- dict:to_list(UAM)]
+ [Message || {_MsgId, Message} <- dict:to_list(UAM)]
end,
all_ch_record())],
%% the simplest, though certainly not the most obvious or
@@ -508,6 +511,18 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
+ '';
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) ->
+ ReaderPid;
+i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
+ '';
+i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
+ ChPid;
+i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
+ '';
+i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
+ ConsumerTag;
i(messages_ready, #q{message_buffer = MessageBuffer}) ->
queue:len(MessageBuffer);
i(messages_unacknowledged, _) ->
@@ -544,6 +559,15 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(consumers, _From,
+ State = #q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ reply(rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -590,18 +614,18 @@ handle_call({basic_get, ChPid, NoAck}, _From,
next_msg_id = NextId,
message_buffer = MessageBuffer}) ->
case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
+ {{value, {Message, IsDelivered}}, BufferTail} ->
AckRequired = not(NoAck),
case AckRequired of
true ->
- persist_delivery(QName, Message, Delivered),
+ persist_delivery(QName, Message, IsDelivered),
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
NewUAM = dict:store(NextId, Message, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM});
false ->
persist_auto_ack(QName, Message)
end,
- Msg = {QName, self(), NextId, Delivered, Message},
+ Msg = {QName, self(), NextId, IsDelivered, Message},
reply({ok, queue:len(BufferTail), Msg},
State#q{message_buffer = BufferTail,
next_msg_id = NextId + 1});
@@ -611,8 +635,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{q = #amqqueue{exclusive_owner = Owner},
- exclusive_consumer = ExistingHolder}) ->
+ _From, State = #q{exclusive_consumer = ExistingHolder}) ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume,
State) of
in_use ->
@@ -623,15 +646,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ack_required = not(NoAck)},
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if ConsumerCount == 0 ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
- ok
+ case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
end,
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
+ ExclusiveConsumer = case ExclusiveConsume of
+ true -> {ChPid, ConsumerTag};
+ false -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
@@ -642,7 +664,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
add_consumer(
ChPid, Consumer,
State1#q.blocked_consumers)};
- false -> run_poke_burst(
+ false -> run_message_queue(
State1#q{
active_consumers =
add_consumer(
@@ -660,10 +682,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
reply(ok, State);
C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
- if ConsumerCount == 1 ->
- ok = rabbit_limiter:unregister(LimiterPid, self());
- true ->
- ok
+ case ConsumerCount of
+ 1 -> ok = rabbit_limiter:unregister(LimiterPid, self());
+ _ -> ok
end,
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
@@ -685,8 +706,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
- State);
+ Length = queue:len(MessageBuffer),
+ reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 46d23a4075..0f3a86646a 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index bec2cd0845..9ebb6e72e0 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -46,7 +46,7 @@
-spec(publish/1 :: (delivery()) -> publish_result()).
-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
- delivery()).
+ delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> message()).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 01ac4f027f..1d47d7640e 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -46,6 +46,7 @@
build_heartbeat_frame/0]).
-export([generate_table/1, encode_properties/2]).
-export([check_empty_content_body_frame_size/0]).
+-export([ensure_content_encoded/1, clear_encoded_content/1]).
-import(lists).
@@ -60,9 +61,11 @@
-spec(build_simple_content_frames/3 ::
(channel_number(), content(), non_neg_integer()) -> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
--spec(generate_table/1 :: (amqp_table()) -> binary()).
+-spec(generate_table/1 :: (amqp_table()) -> binary()).
-spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
+-spec(ensure_content_encoded/1 :: (content()) -> encoded_content()).
+-spec(clear_encoded_content/1 :: (content()) -> unencoded_content()).
-endif.
@@ -193,12 +196,16 @@ generate_array(Array) when is_list(Array) ->
fun ({Type, Value}) -> field_value_to_binary(Type, Value) end,
Array)).
-short_string_to_binary(String) when is_binary(String) and (size(String) < 256) ->
- [<<(size(String)):8>>, String];
+short_string_to_binary(String) when is_binary(String) ->
+ Len = size(String),
+ if Len < 256 -> [<<(size(String)):8>>, String];
+ true -> exit(content_properties_shortstr_overflow)
+ end;
short_string_to_binary(String) ->
StringLength = length(String),
- true = (StringLength < 256), % assertion
- [<<StringLength:8>>, String].
+ if StringLength < 256 -> [<<StringLength:8>>, String];
+ true -> exit(content_properties_shortstr_overflow)
+ end.
long_string_to_binary(String) when is_binary(String) ->
[<<(size(String)):32>>, String];
@@ -236,7 +243,10 @@ encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, Flags
end.
encode_property(shortstr, String) ->
- Len = size(String), <<Len:8/unsigned, String:Len/binary>>;
+ Len = size(String),
+ if Len < 256 -> <<Len:8/unsigned, String:Len/binary>>;
+ true -> exit(content_properties_shortstr_overflow)
+ end;
encode_property(longstr, String) ->
Len = size(String), <<Len:32/unsigned, String:Len/binary>>;
encode_property(octet, Int) ->
@@ -262,3 +272,19 @@ check_empty_content_body_frame_size() ->
exit({incorrect_empty_content_body_frame_size,
ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
end.
+
+ensure_content_encoded(Content = #content{properties_bin = PropsBin})
+ when PropsBin =/= 'none' ->
+ Content;
+ensure_content_encoded(Content = #content{properties = Props}) ->
+ Content #content{properties_bin = rabbit_framing:encode_properties(Props)}.
+
+clear_encoded_content(Content = #content{properties_bin = none}) ->
+ Content;
+clear_encoded_content(Content = #content{properties = none}) ->
+ %% Only clear when we can rebuild the properties_bin later in
+ %% accordance to the content record definition comment - maximum
+ %% one of properties and properties_bin can be 'none'
+ Content;
+clear_encoded_content(Content = #content{}) ->
+ Content#content{properties_bin = none}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 506e87ecb1..e022a1fafe 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -139,7 +139,7 @@ parse_properties(Bit, [Type | TypeListRest], Acc, FirstShort,
end,
parse_properties(Bit + 1, TypeListRest, [Value | Acc], FirstShort,
Remainder, Rest).
-
+
parse_property(shortstr, <<Len:8/unsigned, String:Len/binary, Rest/binary>>) ->
{String, Rest};
parse_property(longstr, <<Len:32/unsigned, String:Len/binary, Rest/binary>>) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e16be941a4..ddb941cffe 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -37,8 +37,10 @@
-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
+-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3,
+ handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
@@ -46,10 +48,23 @@
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(INFO_KEYS,
+ [pid,
+ connection,
+ number,
+ user,
+ vhost,
+ transactional,
+ consumer_count,
+ messages_unacknowledged,
+ acks_uncommitted,
+ prefetch_count]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -62,6 +77,12 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(list/0 :: () -> [pid()]).
+-spec(info_keys/0 :: () -> [info_key()]).
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(info_all/0 :: () -> [[info()]]).
+-spec(info_all/1 :: ([info_key()]) -> [[info()]]).
-endif.
@@ -91,12 +112,33 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
conserve_memory(Pid, Conserve) ->
gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
+list() ->
+ pg_local:get_members(rabbit_channels).
+
+info_keys() -> ?INFO_KEYS.
+
+info(Pid) ->
+ gen_server2:pcall(Pid, 9, info, infinity).
+
+info(Pid, Items) ->
+ case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_all() ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()).
+
+info_all(Items) ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ ok = pg_local:join(rabbit_channels, self()),
{ok, #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -110,7 +152,18 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}}.
+ consumer_mapping = dict:new()},
+ hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Items}, _From, State) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
handle_call(_Request, _From, State) ->
noreply(State).
@@ -162,33 +215,31 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
+ {stop, Reason, State}.
-handle_info(timeout, State) ->
+handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {noreply, State, hibernate}.
+ {hibernate, State}.
-terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
- state = terminating}) ->
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid);
+terminate(_Reason, State = #ch{state = terminating}) ->
+ terminate(State);
-terminate(Reason, State = #ch{writer_pid = WriterPid,
- limiter_pid = LimiterPid}) ->
+terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
end,
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid).
+ terminate(State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%---------------------------------------------------------------------------
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+
+noreply(NewState) -> {noreply, NewState, hibernate}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -488,16 +539,18 @@ handle_method(#'basic.qos'{global = true}, _, _State) ->
rabbit_misc:protocol_error(not_implemented, "global=true", []);
handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
- rabbit_misc:protocol_error(not_implemented,
+ rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter_pid = LimiterPid }) ->
+ _, State = #ch{ limiter_pid = LimiterPid,
+ unacked_message_q = UAMQ }) ->
NewLimiterPid = case {LimiterPid, PrefetchCount} of
{undefined, 0} ->
undefined;
{undefined, _} ->
- LPid = rabbit_limiter:start_link(self()),
+ LPid = rabbit_limiter:start_link(self(),
+ queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid;
{_, 0} ->
@@ -530,24 +583,24 @@ handle_method(#'basic.recover_async'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
- lists:foreach(
- fun ({_DeliveryTag, none, _Msg}) ->
- %% Was sent as a basic.get_ok. Don't redeliver
- %% it. FIXME: appropriate?
- ok;
- ({DeliveryTag, ConsumerTag,
- {QName, QPid, MsgId, _Redelivered, Message}}) ->
- %% Was sent as a proper consumer delivery. Resend it as
- %% before.
- %%
- %% FIXME: What should happen if the consumer's been
- %% cancelled since?
- %%
- %% FIXME: should we allocate a fresh DeliveryTag?
- ok = internal_deliver(
+ ok = rabbit_misc:queue_fold(
+ fun ({_DeliveryTag, none, _Msg}, ok) ->
+ %% Was sent as a basic.get_ok. Don't redeliver
+ %% it. FIXME: appropriate?
+ ok;
+ ({DeliveryTag, ConsumerTag,
+ {QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
+ %% Was sent as a proper consumer delivery. Resend
+ %% it as before.
+ %%
+ %% FIXME: What should happen if the consumer's been
+ %% cancelled since?
+ %%
+ %% FIXME: should we allocate a fresh DeliveryTag?
+ internal_deliver(
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
- end, queue:to_list(UAMQ)),
+ end, queue:to_list(UAMQ)),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State};
@@ -785,9 +838,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait,
State = #ch{ virtual_host = VHostPath,
reader_pid = ReaderPid }) ->
- %% FIXME: connection exception (!) on failure??
+ %% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
- %% FIXME: don't allow binding to internal exchanges -
+ %% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_write_permitted(QueueName, State),
@@ -902,7 +955,7 @@ rollback_and_notify(State) ->
notify_queues(internal_rollback(State)).
fold_per_queue(F, Acc0, UAQ) ->
- D = lists:foldl(
+ D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
%% dict:append would be simpler and avoid the
@@ -913,7 +966,7 @@ fold_per_queue(F, Acc0, UAQ) ->
fun (MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
D)
- end, dict:new(), queue:to_list(UAQ)),
+ end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -924,7 +977,7 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
consumer_queues(Consumers) ->
- [QPid || QueueName <-
+ [QPid || QueueName <-
sets:to_list(
dict:fold(fun (_ConsumerTag, QueueName, S) ->
sets:add_element(QueueName, S)
@@ -942,9 +995,9 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, queue:to_list(Acked)) of
+ case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
@@ -981,3 +1034,28 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
+
+terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid).
+
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _) -> self();
+i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{username = Username}) -> Username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
+ dict:size(ConsumerMapping);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
+ uncommitted_ack_q = UAQ}) ->
+ queue:len(UAMQ) + queue:len(UAQ);
+i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
+ queue:len(UAQ);
+i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
+ rabbit_limiter:get_limit(LimiterPid);
+i(Item, _) ->
+ throw({bad_argument, Item}).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 652bb8968d..2c6bd5ae64 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -54,7 +54,7 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
- #params{quiet = Quiet, node = Node, command = Command, args = Args} =
+ #params{quiet = Quiet, node = Node, command = Command, args = Args} =
parse_args(FullCommand, #params{quiet = false,
node = rabbit_misc:makenode(NodeStr)}),
Inform = case Quiet of
@@ -81,6 +81,9 @@ start() ->
{error, Reason} ->
error("~p", [Reason]),
halt(2);
+ {badrpc, {'EXIT', Reason}} ->
+ error("~p", [Reason]),
+ halt(2);
{badrpc, Reason} ->
error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
@@ -139,6 +142,7 @@ Available commands:
cluster <ClusterNode> ...
status
rotate_logs [Suffix]
+ close_connection <ConnectionPid> <ExplanationString>
add_user <UserName> <Password>
delete_user <UserName>
@@ -156,11 +160,13 @@ Available commands:
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
- list_bindings [-p <VHostPath>]
+ list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
+ list_channels [<ChannelInfoItem> ...]
+ list_consumers [-p <VHostPath>]
-Quiet output mode is selected with the \"-q\" flag. Informational messages
-are suppressed when quiet mode is in effect.
+Quiet output mode is selected with the \"-q\" flag. Informational
+messages are suppressed when quiet mode is in effect.
<node> should be the name of the master node of the RabbitMQ
cluster. It defaults to the node named \"rabbit\" on the local
@@ -169,24 +175,39 @@ usually be rabbit@server (unless RABBITMQ_NODENAME has been set to
some non-default value at broker startup time). The output of hostname
-s is usually the correct suffix to use after the \"@\" sign.
-The list_queues, list_exchanges and list_bindings commands accept an optional
-virtual host parameter for which to display results. The default value is \"/\".
+The list_queues, list_exchanges and list_bindings commands accept an
+optional virtual host parameter for which to display results. The
+default value is \"/\".
-<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
-arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted,
-messages, acks_uncommitted, consumers, transactions, memory]. The default is
- to display name and (number of) messages.
+<QueueInfoItem> must be a member of the list [name, durable,
+auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid,
+exclusive_consumer_tag, messages_ready, messages_unacknowledged,
+messages_uncommitted, messages, acks_uncommitted, consumers,
+transactions, memory]. The default is to display name and (number of)
+messages.
<ExchangeInfoItem> must be a member of the list [name, type, durable,
arguments]. The default is to display name and type.
-The output format for \"list_bindings\" is a list of rows containing
+The output format for \"list_bindings\" is a list of rows containing
exchange name, queue name, routing key and arguments, in that order.
-<ConnectionInfoItem> must be a member of the list [pid, address, port,
-peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
-client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend].
-The default is to display user, peer_address, peer_port and state.
+<ConnectionInfoItem> must be a member of the list [pid, address, port,
+peer_address, peer_port, state, channels, user, vhost, timeout,
+frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt,
+send_pend]. The default is to display user, peer_address, peer_port
+and state.
+
+<ChannelInfoItem> must be a member of the list [pid, connection,
+number, user, vhost, transactional, consumer_count,
+messages_unacknowledged, acks_uncommitted, prefetch_count]. The
+default is to display pid, user, transactional, consumer_count,
+messages_unacknowledged.
+
+The output format for \"list_consumers\" is a list of rows containing,
+in order, the queue name, channel process id, consumer tag, and a
+boolean indicating whether acknowledgements are expected from the
+consumer.
"),
halt(1).
@@ -232,6 +253,11 @@ action(rotate_logs, Node, Args = [Suffix], Inform) ->
Inform("Rotating logs to files with suffix ~p", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
+action(close_connection, Node, [PidStr, Explanation], Inform) ->
+ Inform("Closing connection ~s", [PidStr]),
+ rpc_call(Node, rabbit_networking, close_connection,
+ [rabbit_misc:string_to_pid(PidStr), Explanation]);
+
action(add_user, Node, Args = [Username, _Password], Inform) ->
Inform("Creating user ~p", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
@@ -287,9 +313,8 @@ action(list_bindings, Node, Args, Inform) ->
InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys),
- ok;
+ X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
+ InfoKeys);
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
@@ -298,6 +323,22 @@ action(list_connections, Node, Args, Inform) ->
[ArgAtoms]),
ArgAtoms);
+action(list_channels, Node, Args, Inform) ->
+ Inform("Listing channels", []),
+ ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
+ messages_unacknowledged]),
+ display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
+ ArgAtoms);
+
+action(list_consumers, Node, Args, Inform) ->
+ Inform("Listing consumers", []),
+ {VHostArg, _} = parse_vhost_flag_bin(Args),
+ InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
+ display_info_list(
+ [lists:zip(InfoKeys, tuple_to_list(X)) ||
+ X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
+ InfoKeys);
+
action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
action(Command, Node, VHost, RemainingArgs, Inform).
@@ -317,9 +358,9 @@ action(list_permissions, Node, VHost, [], Inform) ->
[VHost]})).
parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
+ case Args of
["-p", VHost | RemainingArgs] ->
- {VHost, RemainingArgs};
+ {VHost, RemainingArgs};
RemainingArgs ->
{"/", RemainingArgs}
end.
@@ -329,9 +370,9 @@ parse_vhost_flag_bin(Args) ->
{list_to_binary(VHost), RemainingArgs}.
default_if_empty(List, Default) when is_list(List) ->
- if List == [] ->
- Default;
- true ->
+ if List == [] ->
+ Default;
+ true ->
[list_to_atom(X) || X <- List]
end.
@@ -355,8 +396,8 @@ format_info_item(Key, Items) ->
is_tuple(Value) ->
inet_parse:ntoa(Value);
Value when is_pid(Value) ->
- pid_to_string(Value);
- Value when is_binary(Value) ->
+ rabbit_misc:pid_to_string(Value);
+ Value when is_binary(Value) ->
escape(Value);
Value when is_atom(Value) ->
escape(atom_to_list(Value));
@@ -413,10 +454,3 @@ prettify_typed_amqp_value(Type, Value) ->
array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
_ -> Value
end.
-
-%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7)
-pid_to_string(Pid) ->
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
- = term_to_binary(Pid),
- Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index 23e6fc4432..078cf620f4 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 10ea84aacb..face0a1a21 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 183b69844c..45b66712b8 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -56,7 +56,7 @@ init({{File, Suffix}, []}) ->
init({{File, _}, error}) ->
init(File);
%% Used only when swapping handlers without performing
-%% log rotation
+%% log rotation
init({File, []}) ->
init(File);
init({File, _Type} = FileInfo) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c16396d357..bcba7d0b4c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -35,7 +35,7 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
- list/1, info/1, info/2, info_all/1, info_all/2,
+ list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2]).
-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
@@ -70,6 +70,7 @@
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list/1 :: (vhost()) -> [exchange()]).
+-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (exchange()) -> [info()]).
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
@@ -81,7 +82,7 @@
-spec(delete_binding/5 ::
(exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
--spec(list_bindings/1 :: (vhost()) ->
+-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
@@ -89,9 +90,9 @@
-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
--spec(list_queue_bindings/1 :: (queue_name()) ->
+-spec(list_queue_bindings/1 :: (queue_name()) ->
[{exchange_name(), routing_key(), amqp_table()}]).
--spec(list_exchange_bindings/1 :: (exchange_name()) ->
+-spec(list_exchange_bindings/1 :: (exchange_name()) ->
[{queue_name(), routing_key(), amqp_table()}]).
-endif.
@@ -194,6 +195,8 @@ list(VHostPath) ->
rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
+info_keys() -> ?INFO_KEYS.
+
map(VHostPath, F) ->
%% TODO: there is scope for optimisation here, e.g. using a
%% cursor, parallelising the function invocation
@@ -349,7 +352,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
end || Route <- mnesia:match_object(
rabbit_reverse_route,
reverse_route(
- #route{binding = #binding{queue_name = QueueName,
+ #route{binding = #binding{queue_name = QueueName,
_ = '_'}}),
write)],
ok.
@@ -453,7 +456,7 @@ list_bindings(VHostPath) ->
[{ExchangeName, QueueName, RoutingKey, Arguments} ||
#route{binding = #binding{
exchange_name = ExchangeName,
- key = RoutingKey,
+ key = RoutingKey,
queue_name = QueueName,
args = Arguments}}
<- mnesia:dirty_match_object(
@@ -614,7 +617,7 @@ list_exchange_bindings(ExchangeName) ->
[{QueueName, RoutingKey, Arguments} ||
#route{binding = #binding{queue_name = QueueName,
key = RoutingKey,
- args = Arguments}}
+ args = Arguments}}
<- mnesia:dirty_match_object(rabbit_route, Route)].
% Refactoring is left as an exercise for the reader
@@ -624,5 +627,5 @@ list_queue_bindings(QueueName) ->
[{ExchangeName, RoutingKey, Arguments} ||
#route{binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
- args = Arguments}}
+ args = Arguments}}
<- mnesia:dirty_match_object(rabbit_route, Route)].
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 5c447792a2..b7c6aa96fa 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -115,7 +115,7 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
collect_content_payload(ChannelPid,
RemainingByteCount - size(FragmentBin),
[FragmentBin | Acc]);
- _ ->
+ _ ->
rabbit_misc:protocol_error(
command_invalid,
"expected content body, got non content body frame instead",
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index ea61a679e8..2fa531a7ce 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ed0066fe07..4556570567 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl
index b3d271c28d..3fc84c1e09 100644
--- a/src/rabbit_hooks.erl
+++ b/src/rabbit_hooks.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -61,8 +61,8 @@ unsubscribe(Hook, HandlerName) ->
trigger(Hook, Args) ->
Hooks = ets:lookup(?TableName, Hook),
[case catch apply(M, F, [Hook, Name, Args | A]) of
- {'EXIT', Reason} ->
- rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p",
+ {'EXIT', Reason} ->
+ rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p",
[Name, Hook, Reason]);
_ -> ok
end || {_, Name, {M, F, A}} <- Hooks],
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 087a9f64d9..c9f8183fc9 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -31,12 +31,13 @@
-module(rabbit_limiter).
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--export([start_link/1, shutdown/1]).
+-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([get_limit/1]).
%%----------------------------------------------------------------------------
@@ -44,13 +45,14 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/1 :: (pid()) -> pid()).
+-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
+-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
-endif.
@@ -68,8 +70,8 @@
%% API
%%----------------------------------------------------------------------------
-start_link(ChPid) ->
- {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+start_link(ChPid, UnackedMsgCount) ->
+ {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
Pid.
shutdown(undefined) ->
@@ -104,12 +106,19 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
unregister(undefined, _QPid) -> ok;
unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+get_limit(undefined) ->
+ 0;
+get_limit(Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> 0 end,
+ fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([ChPid]) ->
- {ok, #lim{ch_pid = ChPid} }.
+init([ChPid, UnackedMsgCount]) ->
+ {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
@@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From,
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end.
+ end;
+
+handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State}.
handle_cast(shutdown, State) ->
{stop, normal, State};
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
index 6ef638cb59..4f467162e4 100644
--- a/src/rabbit_load.erl
+++ b/src/rabbit_load.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index dd5b498b07..cc80e360ae 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d927bfb1f9..0654f58ae7 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -55,7 +55,9 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2, ceil/1]).
+-export([unfold/2, ceil/1, queue_fold/3]).
+-export([pid_to_string/1, string_to_pid/1]).
+-export([version_compare/2, version_compare/3]).
-import(mnesia).
-import(lists).
@@ -126,6 +128,9 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
+-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
+-spec(pid_to_string/1 :: (pid()) -> string()).
+-spec(string_to_pid/1 :: (string()) -> pid()).
-endif.
@@ -488,7 +493,105 @@ unfold(Fun, Acc, Init) ->
ceil(N) ->
T = trunc(N),
- case N - T of
- 0 -> N;
- _ -> 1 + T
+ case N == T of
+ true -> T;
+ false -> 1 + T
+ end.
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
+
+%% This provides a string representation of a pid that is the same
+%% regardless of what node we are running on. The representation also
+%% permits easy identification of the pid's node.
+pid_to_string(Pid) when is_pid(Pid) ->
+ %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
+ %% 8.7)
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+
+%% inverse of above
+string_to_pid(Str) ->
+ ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end,
+ %% TODO: simplify this code by using the 're' module, once we drop
+ %% support for R11
+ %%
+ %% 1) sanity check
+ %% The \ before the trailing $ is only there to keep emacs
+ %% font-lock from getting confused.
+ case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of
+ {match, _, _} ->
+ %% 2) strip <>
+ Str1 = string:substr(Str, 2, string:len(Str) - 2),
+ %% 3) extract three constituent parts, taking care to
+ %% handle dots in the node part (hence the reverse and concat)
+ [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")),
+ NodeStr = lists:concat(lists:reverse(Rest)),
+ %% 4) construct a triple term from the three parts
+ TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.",
+ [NodeStr, IdStr, SerStr])),
+ %% 5) parse the triple
+ Tokens = case erl_scan:string(TripleStr) of
+ {ok, Tokens1, _} -> Tokens1;
+ {error, _, _} -> ErrorFun()
+ end,
+ Term = case erl_parse:parse_term(Tokens) of
+ {ok, Term1} -> Term1;
+ {error, _} -> ErrorFun()
+ end,
+ {Node, Id, Ser} =
+ case Term of
+ {Node1, Id1, Ser1} when is_atom(Node1) andalso
+ is_integer(Id1) andalso
+ is_integer(Ser1) ->
+ Term;
+ _ ->
+ ErrorFun()
+ end,
+ %% 6) turn the triple into a pid - see pid_to_string
+ <<131,NodeEnc/binary>> = term_to_binary(Node),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ nomatch ->
+ ErrorFun();
+ Error ->
+ %% invalid regexp - shouldn't happen
+ throw(Error)
+ end.
+
+version_compare(A, B, lte) ->
+ case version_compare(A, B) of
+ eq -> true;
+ lt -> true;
+ gt -> false
+ end;
+version_compare(A, B, gte) ->
+ case version_compare(A, B) of
+ eq -> true;
+ gt -> true;
+ lt -> false
+ end;
+version_compare(A, B, Result) ->
+ Result =:= version_compare(A, B).
+
+version_compare([], []) ->
+ eq;
+version_compare([], _ ) ->
+ lt; %% 2.3 < 2.3.1
+version_compare(_ , []) ->
+ gt; %% 2.3.1 > 2.3
+version_compare(A, B) ->
+ {AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A),
+ {BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B),
+ ANum = list_to_integer(AStr),
+ BNum = list_to_integer(BStr),
+ if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl),
+ BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl),
+ version_compare(ATl1, BTl1);
+ ANum < BNum -> lt;
+ ANum > BNum -> gt
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 749038dbb1..6ec3cf74b3 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -55,8 +55,8 @@
-spec(cluster/1 :: ([erlang_node()]) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
--spec(is_clustered/0 :: () -> boolean()).
--spec(empty_ram_only_tables/0 :: () -> 'ok').
+-spec(is_clustered/0 :: () -> boolean()).
+-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-endif.
@@ -173,7 +173,7 @@ replicated_table_names() ->
].
dir() -> mnesia:system_info(directory).
-
+
ensure_mnesia_dir() ->
MnesiaDir = dir() ++ "/",
case filelib:ensure_dir(MnesiaDir) of
@@ -389,7 +389,7 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
wait_for_tables() -> wait_for_tables(table_names()).
-wait_for_tables(TableNames) ->
+wait_for_tables(TableNames) ->
case check_schema_integrity() of
ok ->
case mnesia:wait_for_tables(TableNames, 30000) of
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index dc642df403..bfafa6ff05 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -187,7 +187,7 @@ start_node(Node, RpcTimeout) ->
io:format("Starting node ~s...~n", [Node]),
case rpc:call(Node, os, getpid, []) of
{badrpc, _} ->
- Port = run_cmd(script_filename()),
+ Port = run_rabbitmq_server(),
Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port),
Pid = case rpc:call(Node, os, getpid, []) of
{badrpc, _} -> throw(cannot_get_pid);
@@ -217,8 +217,22 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
end
end.
-run_cmd(FullPath) ->
- erlang:open_port({spawn, FullPath}, [nouse_stdio]).
+run_rabbitmq_server() ->
+ with_os([{unix, fun run_rabbitmq_server_unix/0},
+ {win32, fun run_rabbitmq_server_win32/0}]).
+
+run_rabbitmq_server_unix() ->
+ FullPath = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server",
+ erlang:open_port({spawn_executable, FullPath},
+ [{arg0, FullPath}, {args, ["-noinput"]}, nouse_stdio]).
+
+run_rabbitmq_server_win32() ->
+ Cmd = filename:nativename(os:find_executable("cmd")),
+ CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME")
+ ++ "\\rabbitmq-server.bat\" -noinput",
+ erlang:open_port({spawn_executable, Cmd},
+ [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]},
+ nouse_stdio, hide]).
is_rabbit_running(Node, RpcTimeout) ->
case rpc:call(Node, rabbit, status, [], RpcTimeout) of
@@ -236,13 +250,6 @@ with_os(Handlers) ->
Handler -> Handler()
end.
-script_filename() ->
- ScriptHome = getenv("RABBITMQ_SCRIPT_HOME"),
- ScriptName = with_os(
- [{unix , fun () -> "rabbitmq-server" end},
- {win32, fun () -> "rabbitmq-server.bat" end}]),
- ScriptHome ++ "/" ++ ScriptName ++ " -noinput".
-
pids_file() -> getenv("RABBITMQ_PIDS_FILE").
write_pids_file(Pids) ->
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index a5ccc8e9ae..406977b42a 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -40,7 +40,7 @@
-ifdef(use_specs).
--type(stat_option() ::
+-type(stat_option() ::
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
-type(error() :: {'error', any()}).
@@ -50,11 +50,11 @@
-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()).
--spec(peername/1 :: (socket()) ->
+-spec(peername/1 :: (socket()) ->
{'ok', {ip_address(), non_neg_integer()}} | error()).
--spec(sockname/1 :: (socket()) ->
+-spec(sockname/1 :: (socket()) ->
{'ok', {ip_address(), non_neg_integer()}} | error()).
--spec(getstat/2 :: (socket(), [stat_option()]) ->
+-spec(getstat/2 :: (socket(), [stat_option()]) ->
{'ok', [{stat_option(), integer()}]} | error()).
-endif.
@@ -66,8 +66,8 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
Pid = self(),
Ref = make_ref(),
- spawn(fun() -> Pid ! {inet_async, Sock, Ref,
- ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
+ spawn(fun() -> Pid ! {inet_async, Sock, Ref,
+ ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
end),
{ok, Ref};
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 84658a85c6..cf04f05b7e 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -31,11 +31,13 @@
-module(rabbit_networking).
--export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
- stop_tcp_listener/2, on_node_down/1, active_listeners/0,
- node_listeners/1, connections/0, connection_info/1,
- connection_info/2, connection_info_all/0,
- connection_info_all/1]).
+-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
+ stop_tcp_listener/2, on_node_down/1, active_listeners/0,
+ node_listeners/1, connections/0, connection_info_keys/0,
+ connection_info/1, connection_info/2,
+ connection_info_all/0, connection_info_all/1,
+ close_connection/2]).
+
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
@@ -46,11 +48,11 @@
-include_lib("kernel/include/inet.hrl").
-define(RABBIT_TCP_OPTS, [
- binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
+ binary,
+ {packet, raw}, % no packaging
+ {reuseaddr, true}, % allow rebind without waiting
+ %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
+ %% {delay_send, true},
{exit_on_close, false}
]).
@@ -70,10 +72,12 @@
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
-spec(connections/0 :: () -> [connection()]).
+-spec(connection_info_keys/0 :: () -> [info_key()]).
-spec(connection_info/1 :: (connection()) -> [info()]).
-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
-spec(connection_info_all/0 :: () -> [[info()]]).
-spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]).
+-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) ->
{ip_address(), atom()}).
@@ -206,7 +210,7 @@ start_ssl_client(SslOpts, Sock) ->
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
-
+
end
end).
@@ -214,12 +218,21 @@ connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].
+connection_info_keys() -> rabbit_reader:info_keys().
+
connection_info(Pid) -> rabbit_reader:info(Pid).
connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
+close_connection(Pid, Explanation) ->
+ case lists:any(fun ({_, ChildPid, _, _}) -> ChildPid =:= Pid end,
+ supervisor:which_children(rabbit_tcp_client_sup)) of
+ true -> rabbit_reader:shutdown(Pid, Explanation);
+ false -> throw({error, {not_a_connection_pid, Pid}})
+ end.
+
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 14a69a472e..f3013a16cf 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index d0d60ddf3d..019d2a269d 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -57,7 +57,7 @@
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
- snapshot}).
+ snapshot}).
%% two tables for efficient persistency
%% one maps a key to a message
@@ -166,7 +166,7 @@ handle_call({transaction, Key, MessageList}, From, State) ->
do_noreply(internal_commit(From, Key, NewState));
handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State) ->
+handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
handle_call(serial, _From,
State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
@@ -211,7 +211,7 @@ internal_dirty_work(MessageList, State) ->
log_work(fun (ML) -> {dirty_work, ML} end,
MessageList, State).
-internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
+internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
Unit = {commit_transaction, Key},
NewSnapshot = internal_integrate1(Unit, Snapshot),
complete(From, Unit, State#pstate{snapshot = NewSnapshot}).
@@ -243,7 +243,7 @@ log_work(CreateWorkUnit, MessageList,
fun(M = {publish, Message, QK = {_QName, PKey}}) ->
case ets:lookup(Messages, PKey) of
[_] -> {tied, QK};
- [] -> ets:insert(Messages, {PKey, Message}),
+ [] -> ets:insert(Messages, {PKey, Message}),
M
end;
(M) -> M
@@ -252,7 +252,7 @@ log_work(CreateWorkUnit, MessageList,
NewSnapshot = internal_integrate1(Unit, Snapshot),
log(State#pstate{snapshot = NewSnapshot}, Unit).
-log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
+log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
Message) ->
State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY,
ExistingDeadline),
@@ -365,7 +365,7 @@ prune_table(Tab, Keys) ->
true = ets:safe_fixtable(Tab, true),
ok = prune_table(Tab, Keys, ets:first(Tab)),
true = ets:safe_fixtable(Tab, false).
-
+
prune_table(_Tab, _Keys, '$end_of_table') -> ok;
prune_table(Tab, Keys, Key) ->
case sets:is_element(Key, Keys) of
@@ -374,7 +374,7 @@ prune_table(Tab, Keys, Key) ->
end,
prune_table(Tab, Keys, ets:next(Tab, Key)).
-internal_load_snapshot(LogHandle,
+internal_load_snapshot(LogHandle,
Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
@@ -435,9 +435,9 @@ accumulate_requeues({{QName, PKey}, Delivered}, Acc) ->
requeue(QName, Requeues, Messages) ->
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
+ RequeueMessages =
[{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
+ {PKey, Delivered} <- Requeues,
{_, Message} <- ets:lookup(Messages, PKey)],
rabbit_amqqueue:redeliver(
QPid,
@@ -459,7 +459,7 @@ replay([], LogHandle, K, Snapshot) ->
{K1, Items} ->
replay(Items, LogHandle, K1, Snapshot);
{K1, Items, Badbytes} ->
- rabbit_log:warning("~p bad bytes recovering persister log~n",
+ rabbit_log:warning("~p bad bytes recovering persister log~n",
[Badbytes]),
replay(Items, LogHandle, K1, Snapshot);
eof -> Snapshot
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 4fcfab7895..274981efed 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 77dce1fab3..738f7e3f9e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info/1, info/2]).
+-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -132,8 +132,10 @@
-ifdef(use_specs).
+-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-endif.
@@ -142,6 +144,9 @@
start_link() ->
{ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+shutdown(Pid, Explanation) ->
+ gen_server:call(Pid, {shutdown, Explanation}, infinity).
+
init(Parent) ->
Deb = sys:debug_options([]),
receive
@@ -158,6 +163,8 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+info_keys() -> ?INFO_KEYS.
+
info(Pid) ->
gen_server:call(Pid, info, infinity).
@@ -196,7 +203,7 @@ teardown_profiling(Value) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-socket_op(Sock, Fun) ->
+socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
{error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
@@ -216,7 +223,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- try
+ try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
@@ -267,14 +274,9 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
- if State#v1.connection_state =:= running ->
- send_exception(State, 0,
- rabbit_misc:amqp_error(connection_forced,
- "broker forced connection closure with reason '~w'",
- [Reason], none));
- true -> ok
- end,
- %% this is what we are expected to do according to
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
+ %% this is what we are expected to do according to
%% http://www.erlang.org/doc/man/sys.html
%%
%% If we wanted to be *really* nice we should wait for a
@@ -301,6 +303,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, {shutdown, Explanation}} ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Parent, Deb, NewState)
+ end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
mainloop(Parent, Deb, State);
@@ -323,6 +332,13 @@ switch_callback(OldState, NewCallback, Length) ->
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
+terminate(Explanation, State = #v1{connection_state = running}) ->
+ {normal, send_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
+terminate(_Explanation, State) ->
+ {force, State}.
+
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
%% We terminate the connection after the specified interval, but
@@ -648,7 +664,7 @@ i(peer_port, #v1{sock = Sock}) ->
{ok, {_, P}} = rabbit_net:peername(Sock),
P;
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
- SockStat =:= recv_cnt;
+ SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 10f80cc301..ee2b82c5bd 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 2a365ce10e..434cdae050 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index ef32544cc4..a1b8948155 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 416827cb80..8602a2099d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_priority_queue(),
+ passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
passed = test_content_framing(),
@@ -106,7 +107,7 @@ test_priority_queue() ->
{true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
test_priority_queue(Q6),
- %% merge 1-element priority Q with 1-element no-priority Q
+ %% merge 1-element priority Q with 1-element no-priority Q
Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, Q)),
{true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
@@ -184,6 +185,28 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
+test_pg_local() ->
+ [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
+ check_pg_local(ok, [], []),
+ check_pg_local(pg_local:join(a, P), [P], []),
+ check_pg_local(pg_local:join(b, P), [P], [P]),
+ check_pg_local(pg_local:join(a, P), [P, P], [P]),
+ check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
+ check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
+ check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
+ check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
+ [X ! done || X <- [P, Q]],
+ check_pg_local(ok, [], []),
+ passed.
+
+check_pg_local(ok, APids, BPids) ->
+ ok = pg_local:sync(),
+ [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
+ {Key, Pids} <- [{a, APids}, {b, BPids}]].
+
test_unfold() ->
{[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
List = lists:seq(2,20,2),
@@ -291,7 +314,7 @@ test_field_values() ->
4,"long", "l", 1234567890:64, % + 14 = 145
5,"short", "s", 655:16, % + 9 = 154
4,"bool", "t", 1, % + 7 = 161
- 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
+ 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
4,"void", "V", % + 6 = 194
5,"array", "A", 23:32, % + 11 = 205
"I", 54321:32, % + 5 = 210
@@ -463,7 +486,7 @@ test_log_management_during_startup() ->
{sasl_report_tty_h, []}]),
ok = control_action(start_app, []),
- %% start application with tty logging and
+ %% start application with tty logging and
%% proper handlers not installed
ok = control_action(stop_app, []),
ok = error_logger:tty(false),
@@ -495,7 +518,7 @@ test_log_management_during_startup() ->
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotation_no_write_permission_dir_test});
+ log_rotation_no_write_permission_dir_test});
{error, {cannot_log_to_file, _, _}} -> ok
end,
@@ -516,7 +539,7 @@ test_log_management_during_startup() ->
ok = file:del_dir(TmpDir),
%% start application with standard error_logger_file_h
- %% handler not installed
+ %% handler not installed
ok = application:set_env(kernel, error_logger, {file, MainLog}),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
@@ -624,7 +647,7 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
-
+
%% convert a disk node into a ram node
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
@@ -728,46 +751,48 @@ test_user_management() ->
passed.
test_server_status() ->
-
- %% create a queue so we have something to list
- Q = #amqqueue{} = rabbit_amqqueue:declare(
- rabbit_misc:r(<<"/">>, queue, <<"foo">>),
- false, false, [], none),
-
+ %% create a few things so there is some useful information to list
+ Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ rabbit_misc:r(<<"/">>, queue, Name),
+ false, false, [], none) ||
+ Name <- [<<"foo">>, <<"bar">>]],
+
+ ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ <<"ctag">>, true, undefined),
%% list queues
- ok = info_action(
- list_queues,
- [name, durable, auto_delete, arguments, pid,
- messages_ready, messages_unacknowledged, messages_uncommitted,
- messages, acks_uncommitted, consumers, transactions, memory],
- true),
+ ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
%% list exchanges
- ok = info_action(
- list_exchanges,
- [name, type, durable, arguments],
- true),
+ ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true),
%% list bindings
ok = control_action(list_bindings, []),
- %% cleanup
- {ok, _} = rabbit_amqqueue:delete(Q, false, false),
-
%% list connections
[#listener{host = H, port = P} | _] =
[L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
N =:= node()],
- {ok, C} = gen_tcp:connect(H, P, []),
+ {ok, _C} = gen_tcp:connect(H, P, []),
timer:sleep(100),
- ok = info_action(
- list_connections,
- [pid, address, port, peer_address, peer_port, state,
- channels, user, vhost, timeout, frame_max,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend],
- false),
- ok = gen_tcp:close(C),
+ ok = info_action(list_connections,
+ rabbit_networking:connection_info_keys(), false),
+ %% close_connection
+ [ConnPid] = rabbit_networking:connections(),
+ ok = control_action(close_connection, [rabbit_misc:pid_to_string(ConnPid),
+ "go away"]),
+
+ %% list channels
+ ok = info_action(list_channels, rabbit_channel:info_keys(), false),
+
+ %% list consumers
+ ok = control_action(list_consumers, []),
+
+ %% cleanup
+ [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+ ok = rabbit_channel:shutdown(Ch),
passed.
@@ -800,11 +825,11 @@ test_hooks() ->
{[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
%% Invoking Pids
- Remote = fun() ->
- receive
- {rabbitmq_hook,[remote_test,test,[],Target]} ->
+ Remote = fun() ->
+ receive
+ {rabbitmq_hook,[remote_test,test,[],Target]} ->
Target ! invoked
- end
+ end
end,
P = spawn(Remote),
rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}),
@@ -830,7 +855,7 @@ control_action(Command, Node, Args) ->
ok ->
io:format("done.~n"),
ok;
- Other ->
+ Other ->
io:format("failed.~n"),
Other
end.
diff --git a/src/rabbit_tracer.erl b/src/rabbit_tracer.erl
index 0c023f2aab..484249b1df 100644
--- a/src/rabbit_tracer.erl
+++ b/src/rabbit_tracer.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 1679ce7c15..54c60f5be0 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -59,7 +59,7 @@
(pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
-spec(internal_send_command/3 ::
(socket(), channel_number(), amqp_method()) -> 'ok').
--spec(internal_send_command/5 ::
+-spec(internal_send_command/5 ::
(socket(), channel_number(), amqp_method(),
content(), non_neg_integer()) -> 'ok').
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index bc7425613f..68efc27f97 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -48,14 +48,15 @@ start_link(Callback, LSock) ->
%%--------------------------------------------------------------------
init({Callback, LSock}) ->
- case prim_inet:async_accept(LSock, -1) of
- {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
- Error -> {stop, {cannot_accept, Error}}
- end.
+ gen_server:cast(self(), accept),
+ {ok, #state{callback=Callback, sock=LSock}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast(accept, State) ->
+ accept(State);
+
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -63,7 +64,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
%% patch up the socket so it looks like one we got from
- %% gen_tcp:accept/1
+ %% gen_tcp:accept/1
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
@@ -83,10 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
end,
%% accept more
- case prim_inet:async_accept(LSock, -1) of
- {ok, NRef} -> {noreply, State#state{ref=NRef}};
- Error -> {stop, {cannot_accept, Error}, none}
- end;
+ accept(State);
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
@@ -104,3 +102,9 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
+
+accept(State = #state{sock=LSock}) ->
+ case prim_inet:async_accept(LSock, -1) of
+ {ok, Ref} -> {noreply, State#state{ref=Ref}};
+ Error -> {stop, {cannot_accept, Error}, State}
+ end.
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index f2bad5bc2e..6e3bc4c966 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl
index d92066a6c3..1b78584384 100644
--- a/src/tcp_client_sup.erl
+++ b/src/tcp_client_sup.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 4a2e149bb8..73ef9586bf 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -69,7 +69,7 @@ init({IPAddress, Port, SocketOpts,
[Label, inet_parse:ntoa(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
- on_startup = OnStartup, on_shutdown = OnShutdown,
+ on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index d6bbac080f..0fe1542616 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 8be28f523d..cd03fcc6e6 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -37,8 +37,6 @@
%%
%% This module tries to warn Rabbit before such situations occur, so
%% that it has a higher chance to avoid running out of memory.
-%%
-%% This code depends on Erlang os_mon application.
-module(vm_memory_monitor).
@@ -51,7 +49,8 @@
-export([update/0, get_total_memory/0,
get_check_interval/0, set_check_interval/1,
- get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]).
+ get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
+ get_memory_limit/0]).
-define(SERVER, ?MODULE).
@@ -77,6 +76,8 @@
('ignore' | {'error', any()} | {'ok', pid()})).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
+-spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')).
+-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
-spec(get_vm_memory_high_watermark/0 :: () -> float()).
@@ -95,17 +96,24 @@ update() ->
get_total_memory() ->
get_total_memory(os:type()).
+get_vm_limit() ->
+ get_vm_limit(os:type()).
+
get_check_interval() ->
- gen_server:call(?MODULE, get_check_interval).
+ gen_server:call(?MODULE, get_check_interval, infinity).
set_check_interval(Fraction) ->
- gen_server:call(?MODULE, {set_check_interval, Fraction}).
+ gen_server:call(?MODULE, {set_check_interval, Fraction}, infinity).
get_vm_memory_high_watermark() ->
- gen_server:call(?MODULE, get_vm_memory_high_watermark).
+ gen_server:call(?MODULE, get_vm_memory_high_watermark, infinity).
set_vm_memory_high_watermark(Fraction) ->
- gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}).
+ gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction},
+ infinity).
+
+get_memory_limit() ->
+ gen_server:call(?MODULE, get_memory_limit, infinity).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -152,6 +160,9 @@ handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+handle_call(get_memory_limit, _From, State) ->
+ {reply, State#state.memory_limit, State};
+
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -199,17 +210,26 @@ start_timer(Timeout) ->
{ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
TRef.
+%% According to http://msdn.microsoft.com/en-us/library/aa366778(VS.85).aspx
+%% Windows has 2GB and 8TB of address space for 32 and 64 bit accordingly.
+get_vm_limit({win32,_OSname}) ->
+ case erlang:system_info(wordsize) of
+ 4 -> 2*1024*1024*1024; %% 2 GB for 32 bits 2^31
+ 8 -> 8*1024*1024*1024*1024 %% 8 TB for 64 bits 2^42
+ end;
+
%% On a 32-bit machine, if you're using more than 2 gigs of RAM you're
%% in big trouble anyway.
-get_vm_limit() ->
+get_vm_limit(_OsType) ->
case erlang:system_info(wordsize) of
- 4 -> 4294967296; %% 4 GB for 32 bits 2^32
- 8 -> 281474976710656 %% 256 TB for 64 bits 2^48
+ 4 -> 4*1024*1024*1024; %% 4 GB for 32 bits 2^32
+ 8 -> 256*1024*1024*1024*1024 %% 256 TB for 64 bits 2^48
%%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
end.
get_mem_limit(MemFraction, TotalMemory) ->
- lists:min([trunc(TotalMemory * MemFraction), get_vm_limit()]).
+ AvMem = lists:min([TotalMemory, get_vm_limit()]),
+ trunc(AvMem * MemFraction).
%%----------------------------------------------------------------------------
%% Internal Helpers
@@ -241,11 +261,29 @@ get_total_memory({unix,freebsd}) ->
PageCount * PageSize;
get_total_memory({win32,_OSname}) ->
- [Result|_] = os_mon_sysinfo:get_mem_info(),
- {ok, [_MemLoad, TotPhys, _AvailPhys,
- _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} =
- io_lib:fread("~d~d~d~d~d~d~d", Result),
- TotPhys;
+ %% Due to the Erlang print format bug, on Windows boxes the memory
+ %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM
+ %% we get negative memory size:
+ %% > os_mon_sysinfo:get_mem_info().
+ %% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"]
+ %% Due to this bug, we don't actually know anything. Even if the
+ %% number is postive we can't be sure if it's correct. This only
+ %% affects us on os_mon versions prior to 2.2.1.
+ case application:get_key(os_mon, vsn) of
+ undefined ->
+ unknown;
+ {ok, Version} ->
+ case rabbit_misc:version_compare(Version, "2.2.1", lt) of
+ true -> %% os_mon is < 2.2.1, so we know nothing
+ unknown;
+ false ->
+ [Result|_] = os_mon_sysinfo:get_mem_info(),
+ {ok, [_MemLoad, TotPhys, _AvailPhys,
+ _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} =
+ io_lib:fread("~d~d~d~d~d~d~d", Result),
+ TotPhys
+ end
+ end;
get_total_memory({unix, linux}) ->
File = read_proc_file("/proc/meminfo"),