summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile4
-rw-r--r--docs/rabbitmq-env.conf.5.xml (renamed from docs/rabbitmq.conf.5.xml)19
-rw-r--r--docs/rabbitmq-multi.1.xml100
-rw-r--r--docs/rabbitmq-server.1.xml3
-rw-r--r--docs/rabbitmqctl.1.xml22
-rw-r--r--include/gm_specs.hrl2
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/Makefile1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec4
-rw-r--r--packaging/common/rabbitmq-server.init70
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf66
-rw-r--r--packaging/debs/Debian/Makefile1
-rw-r--r--packaging/debs/Debian/debian/postinst4
-rw-r--r--packaging/debs/Debian/debian/rules2
-rw-r--r--packaging/macports/Portfile.in19
-rw-r--r--packaging/windows/Makefile1
-rwxr-xr-xscripts/rabbitmq-env6
-rwxr-xr-xscripts/rabbitmq-multi72
-rw-r--r--scripts/rabbitmq-multi.bat84
-rw-r--r--src/file_handle_cache.erl27
-rw-r--r--src/gm.erl15
-rw-r--r--src/gm_soak_test.erl130
-rw-r--r--src/gm_tests.erl182
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_amqqueue.erl52
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_binary_generator.erl26
-rw-r--r--src/rabbit_channel.erl225
-rw-r--r--src/rabbit_channel_sup.erl28
-rw-r--r--src/rabbit_control.erl52
-rw-r--r--src/rabbit_direct.erl22
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_mnesia.erl14
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_multi.erl349
-rw-r--r--src/rabbit_reader.erl226
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_tests.erl32
39 files changed, 803 insertions, 1081 deletions
diff --git a/Makefile b/Makefile
index 51b998f418..00c7809dbd 100644
--- a/Makefile
+++ b/Makefile
@@ -18,7 +18,7 @@ TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
WEB_URL=http://www.rabbitmq.com/
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
-USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml
+USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml
USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
@@ -268,7 +268,7 @@ install_bin: all install_dirs
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
- for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi; do \
+ for script in rabbitmq-env rabbitmq-server rabbitmqctl; do \
cp scripts/$$script $(TARGET_DIR)/sbin; \
[ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
done
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq-env.conf.5.xml
index 31de71649c..c887596c59 100644
--- a/docs/rabbitmq.conf.5.xml
+++ b/docs/rabbitmq-env.conf.5.xml
@@ -9,20 +9,20 @@
</refentryinfo>
<refmeta>
- <refentrytitle>rabbitmq.conf</refentrytitle>
+ <refentrytitle>rabbitmq-env.conf</refentrytitle>
<manvolnum>5</manvolnum>
<refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
</refmeta>
<refnamediv>
- <refname>rabbitmq.conf</refname>
+ <refname>rabbitmq-env.conf</refname>
<refpurpose>default settings for RabbitMQ AMQP server</refpurpose>
</refnamediv>
<refsect1>
<title>Description</title>
<para>
-<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the
+<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> contains variable settings that override the
defaults built in to the RabbitMQ startup scripts.
</para>
<para>
@@ -33,7 +33,7 @@ operator), including line comments starting with "#".
</para>
<para>
In order of preference, the startup scripts get their values from the
-environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the
+environment, from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> and finally from the
built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar>
setting,
</para>
@@ -48,26 +48,26 @@ empty string, then
<envar>NODENAME</envar>
</para>
<para>
-from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent
+from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> is checked. If it is also absent
or set equal to the empty string then the default value from the
startup script is used.
</para>
<para>
-The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the
+The variable names in /etc/rabbitmq/rabbitmq-env.conf are always equal to the
environment variable names, with the <envar>RABBITMQ_</envar> prefix removed:
<envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the
-<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc.
+<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file, etc.
</para>
<para role="example-prefix">For example:</para>
<screen role="example-multiline">
-# I am a complete /etc/rabbitmq/rabbitmq.conf file.
+# I am a complete /etc/rabbitmq/rabbitmq-env.conf file.
# Comment lines start with a hash character.
# This is a /bin/sh script file - use ordinary envt var syntax
NODENAME=hare
</screen>
<para role="example">
This is an example of a complete
- <filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang
+ <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file that overrides the default Erlang
node name from "rabbit" to "hare".
</para>
@@ -76,7 +76,6 @@ NODENAME=hare
<refsect1>
<title>See also</title>
<para>
- <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
</para>
diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml
deleted file mode 100644
index 6586890abf..0000000000
--- a/docs/rabbitmq-multi.1.xml
+++ /dev/null
@@ -1,100 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
-<refentry lang="en">
- <refentryinfo>
- <productname>RabbitMQ Server</productname>
- <authorgroup>
- <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
- </authorgroup>
- </refentryinfo>
-
- <refmeta>
- <refentrytitle>rabbitmq-multi</refentrytitle>
- <manvolnum>1</manvolnum>
- <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
- </refmeta>
-
- <refnamediv>
- <refname>rabbitmq-multi</refname>
- <refpurpose>start/stop local cluster RabbitMQ nodes</refpurpose>
- </refnamediv>
-
- <refsynopsisdiv>
- <cmdsynopsis>
- <command>rabbitmq-multi</command>
- <arg choice="req"><replaceable>command</replaceable></arg>
- <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
- </cmdsynopsis>
- </refsynopsisdiv>
-
- <refsect1>
- <title>Description</title>
- <para>
- RabbitMQ is an implementation of AMQP, the emerging standard for high
-performance enterprise messaging. The RabbitMQ server is a robust and
-scalable implementation of an AMQP broker.
- </para>
- <para>
-rabbitmq-multi scripts allows for easy set-up of a cluster on a single
-machine.
- </para>
- </refsect1>
-
- <refsect1>
- <title>Commands</title>
- <variablelist>
- <varlistentry>
- <term><cmdsynopsis><command>start_all</command> <arg choice="req"><replaceable>count</replaceable></arg></cmdsynopsis></term>
- <listitem>
- <para>
-Start count nodes with unique names, listening on all IP addresses and
-on sequential ports starting from 5672.
- </para>
- <para role="example-prefix">For example:</para>
- <screen role="example">rabbitmq-multi start_all 3</screen>
- <para role="example">
- Starts 3 local RabbitMQ nodes with unique, sequential port numbers.
- </para>
- </listitem>
- </varlistentry>
-
- <varlistentry>
- <term><cmdsynopsis><command>status</command></cmdsynopsis></term>
- <listitem>
- <para>
-Print the status of all running RabbitMQ nodes.
- </para>
- </listitem>
- </varlistentry>
-
- <varlistentry>
- <term><cmdsynopsis><command>stop_all</command></cmdsynopsis></term>
- <listitem>
- <para>
-Stop all local RabbitMQ nodes,
- </para>
- </listitem>
- </varlistentry>
-
- <varlistentry>
- <term><cmdsynopsis><command>rotate_logs</command></cmdsynopsis></term>
- <listitem>
- <para>
-Rotate log files for all local and running RabbitMQ nodes.
- </para>
- </listitem>
- </varlistentry>
-
- </variablelist>
- </refsect1>
-
-
- <refsect1>
- <title>See also</title>
- <para>
- <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
- <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
- <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
- </para>
- </refsect1>
-</refentry>
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index f161a291f6..ca63927c20 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -124,8 +124,7 @@ Defaults to 5672.
<refsect1>
<title>See also</title>
<para>
- <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
- <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
</para>
</refsect1>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index bd9fee7d4b..3550e5eaef 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -158,6 +158,28 @@
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>wait</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Wait for the RabbitMQ application to start.
+ </para>
+ <para>
+ This command will wait for the RabbitMQ application to
+ start at the node. As long as the Erlang node is up but
+ the RabbitMQ application is down it will wait
+ indefinitely. If the node itself goes down, or takes
+ more than five seconds to come up, it will fail.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl wait</screen>
+ <para role="example">
+ This command will return when the RabbitMQ node has
+ started up.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><cmdsynopsis><command>status</command></cmdsynopsis></term>
<listitem>
<para>
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index 2109d15d22..ee29706e98 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -17,7 +17,7 @@
-ifdef(use_specs).
-type(callback_result() :: 'ok' | {'stop', any()} | {'become', atom(), args()}).
--type(args() :: [any()]).
+-type(args() :: any()).
-type(members() :: [pid()]).
-spec(joined/2 :: (args(), members()) -> callback_result()).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index f79a8106b0..327867c75e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -28,7 +28,7 @@
-record(vhost, {virtual_host, dummy}).
-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties}).
+ client_properties, capabilities}).
-record(content,
{class_id,
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 74a1800adb..287945fe1b 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -31,7 +31,6 @@ prepare:
cp ${COMMON_DIR}/* SOURCES/
sed -i \
- -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
SOURCES/rabbitmq-server.init
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 473168644a..009d52998e 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -55,7 +55,6 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq
install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
-install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -92,6 +91,9 @@ fi
%post
/sbin/chkconfig --add %{name}
+if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then
+ mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf
+fi
%preun
if [ $1 = 0 ]; then
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index 39d239835c..c1647dc505 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -17,75 +17,77 @@
### END INIT INFO
PATH=/sbin:/usr/sbin:/bin:/usr/bin
-DAEMON=/usr/sbin/rabbitmq-multi
NAME=rabbitmq-server
+DAEMON=/usr/sbin/${NAME}
+CONTROL=/usr/sbin/rabbitmqctl
DESC=rabbitmq-server
USER=rabbitmq
-NODE_COUNT=1
ROTATE_SUFFIX=
INIT_LOG_DIR=/var/log/rabbitmq
-DEFAULTS_FILE= # This is filled in when building packages
LOCK_FILE= # This is filled in when building packages
test -x $DAEMON || exit 0
-# Include rabbitmq defaults if available
-if [ -f "$DEFAULTS_FILE" ] ; then
- . $DEFAULTS_FILE
-fi
-
RETVAL=0
set -e
start_rabbitmq () {
- set +e
- $DAEMON start_all ${NODE_COUNT} > ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err
- case "$?" in
- 0)
- echo SUCCESS
- [ -n "$LOCK_FILE" ] && touch $LOCK_FILE
+ status_rabbitmq quiet
+ if [ $RETVAL = 0 ] ; then
+ echo RabbitMQ is currently running
+ else
RETVAL=0
- ;;
- 1)
- echo TIMEOUT - check ${INIT_LOG_DIR}/startup_\{log,err\}
- RETVAL=1
- ;;
- *)
- echo FAILED - check ${INIT_LOG_DIR}/startup_log, _err
- RETVAL=1
- ;;
- esac
- set -e
+ set +e
+ setsid sh -c "$DAEMON > ${INIT_LOG_DIR}/startup_log \
+ 2> ${INIT_LOG_DIR}/startup_err" &
+ $CONTROL wait >/dev/null 2>&1
+ RETVAL=$?
+ set -e
+ case "$RETVAL" in
+ 0)
+ echo SUCCESS
+ if [ -n "$LOCK_FILE" ] ; then
+ touch $LOCK_FILE
+ fi
+ ;;
+ *)
+ echo FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}
+ RETVAL=1
+ ;;
+ esac
+ fi
}
stop_rabbitmq () {
- set +e
status_rabbitmq quiet
if [ $RETVAL = 0 ] ; then
- $DAEMON stop_all > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err
+ set +e
+ $CONTROL stop > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err
RETVAL=$?
+ set -e
if [ $RETVAL = 0 ] ; then
- [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE
+ if [ -n "$LOCK_FILE" ] ; then
+ rm -f $LOCK_FILE
+ fi
else
echo FAILED - check ${INIT_LOG_DIR}/shutdown_log, _err
fi
else
- echo No nodes running
+ echo RabbitMQ is not running
RETVAL=0
fi
- set -e
}
status_rabbitmq() {
set +e
if [ "$1" != "quiet" ] ; then
- $DAEMON status 2>&1
+ $CONTROL status 2>&1
else
- $DAEMON status > /dev/null 2>&1
+ $CONTROL status > /dev/null 2>&1
fi
if [ $? != 0 ] ; then
- RETVAL=1
+ RETVAL=3
fi
set -e
}
@@ -100,7 +102,7 @@ rotate_logs_rabbitmq() {
}
restart_rabbitmq() {
- stop_rabbitmq
+ stop_rabbitmq
start_rabbitmq
}
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index dc0521dd19..94999d0edf 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -20,7 +20,7 @@
##
## OCF instance parameters
-## OCF_RESKEY_multi
+## OCF_RESKEY_server
## OCF_RESKEY_ctl
## OCF_RESKEY_nodename
## OCF_RESKEY_ip
@@ -38,11 +38,11 @@
#######################################################################
-OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi"
+OCF_RESKEY_server_default="/usr/sbin/rabbitmq-server"
OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
OCF_RESKEY_nodename_default="rabbit@localhost"
OCF_RESKEY_log_base_default="/var/log/rabbitmq"
-: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}}
+: ${OCF_RESKEY_server=${OCF_RESKEY_server_default}}
: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}}
: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}}
: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
@@ -61,12 +61,12 @@ Resource agent for RabbitMQ-server
<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc>
<parameters>
-<parameter name="multi" unique="0" required="0">
+<parameter name="server" unique="0" required="0">
<longdesc lang="en">
-The path to the rabbitmq-multi script
+The path to the rabbitmq-server script
</longdesc>
-<shortdesc lang="en">Path to rabbitmq-multi</shortdesc>
-<content type="string" default="${OCF_RESKEY_multi_default}" />
+<shortdesc lang="en">Path to rabbitmq-server</shortdesc>
+<content type="string" default="${OCF_RESKEY_server_default}" />
</parameter>
<parameter name="ctl" unique="0" required="0">
@@ -155,7 +155,7 @@ Expects to have a fully populated OCF RA-compliant environment set.
END
}
-RABBITMQ_MULTI=$OCF_RESKEY_multi
+RABBITMQ_SERVER=$OCF_RESKEY_server
RABBITMQ_CTL=$OCF_RESKEY_ctl
RABBITMQ_NODENAME=$OCF_RESKEY_nodename
RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip
@@ -177,8 +177,8 @@ export_vars() {
}
rabbit_validate_partial() {
- if [ ! -x $RABBITMQ_MULTI ]; then
- ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ if [ ! -x $RABBITMQ_SERVER ]; then
+ ocf_log err "rabbitmq-server server $RABBITMQ_SERVER does not exist or is not executable";
exit $OCF_ERR_INSTALLED;
fi
@@ -210,8 +210,18 @@ rabbit_validate_full() {
}
rabbit_status() {
+ rabbitmqctl_action "status"
+}
+
+rabbit_wait() {
+ rabbitmqctl_action "wait"
+}
+
+rabbitmqctl_action() {
local rc
- $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
+ local action
+ action=$1
+ $RABBITMQ_CTL $NODENAME_ARG $action > /dev/null 2> /dev/null
rc=$?
case "$rc" in
0)
@@ -223,7 +233,7 @@ rabbit_status() {
return $OCF_NOT_RUNNING
;;
*)
- ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG $action: $rc"
exit $OCF_ERR_GENERIC
esac
}
@@ -238,28 +248,16 @@ rabbit_start() {
export_vars
- $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err &
- rc=$?
-
- if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
- return $rc
- fi
+ setsid sh -c "$RABBITMQ_SERVER > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err" &
- # Spin waiting for the server to come up.
+ # Wait for the server to come up.
# Let the CRM/LRM time us out if required
- start_wait=1
- while [ $start_wait = 1 ]; do
- rabbit_status
- rc=$?
- if [ "$rc" = $OCF_SUCCESS ]; then
- start_wait=0
- elif [ "$rc" != $OCF_NOT_RUNNING ]; then
- ocf_log info "rabbitmq-server start failed: $rc"
- exit $OCF_ERR_GENERIC
- fi
- sleep 1
- done
+ rabbit_wait
+ rc=$?
+ if [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ exit $OCF_ERR_GENERIC
+ fi
return $OCF_SUCCESS
}
@@ -272,11 +270,11 @@ rabbit_stop() {
return $OCF_SUCCESS
fi
- $RABBITMQ_MULTI stop_all &
+ $RABBITMQ_CTL stop
rc=$?
if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_CTL stop, $rc"
return $rc
fi
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index ab05f73225..d937fbb2eb 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -23,7 +23,6 @@ package: clean
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
sed -i \
- -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
$(UNPACKED_DIR)/debian/rabbitmq-server.init
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst
index 134f16ee1b..b11340ef8a 100644
--- a/packaging/debs/Debian/debian/postinst
+++ b/packaging/debs/Debian/debian/postinst
@@ -35,6 +35,10 @@ chown -R rabbitmq:rabbitmq /var/log/rabbitmq
case "$1" in
configure)
+ if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
+ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ]; then
+ mv /etc/rabbitmq/rabbitmq.conf /etc/rabbitmq/rabbitmq-env.conf
+ fi
;;
abort-upgrade|abort-remove|abort-deconfigure)
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 6b6df33b09..a785b292af 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -14,7 +14,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/
install/rabbitmq-server::
mkdir -p $(DOCDIR)
rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL*
- for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \
+ for script in rabbitmqctl rabbitmq-server; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index f8417b83b9..c69c4f94d4 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -81,31 +81,28 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome}
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
- reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
+ reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \
${realsbin}/rabbitmq-env
- foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} {
+ foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE} {
reinplace -E "s:^($var)=/:\\1=${prefix}/:" \
- ${realsbin}/rabbitmq-multi \
${realsbin}/rabbitmq-server \
${realsbin}/rabbitmqctl
}
xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
- ${wrappersbin}/rabbitmq-multi
+ ${wrappersbin}/rabbitmq-server
reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:" \
- ${wrappersbin}/rabbitmq-multi
+ ${wrappersbin}/rabbitmq-server
reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
- ${wrappersbin}/rabbitmq-multi
+ ${wrappersbin}/rabbitmq-server
reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
- ${wrappersbin}/rabbitmq-multi
- file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
- file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
+ ${wrappersbin}/rabbitmq-server
+ file copy ${wrappersbin}/rabbitmq-server ${wrappersbin}/rabbitmqctl
- file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/
file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/
file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/
- file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/
+ file copy ${mansrc}/man5/rabbitmq-env.conf.5.gz ${mandest}/man5/
}
pre-install {
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index abe174e08e..dacfa6207f 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -11,7 +11,6 @@ dist:
mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin
- mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin
rm -rf $(SOURCE_DIR)/scripts
rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile
rm -f $(SOURCE_DIR)/README
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index df4b24d8fc..3e17394981 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -37,4 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
-[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+if [ -f /etc/rabbitmq/rabbitmq.conf ]; then
+ echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
+ echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf"
+fi
+[ -f /etc/rabbitmq/rabbitmq-env.conf ] && . /etc/rabbitmq/rabbitmq-env.conf
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
deleted file mode 100755
index ebcf4b6305..0000000000
--- a/scripts/rabbitmq-multi
+++ /dev/null
@@ -1,72 +0,0 @@
-#!/bin/sh
-## The contents of this file are subject to the Mozilla Public License
-## Version 1.1 (the "License"); you may not use this file except in
-## compliance with the License. You may obtain a copy of the License
-## at http://www.mozilla.org/MPL/
-##
-## Software distributed under the License is distributed on an "AS IS"
-## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-## the License for the specific language governing rights and
-## limitations under the License.
-##
-## The Original Code is RabbitMQ.
-##
-## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-##
-
-SCRIPT_HOME=$(dirname $0)
-PIDS_FILE=/var/lib/rabbitmq/pids
-MULTI_ERL_ARGS=
-MULTI_START_ARGS=
-CONFIG_FILE=/etc/rabbitmq/rabbitmq
-
-. `dirname $0`/rabbitmq-env
-
-DEFAULT_NODE_IP_ADDRESS=0.0.0.0
-DEFAULT_NODE_PORT=5672
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
-then
- if [ "x" != "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
- fi
-else
- if [ "x" = "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
- fi
-fi
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME}
-[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE}
-[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS}
-[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS}
-[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
-
-export \
- RABBITMQ_NODENAME \
- RABBITMQ_NODE_IP_ADDRESS \
- RABBITMQ_NODE_PORT \
- RABBITMQ_SCRIPT_HOME \
- RABBITMQ_PIDS_FILE \
- RABBITMQ_CONFIG_FILE
-
-RABBITMQ_CONFIG_ARG=
-[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
-
-# we need to turn off path expansion because some of the vars, notably
-# RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and
-# there is no other way of preventing their expansion.
-set -f
-
-exec erl \
- -pa "${RABBITMQ_HOME}/ebin" \
- -noinput \
- -hidden \
- ${RABBITMQ_MULTI_ERL_ARGS} \
- -sname rabbitmq_multi$$ \
- ${RABBITMQ_CONFIG_ARG} \
- -s rabbit_multi \
- ${RABBITMQ_MULTI_START_ARGS} \
- -extra "$@"
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
deleted file mode 100644
index a2d10f2e74..0000000000
--- a/scripts/rabbitmq-multi.bat
+++ /dev/null
@@ -1,84 +0,0 @@
-@echo off
-REM The contents of this file are subject to the Mozilla Public License
-REM Version 1.1 (the "License"); you may not use this file except in
-REM compliance with the License. You may obtain a copy of the License
-REM at http://www.mozilla.org/MPL/
-REM
-REM Software distributed under the License is distributed on an "AS IS"
-REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-REM the License for the specific language governing rights and
-REM limitations under the License.
-REM
-REM The Original Code is RabbitMQ.
-REM
-REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-REM
-
-setlocal
-
-rem Preserve values that might contain exclamation marks before
-rem enabling delayed expansion
-set TDP0=%~dp0
-set STAR=%*
-setlocal enabledelayedexpansion
-
-if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
-)
-
-if "!COMPUTERNAME!"=="" (
- set COMPUTERNAME=localhost
-)
-
-if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
-)
-
-if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
- if not "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
- )
-) else (
- if "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_PORT=5672
- )
-)
-
-set RABBITMQ_PIDS_FILE=!RABBITMQ_BASE!\rabbitmq.pids
-set RABBITMQ_SCRIPT_HOME=!TDP0!
-
-if "!RABBITMQ_CONFIG_FILE!"=="" (
- set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
-)
-
-if exist "!RABBITMQ_CONFIG_FILE!.config" (
- set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
-) else (
- set RABBITMQ_CONFIG_ARG=
-)
-
-if not exist "!ERLANG_HOME!\bin\erl.exe" (
- echo.
- echo ******************************
- echo ERLANG_HOME not set correctly.
- echo ******************************
- echo.
- echo Please either set ERLANG_HOME to point to your Erlang installation or place the
- echo RabbitMQ server distribution in the Erlang lib folder.
- echo.
- exit /B
-)
-
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!TDP0!..\ebin" ^
--noinput -hidden ^
-!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi!RANDOM! ^
-!RABBITMQ_CONFIG_ARG! ^
--s rabbit_multi ^
-!RABBITMQ_MULTI_START_ARGS! ^
--extra !STAR!
-
-endlocal
-endlocal
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 1e1f37cb3d..f41815d06b 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
+ info/1]).
-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -259,11 +260,17 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(info_keys/0 :: () -> [atom()]).
+-spec(info/0 :: () -> [{atom(), any()}]).
+-spec(info/1 :: ([atom()]) -> [{atom(), any()}]).
-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [obtain_count, obtain_limit]).
+
+%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -494,6 +501,11 @@ set_limit(Limit) ->
get_limit() ->
gen_server:call(?SERVER, get_limit, infinity).
+info_keys() -> ?INFO_KEYS.
+
+info() -> info(?INFO_KEYS).
+info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity).
+
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
@@ -789,6 +801,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(obtain_count, #fhc_state{obtain_count = Count}) -> Count;
+i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(Item, _) -> throw({bad_argument, Item}).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -871,13 +889,18 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
false ->
{noreply, run_pending_item(Item, State)}
end;
+
handle_call({set_limit, Limit}, _From, State) ->
{reply, ok, maybe_reduce(
process_pending(State #fhc_state {
limit = Limit,
obtain_limit = obtain_limit(Limit) }))};
+
handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
- {reply, Limit, State}.
+ {reply, Limit, State};
+
+handle_call({info, Items}, _From, State) ->
+ {reply, infos(Items, State), State}.
handle_cast({register_callback, Pid, MFA},
State = #fhc_state { clients = Clients }) ->
diff --git a/src/gm.erl b/src/gm.erl
index 283b243171..b3fb7eca53 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -53,13 +53,12 @@
%% to create the tables required.
%%
%% start_link/3
-%% Provide the group name, the callback module name, and a list of any
-%% arguments you wish to be passed into the callback module's
-%% functions. The joined/1 will be called when we have joined the
-%% group, and the list of arguments will have appended to it a list of
-%% the current members of the group. See the comments in
-%% behaviour_info/1 below for further details of the callback
-%% functions.
+%% Provide the group name, the callback module name, and any arguments
+%% you wish to be passed into the callback module's functions. The
+%% joined/1 will be called when we have joined the group, and the list
+%% of arguments will have appended to it a list of the current members
+%% of the group. See the comments in behaviour_info/1 below for
+%% further details of the callback functions.
%%
%% leave/1
%% Provide the Pid. Removes the Pid from the group. The callback
@@ -421,7 +420,7 @@
-type(group_name() :: any()).
-spec(create_tables/0 :: () -> 'ok').
--spec(start_link/3 :: (group_name(), atom(), [any()]) ->
+-spec(start_link/3 :: (group_name(), atom(), any()) ->
{'ok', pid()} | {'error', any()}).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
new file mode 100644
index 0000000000..1f8832a6b2
--- /dev/null
+++ b/src/gm_soak_test.erl
@@ -0,0 +1,130 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm_soak_test).
+
+-export([test/0]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+-behaviour(gm).
+
+-include("gm_specs.hrl").
+
+%% ---------------------------------------------------------------------------
+%% Soak test
+%% ---------------------------------------------------------------------------
+
+get_state() ->
+ get(state).
+
+with_state(Fun) ->
+ put(state, Fun(get_state())).
+
+inc() ->
+ case 1 + get(count) of
+ 100000 -> Now = os:timestamp(),
+ Start = put(ts, Now),
+ Diff = timer:now_diff(Now, Start),
+ Rate = 100000 / (Diff / 1000000),
+ io:format("~p seeing ~p msgs/sec~n", [self(), Rate]),
+ put(count, 0);
+ N -> put(count, N)
+ end.
+
+joined([], Members) ->
+ io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
+ put(state, dict:from_list([{Member, empty} || Member <- Members])),
+ put(count, 0),
+ put(ts, os:timestamp()),
+ ok.
+
+members_changed([], Births, Deaths) ->
+ with_state(
+ fun (State) ->
+ State1 =
+ lists:foldl(
+ fun (Born, StateN) ->
+ false = dict:is_key(Born, StateN),
+ dict:store(Born, empty, StateN)
+ end, State, Births),
+ lists:foldl(
+ fun (Died, StateN) ->
+ true = dict:is_key(Died, StateN),
+ dict:store(Died, died, StateN)
+ end, State1, Deaths)
+ end),
+ ok.
+
+handle_msg([], From, {test_msg, Num}) ->
+ inc(),
+ with_state(
+ fun (State) ->
+ ok = case dict:find(From, State) of
+ {ok, died} ->
+ exit({{from, From},
+ {received_posthumous_delivery, Num}});
+ {ok, empty} -> ok;
+ {ok, Num} -> ok;
+ {ok, Num1} when Num < Num1 ->
+ exit({{from, From},
+ {duplicate_delivery_of, Num1},
+ {expecting, Num}});
+ {ok, Num1} ->
+ exit({{from, From},
+ {missing_delivery_of, Num},
+ {received_early, Num1}});
+ error ->
+ exit({{from, From},
+ {received_premature_delivery, Num}})
+ end,
+ dict:store(From, Num + 1, State)
+ end),
+ ok.
+
+terminate([], Reason) ->
+ io:format("Left ~p (~p)~n", [self(), Reason]),
+ ok.
+
+spawn_member() ->
+ spawn_link(
+ fun () ->
+ random:seed(now()),
+ %% start up delay of no more than 10 seconds
+ timer:sleep(random:uniform(10000)),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
+ Start = random:uniform(10000),
+ send_loop(Pid, Start, Start + random:uniform(10000)),
+ gm:leave(Pid),
+ spawn_more()
+ end).
+
+spawn_more() ->
+ [spawn_member() || _ <- lists:seq(1, 4 - random:uniform(4))].
+
+send_loop(_Pid, Target, Target) ->
+ ok;
+send_loop(Pid, Count, Target) when Target > Count ->
+ case random:uniform(3) of
+ 3 -> gm:confirmed_broadcast(Pid, {test_msg, Count});
+ _ -> gm:broadcast(Pid, {test_msg, Count})
+ end,
+ timer:sleep(random:uniform(5) - 1), %% sleep up to 4 ms
+ send_loop(Pid, Count + 1, Target).
+
+test() ->
+ ok = gm:create_tables(),
+ spawn_member(),
+ spawn_member().
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
new file mode 100644
index 0000000000..65e9cff041
--- /dev/null
+++ b/src/gm_tests.erl
@@ -0,0 +1,182 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm_tests).
+
+-export([test_join_leave/0,
+ test_broadcast/0,
+ test_confirmed_broadcast/0,
+ test_member_death/0,
+ test_receive_in_order/0,
+ all_tests/0]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+-behaviour(gm).
+
+-include("gm_specs.hrl").
+
+-define(RECEIVE_OR_THROW(Body, Bool, Error),
+ receive Body ->
+ true = Bool,
+ passed
+ after 1000 ->
+ throw(Error)
+ end).
+
+joined(Pid, Members) ->
+ Pid ! {joined, self(), Members},
+ ok.
+
+members_changed(Pid, Births, Deaths) ->
+ Pid ! {members_changed, self(), Births, Deaths},
+ ok.
+
+handle_msg(Pid, From, Msg) ->
+ Pid ! {msg, self(), From, Msg},
+ ok.
+
+terminate(Pid, Reason) ->
+ Pid ! {termination, self(), Reason},
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% Functional tests
+%% ---------------------------------------------------------------------------
+
+all_tests() ->
+ passed = test_join_leave(),
+ passed = test_broadcast(),
+ passed = test_confirmed_broadcast(),
+ passed = test_member_death(),
+ passed = test_receive_in_order(),
+ passed.
+
+test_join_leave() ->
+ with_two_members(fun (_Pid, _Pid2) -> passed end).
+
+test_broadcast() ->
+ test_broadcast(fun gm:broadcast/2).
+
+test_confirmed_broadcast() ->
+ test_broadcast(fun gm:confirmed_broadcast/2).
+
+test_member_death() ->
+ with_two_members(
+ fun (Pid, Pid2) ->
+ {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()),
+ passed = receive_joined(Pid3, [Pid, Pid2, Pid3],
+ timeout_joining_gm_group_3),
+ passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1),
+ passed = receive_birth(Pid2, Pid3, timeout_waiting_for_birth_3_2),
+
+ unlink(Pid3),
+ exit(Pid3, kill),
+
+ %% Have to do some broadcasts to ensure that all members
+ %% find out about the death.
+ passed = (test_broadcast_fun(fun gm:confirmed_broadcast/2))(
+ Pid, Pid2),
+
+ passed = receive_death(Pid, Pid3, timeout_waiting_for_death_3_1),
+ passed = receive_death(Pid2, Pid3, timeout_waiting_for_death_3_2),
+
+ passed
+ end).
+
+test_receive_in_order() ->
+ with_two_members(
+ fun (Pid, Pid2) ->
+ Numbers = lists:seq(1,1000),
+ [begin ok = gm:broadcast(Pid, N), ok = gm:broadcast(Pid2, N) end
+ || N <- Numbers],
+ passed = receive_numbers(
+ Pid, Pid, {timeout_for_msgs, Pid, Pid}, Numbers),
+ passed = receive_numbers(
+ Pid, Pid2, {timeout_for_msgs, Pid, Pid2}, Numbers),
+ passed = receive_numbers(
+ Pid2, Pid, {timeout_for_msgs, Pid2, Pid}, Numbers),
+ passed = receive_numbers(
+ Pid2, Pid2, {timeout_for_msgs, Pid2, Pid2}, Numbers),
+ passed
+ end).
+
+test_broadcast(Fun) ->
+ with_two_members(test_broadcast_fun(Fun)).
+
+test_broadcast_fun(Fun) ->
+ fun (Pid, Pid2) ->
+ ok = Fun(Pid, magic_message),
+ passed = receive_or_throw({msg, Pid, Pid, magic_message},
+ timeout_waiting_for_msg),
+ passed = receive_or_throw({msg, Pid2, Pid, magic_message},
+ timeout_waiting_for_msg)
+ end.
+
+with_two_members(Fun) ->
+ ok = gm:create_tables(),
+
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
+
+ {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()),
+ passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
+ passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
+
+ passed = Fun(Pid, Pid2),
+
+ ok = gm:leave(Pid),
+ passed = receive_death(Pid2, Pid, timeout_waiting_for_death_1),
+ passed =
+ receive_termination(Pid, normal, timeout_waiting_for_termination_1),
+
+ ok = gm:leave(Pid2),
+ passed =
+ receive_termination(Pid2, normal, timeout_waiting_for_termination_2),
+
+ receive X -> throw({unexpected_message, X})
+ after 0 -> passed
+ end.
+
+receive_or_throw(Pattern, Error) ->
+ ?RECEIVE_OR_THROW(Pattern, true, Error).
+
+receive_birth(From, Born, Error) ->
+ ?RECEIVE_OR_THROW({members_changed, From, Birth, Death},
+ ([Born] == Birth) andalso ([] == Death),
+ Error).
+
+receive_death(From, Died, Error) ->
+ ?RECEIVE_OR_THROW({members_changed, From, Birth, Death},
+ ([] == Birth) andalso ([Died] == Death),
+ Error).
+
+receive_joined(From, Members, Error) ->
+ ?RECEIVE_OR_THROW({joined, From, Members1},
+ lists:usort(Members) == lists:usort(Members1),
+ Error).
+
+receive_termination(From, Reason, Error) ->
+ ?RECEIVE_OR_THROW({termination, From, Reason1},
+ Reason == Reason1,
+ Error).
+
+receive_numbers(_Pid, _Sender, _Error, []) ->
+ passed;
+receive_numbers(Pid, Sender, Error, [N | Numbers]) ->
+ ?RECEIVE_OR_THROW({msg, Pid, Sender, M},
+ M == N,
+ Error),
+ receive_numbers(Pid, Sender, Error, Numbers).
diff --git a/src/pg_local.erl b/src/pg_local.erl
index fd515747e5..c9c3a3a715 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -83,7 +83,7 @@ get_members(Name) ->
sync() ->
ensure_started(),
- gen_server:call(?MODULE, sync).
+ gen_server:call(?MODULE, sync, infinity).
%%%
%%% Callback functions from gen_server
diff --git a/src/rabbit.erl b/src/rabbit.erl
index d967cfb9cd..fb5207b960 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -213,7 +213,8 @@ stop_and_halt() ->
ok.
status() ->
- [{running_applications, application:which_applications()}] ++
+ [{pid, list_to_integer(os:getpid())},
+ {running_applications, application:which_applications()}] ++
rabbit_mnesia:status().
rotate_logs(BinarySuffix) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4ef9750c82..155e1317c3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -199,7 +199,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
mirror_pids = []}),
- case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
end.
@@ -302,29 +302,19 @@ check_declare_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_expires_argument/1},
- {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
+ [{<<"x-expires">>, fun check_integer_argument/1},
+ {<<"x-message-ttl">>, fun check_integer_argument/1}]],
ok.
-check_expires_argument(Val) ->
- check_integer_argument(Val,
- expires_not_of_acceptable_type,
- expires_zero_or_less).
-
-check_message_ttl_argument(Val) ->
- check_integer_argument(Val,
- ttl_not_of_acceptable_type,
- ttl_zero_or_less).
-
-check_integer_argument(undefined, _, _) ->
+check_integer_argument(undefined) ->
ok;
-check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 ->
+check_integer_argument({Type, Val}) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
- false -> {error, {InvalidTypeError, Type, Val}}
+ false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, _Val}, _, ZeroOrLessError) ->
- {error, ZeroOrLessError}.
+check_integer_argument({_Type, Val}) ->
+ {error, {value_zero_or_less, Val}}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -336,10 +326,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, info, infinity).
+ delegate_call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -349,7 +339,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, consumers, infinity).
+ delegate_call(QPid, consumers).
consumers_all(VHostPath) ->
lists:append(
@@ -359,7 +349,7 @@ consumers_all(VHostPath) ->
end)).
stat(#amqqueue{pid = QPid}) ->
- delegate_call(QPid, stat, infinity).
+ delegate_call(QPid, stat).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
@@ -368,9 +358,9 @@ delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
deliver(QPid, Delivery = #delivery{immediate = true}) ->
gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
@@ -382,7 +372,7 @@ deliver(QPid, Delivery) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
@@ -411,17 +401,15 @@ limit_all(QPids, ChPid, LimiterPid) ->
end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
- infinity).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
@@ -516,8 +504,8 @@ safe_delegate_call_ok(F, Pids) ->
{_, Bad} -> {error, Bad}
end.
-delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
+delegate_call(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
delegate_cast(Pid, Msg) ->
delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0b5ad05939..01f7fa6fbf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
@@ -154,6 +154,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+ rabbit_event:notify(
+ queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
@@ -217,7 +219,6 @@ terminate_shutdown(Fun, State) ->
end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag)
|| {Ch, CTag, _} <- consumers(State1)],
- rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -700,13 +701,13 @@ message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
+calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- Now = now_millis(),
+ Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) ->
Now > Expiry
@@ -727,7 +728,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-now_millis() -> timer:now_diff(now(), {0,0,0}).
+now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index d67c7f58e1..dc81ace6bf 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -61,8 +61,7 @@
-spec(map_exception/3 :: (rabbit_channel:channel_number(),
rabbit_types:amqp_error() | any(),
rabbit_types:protocol()) ->
- {boolean(),
- rabbit_channel:channel_number(),
+ {rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record()}).
-endif.
@@ -301,24 +300,21 @@ clear_encoded_content(Content = #content{}) ->
map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
lookup_amqp_exception(Reason, Protocol),
- ShouldClose = SuggestedClose orelse (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
none -> {0, 0};
_ -> Protocol:method_id(FailedMethod)
end,
- {CloseChannel, CloseMethod} =
- case ShouldClose of
- true -> {0, #'connection.close'{reply_code = ReplyCode,
- reply_text = ReplyText,
- class_id = ClassId,
- method_id = MethodId}};
- false -> {Channel, #'channel.close'{reply_code = ReplyCode,
- reply_text = ReplyText,
- class_id = ClassId,
- method_id = MethodId}}
- end,
- {ShouldClose, CloseChannel, CloseMethod}.
+ case SuggestedClose orelse (Channel == 0) of
+ true -> {0, #'connection.close'{reply_code = ReplyCode,
+ reply_text = ReplyText,
+ class_id = ClassId,
+ method_id = MethodId}};
+ false -> {Channel, #'channel.close'{reply_code = ReplyCode,
+ reply_text = ReplyText,
+ class_id = ClassId,
+ method_id = MethodId}}
+ end.
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff3e..34a5e5a440 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,21 +20,22 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/9, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1]).
+-export([emit_stats/1, ready_for_close/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2]).
--record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
+-record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed}).
+ confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm,
+ confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,10 +67,10 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/7 ::
- (channel_number(), pid(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid(),
- fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+-spec(start_link/9 ::
+ (channel_number(), pid(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
@@ -89,15 +90,17 @@
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
+-spec(ready_for_close/1 :: (pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
- VHost, CollectorPid, StartLimiterFun], []).
+start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
+ CollectorPid, StartLimiterFun) ->
+ gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -106,7 +109,7 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
flush(Pid) ->
- gen_server2:call(Pid, flush).
+ gen_server2:call(Pid, flush, infinity).
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -146,14 +149,18 @@ info_all(Items) ->
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
+ready_for_close(Pid) ->
+ gen_server2:cast(Pid, ready_for_close).
+
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun]) ->
+init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
+ CollectorPid, StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
+ protocol = Protocol,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
@@ -173,8 +180,10 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty(),
- confirmed = []},
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ confirmed = [],
+ capabilities = Capabilities},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -218,14 +227,11 @@ handle_cast({method, Method, Content}, State) ->
{noreply, NewState} ->
noreply(NewState);
stop ->
- {stop, normal, State#ch{state = terminating}}
+ {stop, normal, State}
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- {stop, normal, terminating(Reason#amqp_error{method = MethodName},
- State)};
- exit:normal ->
- {stop, normal, State};
+ send_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -233,6 +239,11 @@ handle_cast({method, Method, Content}, State) ->
handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State), hibernate};
+handle_cast(ready_for_close, State = #ch{state = closing,
+ writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+ {stop, normal, State};
+
handle_cast(terminate, State) ->
{stop, normal, State};
@@ -278,19 +289,22 @@ handle_info(timeout, State) ->
noreply(State);
handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed = UC}) ->
- %% TODO: this does a complete scan and partial rebuild of the
- %% tree, which is quite efficient. To do better we'd need to
- %% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MXs, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}, State),
+ State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
+ end,
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+ %% the set one by one which which would be inefficient
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
erase_queue_stats(QPid),
- State1 = case Reason of
- normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
- _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
- end,
- noreply(queue_blocked(QPid, State1)).
+ State3 = (case Reason of
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
+ end)(MXs, State2),
+ noreply(queue_blocked(QPid, State3)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -302,18 +316,16 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
{hibernate, State#ch{stats_timer = StatsTimer1}}.
-terminate(_Reason, State = #ch{state = terminating}) ->
- terminate(State);
-
terminate(Reason, State) ->
- Res = rollback_and_notify(State),
+ {Res, _State1} = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
shutdown -> ok = Res;
{shutdown, _Term} -> ok = Res;
_ -> ok
end,
- terminate(State).
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -351,10 +363,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
- ok = rollback_and_notify(State),
- Reader ! {channel_exit, Channel, Reason},
- State#ch{state = terminating}.
+send_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid}) ->
+ {CloseChannel, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [ReaderPid, Channel, Reason]),
+ %% something bad's happened: rollback_and_notify may not be 'ok'
+ {_Result, State1} = rollback_and_notify(State),
+ case CloseChannel of
+ Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ _ -> ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
+ end.
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
@@ -476,13 +500,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
- Acc;
-remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
- remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
- State).
-
record_confirm(undefined, _, State) ->
State;
record_confirm(MsgSeqNo, XName, State) ->
@@ -495,25 +512,43 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} =
+confirm(MsgSeqNos, QPid, State) ->
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ record_confirms(MXs, State1).
+
+process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
+ {MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> Acc;
- {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
+ fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
+ State);
+ none -> Acc
end
- end, {[], UC}, MsgSeqNos),
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
+ end, {[], UMQ, UQM}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
- Qs1 = sets:del_element(QPid, Qs),
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
- case sets:size(Qs1) of
- 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
+ UQM1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> gb_trees:delete(QPid, UQM);
+ false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
+ end;
+ none ->
+ UQM
+ end,
+ Qs1 = gb_sets:del_element(QPid, Qs),
+ case gb_sets:is_empty(Qs1) of
+ true ->
+ {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
+ false ->
+ {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -526,11 +561,20 @@ handle_method(#'channel.open'{}, _, _State) ->
handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
-handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
- ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
stop;
+handle_method(#'channel.close'{}, _, State = #ch{state = closing}) ->
+ {reply, #'channel.close_ok'{}, State};
+
+handle_method(_Method, _, State = #ch{state = closing}) ->
+ {noreply, State};
+
+handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
+ {ok, State1} = rollback_and_notify(State),
+ ReaderPid ! {channel_closing, self()},
+ {noreply, State1};
+
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
@@ -1081,9 +1125,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
- WriterPid, Reason) ->
- {_Close, ReplyCode, ReplyText} =
- rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
+ #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
+ {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
@@ -1170,10 +1213,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ}).
+rollback_and_notify(State = #ch{state = closing}) ->
+ {ok, State};
rollback_and_notify(State = #ch{transaction_id = none}) ->
- notify_queues(State);
+ {notify_queues(State), State#ch{state = closing}};
rollback_and_notify(State) ->
- notify_queues(internal_rollback(State)).
+ State1 = internal_rollback(State),
+ {notify_queues(State1), State1#ch{state = closing}}.
fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
@@ -1240,20 +1286,31 @@ is_message_persistent(Content) ->
end.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ ok = basic_return(Msg, State, no_route),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ ok = basic_return(Msg, State, no_consumers),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed = UC} = State,
- [maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
- State#ch{unconfirmed = UC1}.
+ #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
+ SingletonSet = gb_sets:singleton(MsgSeqNo),
+ UQM1 = lists:foldl(
+ fun (QPid, UQM2) ->
+ maybe_monitor(QPid),
+ case gb_trees:lookup(QPid, UQM2) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ gb_trees:update(QPid, MsgSeqNos1, UQM2);
+ none ->
+ gb_trees:insert(QPid, SingletonSet, UQM2)
+ end
+ end, UQM, QPids),
+ State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1289,11 +1346,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UC) of
+ CutOff = case gb_trees:is_empty(UMQ) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1305,10 +1362,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-terminate(_State) ->
- pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]).
-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
@@ -1320,8 +1373,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- gb_trees:size(UC);
+i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
+ gb_trees:size(UMQ);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d21cfdb7fb..9cc407bc34 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -31,11 +31,13 @@
-export_type([start_link_args/0]).
-type(start_link_args() ::
- {'tcp', rabbit_types:protocol(), rabbit_net:socket(),
- rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()} |
- {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid()}).
+ {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), pid(), rabbit_types:protocol(), rabbit_types:user(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()} |
+ {'direct', rabbit_channel:channel_number(), pid(),
+ rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_framing:amqp_table(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -43,8 +45,8 @@
%%----------------------------------------------------------------------------
-start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
- Collector}) ->
+start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost,
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
@@ -56,19 +58,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, User, VHost,
- Collector, start_limiter_fun(SupPid)]},
+ [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
+ Capabilities, Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost,
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid,
- User, VHost, Collector, start_limiter_fun(SupPid)]},
+ [Channel, ClientChannelPid, ClientChannelPid, Protocol,
+ User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 15f1e77d77..4b8e4f3b15 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -20,6 +20,7 @@
-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
+-define(WAIT_FOR_VM_ATTEMPTS, 5).
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
@@ -44,22 +45,18 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
- FullCommand = init:get_plain_arguments(),
- case FullCommand of
- [] -> usage();
- _ -> ok
- end,
{[Command0 | Args], Opts} =
- rabbit_misc:get_options(
- [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
- {option, ?VHOST_OPT, "/"}],
- FullCommand),
- Opts1 = lists:map(fun({K, V}) ->
- case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
- _ -> {K, V}
- end
- end, Opts),
+ case rabbit_misc:get_options([{flag, ?QUIET_OPT},
+ {option, ?NODE_OPT, NodeStr},
+ {option, ?VHOST_OPT, "/"}],
+ init:get_plain_arguments()) of
+ {[], _Opts} -> usage();
+ CmdArgsAndOpts -> CmdArgsAndOpts
+ end,
+ Opts1 = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts],
Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
@@ -297,7 +294,30 @@ action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_auth_backend_internal,
- list_vhost_permissions, [VHost]})).
+ list_vhost_permissions, [VHost]}));
+
+action(wait, Node, [], _Opts, Inform) ->
+ Inform("Waiting for ~p", [Node]),
+ wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS).
+
+wait_for_application(Node, Attempts) ->
+ case rpc_call(Node, application, which_applications, [infinity]) of
+ {badrpc, _} = E -> NewAttempts = Attempts - 1,
+ case NewAttempts of
+ 0 -> E;
+ _ -> wait_for_application0(Node, NewAttempts)
+ end;
+ Apps -> case proplists:is_defined(rabbit, Apps) of
+ %% We've seen the node up; if it goes down
+ %% die immediately.
+ true -> ok;
+ false -> wait_for_application0(Node, 0)
+ end
+ end.
+
+wait_for_application0(Node, Attempts) ->
+ timer:sleep(1000),
+ wait_for_application(Node, Attempts).
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 3b8c9fba39..586563f61e 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/3, start_channel/5]).
+-export([boot/0, connect/4, start_channel/7]).
-include("rabbit.hrl").
@@ -25,12 +25,13 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/3 :: (binary(), binary(), binary()) ->
+-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()) ->
- {'ok', pid()}).
+-spec(start_channel/7 ::
+ (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()) -> {'ok', pid()}).
-endif.
@@ -49,13 +50,14 @@ boot() ->
%%----------------------------------------------------------------------------
-connect(Username, Password, VHost) ->
+connect(Username, Password, VHost, Protocol) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
try rabbit_access_control:user_pass_login(Username, Password) of
#user{} = User ->
try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> {ok, {User, rabbit_reader:server_properties()}}
+ ok -> {ok, {User,
+ rabbit_reader:server_properties(Protocol)}}
catch
exit:#amqp_error{name = access_refused} ->
{error, access_refused}
@@ -67,9 +69,11 @@ connect(Username, Password, VHost) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities,
+ Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ [{direct, Number, ClientChannelPid, Protocol, User, VHost,
+ Capabilities, Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 86ea7282d9..1b72dd761a 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) ->
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 1ad65759d4..b1488365db 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -372,15 +372,14 @@ init_db(ClusterNodes, Force) ->
%% True single disc node, attempt upgrade
ok = wait_for_tables(),
case rabbit_upgrade:maybe_upgrade() of
- ok -> ensure_schema_ok();
+ ok -> ensure_schema_integrity();
version_not_available -> schema_ok_or_move()
end;
{[], true, _} ->
%% "Master" (i.e. without config) disc node in cluster,
%% verify schema
- ok = wait_for_tables(),
ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_schema_ok();
+ ensure_schema_integrity();
{[], false, _} ->
%% Nothing there at all, start from scratch
ok = create_schema();
@@ -397,7 +396,7 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ensure_schema_ok()
+ ensure_schema_integrity()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -430,12 +429,6 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_upgrade:write_version().
-ensure_schema_ok() ->
- case check_schema_integrity() of
- ok -> ok;
- {error, Reason} -> throw({error, {schema_invalid, Reason}})
- end.
-
create_schema() ->
mnesia:stop(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
@@ -444,7 +437,6 @@ create_schema() ->
cannot_start_mnesia),
ok = create_tables(),
ok = ensure_schema_integrity(),
- ok = wait_for_tables(),
ok = rabbit_upgrade:write_version().
move_db() ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e9c356e1f6..7f3cf35faa 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,7 +33,7 @@
-include("rabbit_msg_store.hrl").
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
deleted file mode 100644
index ebd7fe8a06..0000000000
--- a/src/rabbit_multi.erl
+++ /dev/null
@@ -1,349 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-%%
-
--module(rabbit_multi).
--include("rabbit.hrl").
-
--export([start/0, stop/0]).
-
--define(RPC_SLEEP, 500).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start/0 :: () -> no_return()).
--spec(stop/0 :: () -> 'ok').
--spec(usage/0 :: () -> no_return()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start() ->
- RpcTimeout =
- case init:get_argument(maxwait) of
- {ok,[[N1]]} -> 1000 * list_to_integer(N1);
- _ -> ?MAX_WAIT
- end,
- case init:get_plain_arguments() of
- [] ->
- usage();
- FullCommand ->
- {Command, Args} = parse_args(FullCommand),
- case catch action(Command, Args, RpcTimeout) of
- ok ->
- io:format("done.~n"),
- halt();
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join(FullCommand, " ")]),
- usage();
- timeout ->
- print_error("timeout starting some nodes.", []),
- halt(1);
- Other ->
- print_error("~p", [Other]),
- halt(2)
- end
- end.
-
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
-
-parse_args([Command | Args]) ->
- {list_to_atom(Command), Args}.
-
-stop() ->
- ok.
-
-usage() ->
- io:format("~s", [rabbit_multi_usage:usage()]),
- halt(1).
-
-action(start_all, [NodeCount], RpcTimeout) ->
- io:format("Starting all nodes...~n", []),
- application:load(rabbit),
- {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts(
- getenv("RABBITMQ_NODENAME")),
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- throw({cannot_connect_to_epmd, NodeHost, EpmdReason});
- {ok, _} ->
- ok
- end,
- {NodePids, Running} =
- case list_to_integer(NodeCount) of
- 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
- RpcTimeout),
- {[NodePid], Started};
- N -> start_nodes(N, N, [], true, NodeName,
- get_node_tcp_listener(), RpcTimeout)
- end,
- write_pids_file(NodePids),
- case Running of
- true -> ok;
- false -> timeout
- end;
-
-action(status, [], RpcTimeout) ->
- io:format("Status of all running nodes...~n", []),
- call_all_nodes(
- fun ({Node, Pid}) ->
- RabbitRunning =
- case is_rabbit_running(Node, RpcTimeout) of
- false -> not_running;
- true -> running
- end,
- io:format("Node '~p' with Pid ~p: ~p~n",
- [Node, Pid, RabbitRunning])
- end);
-
-action(stop_all, [], RpcTimeout) ->
- io:format("Stopping all nodes...~n", []),
- call_all_nodes(fun ({Node, Pid}) ->
- io:format("Stopping node ~p~n", [Node]),
- rpc:call(Node, rabbit, stop_and_halt, []),
- case kill_wait(Pid, RpcTimeout, false) of
- false -> kill_wait(Pid, RpcTimeout, true);
- true -> ok
- end,
- io:format("OK~n", [])
- end),
- delete_pids_file();
-
-action(rotate_logs, [], RpcTimeout) ->
- action(rotate_logs, [""], RpcTimeout);
-
-action(rotate_logs, [Suffix], RpcTimeout) ->
- io:format("Rotating logs for all nodes...~n", []),
- BinarySuffix = list_to_binary(Suffix),
- call_all_nodes(
- fun ({Node, _}) ->
- io:format("Rotating logs for node ~p", [Node]),
- case rpc:call(Node, rabbit, rotate_logs,
- [BinarySuffix], RpcTimeout) of
- {badrpc, Error} -> io:format(": ~p.~n", [Error]);
- ok -> io:format(": ok.~n", [])
- end
- end).
-
-%% PNodePid is the list of PIDs
-%% Running is a boolean exhibiting success at some moment
-start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
-
-start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) ->
- {NodePre, NodeSuff} = NodeNameBase,
- NodeNumber = Total - N,
- NodePre1 = case NodeNumber of
- %% For compatibility with running a single node
- 0 -> NodePre;
- _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber)
- end,
- Node = rabbit_misc:makenode({NodePre1, NodeSuff}),
- os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
- case Listener of
- {NodeIpAddress, NodePortBase} ->
- NodePort = NodePortBase + NodeNumber,
- os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
- os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress);
- undefined ->
- ok
- end,
- {NodePid, Started} = start_node(Node, RpcTimeout),
- start_nodes(N - 1, Total, [NodePid | PNodePid],
- Started and Running, NodeNameBase, Listener, RpcTimeout).
-
-start_node(Node, RpcTimeout) ->
- io:format("Starting node ~s...~n", [Node]),
- case rpc:call(Node, os, getpid, []) of
- {badrpc, _} ->
- Port = run_rabbitmq_server(),
- Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port),
- Pid = case rpc:call(Node, os, getpid, []) of
- {badrpc, _} -> throw(cannot_get_pid);
- PidS -> list_to_integer(PidS)
- end,
- io:format("~s~n", [case Started of
- true -> "OK";
- false -> "timeout"
- end]),
- {{Node, Pid}, Started};
- PidS ->
- Pid = list_to_integer(PidS),
- throw({node_already_running, Node, Pid})
- end.
-
-wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
- false;
-wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
- case is_rabbit_running(Node, RpcTimeout) of
- true -> true;
- false -> receive
- {'EXIT', Port, PosixCode} ->
- throw({node_start_failed, PosixCode})
- after ?RPC_SLEEP ->
- wait_for_rabbit_to_start(
- Node, RpcTimeout - ?RPC_SLEEP, Port)
- end
- end.
-
-run_rabbitmq_server() ->
- with_os([{unix, fun run_rabbitmq_server_unix/0},
- {win32, fun run_rabbitmq_server_win32/0}]).
-
-run_rabbitmq_server_unix() ->
- CmdLine = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server -noinput",
- erlang:open_port({spawn, CmdLine}, [nouse_stdio]).
-
-run_rabbitmq_server_win32() ->
- Cmd = filename:nativename(os:find_executable("cmd")),
- CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") ++
- "\\rabbitmq-server.bat\" -noinput -detached",
- erlang:open_port({spawn_executable, Cmd},
- [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]},
- nouse_stdio]).
-
-is_rabbit_running(Node, RpcTimeout) ->
- case rpc:call(Node, rabbit, status, [], RpcTimeout) of
- {badrpc, _} -> false;
- Status -> case proplists:get_value(running_applications, Status) of
- undefined -> false;
- Apps -> lists:keymember(rabbit, 1, Apps)
- end
- end.
-
-with_os(Handlers) ->
- {OsFamily, _} = os:type(),
- case proplists:get_value(OsFamily, Handlers) of
- undefined -> throw({unsupported_os, OsFamily});
- Handler -> Handler()
- end.
-
-pids_file() -> getenv("RABBITMQ_PIDS_FILE").
-
-write_pids_file(Pids) ->
- FileName = pids_file(),
- Handle = case file:open(FileName, [write]) of
- {ok, Device} ->
- Device;
- {error, Reason} ->
- throw({cannot_create_pids_file, FileName, Reason})
- end,
- try
- ok = io:write(Handle, Pids),
- ok = io:put_chars(Handle, [$.])
- after
- case file:close(Handle) of
- ok -> ok;
- {error, Reason1} ->
- throw({cannot_create_pids_file, FileName, Reason1})
- end
- end,
- ok.
-
-delete_pids_file() ->
- FileName = pids_file(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({cannot_delete_pids_file, FileName, Reason})
- end.
-
-read_pids_file() ->
- FileName = pids_file(),
- case file:consult(FileName) of
- {ok, [Pids]} -> Pids;
- {error, enoent} -> [];
- {error, Reason} -> throw({cannot_read_pids_file, FileName, Reason})
- end.
-
-kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 ->
- Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9";
- true -> "kill"
- end
- end},
- %% Kill forcefully always on Windows, since erl.exe
- %% seems to completely ignore non-forceful killing
- %% even when everything is working
- {win32, fun () -> "taskkill /f /pid" end}]),
- os:cmd(Cmd ++ " " ++ integer_to_list(Pid)),
- false; % Don't assume what we did just worked!
-
-% Returns true if the process is dead, false otherwise.
-kill_wait(Pid, TimeLeft, Forceful) ->
- timer:sleep(?RPC_SLEEP),
- io:format(".", []),
- is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful).
-
-% Test using some OS clunkiness since we shouldn't trust
-% rpc:call(os, getpid, []) at this point
-is_dead(Pid) ->
- PidS = integer_to_list(Pid),
- with_os([{unix, fun () ->
- system("kill -0 " ++ PidS
- ++ " >/dev/null 2>&1") /= 0
- end},
- {win32, fun () ->
- Res = os:cmd("tasklist /nh /fi \"pid eq " ++
- PidS ++ "\" 2>&1"),
- case re:run(Res, "erl\\.exe", [{capture, none}]) of
- match -> false;
- _ -> true
- end
- end}]).
-
-% Like system(3)
-system(Cmd) ->
- ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
- Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
- receive {Port, {exit_status, Status}} -> Status end.
-
-% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
-escape_quotes(Cmd) ->
- lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
-
-call_all_nodes(Func) ->
- case read_pids_file() of
- [] -> throw(no_nodes_running);
- NodePids -> lists:foreach(Func, NodePids)
- end.
-
-getenv(Var) ->
- case os:getenv(Var) of
- false -> throw({missing_env_var, Var});
- Value -> Value
- end.
-
-get_node_tcp_listener() ->
- try
- {getenv("RABBITMQ_NODE_IP_ADDRESS"),
- list_to_integer(getenv("RABBITMQ_NODE_PORT"))}
- catch _ ->
- case application:get_env(rabbit, tcp_listeners) of
- {ok, [{_IpAddy, _Port} = Listener]} ->
- Listener;
- {ok, [Port]} when is_number(Port) ->
- {"0.0.0.0", Port};
- {ok, []} ->
- undefined;
- {ok, Other} ->
- throw({cannot_start_multiple_nodes, multiple_tcp_listeners,
- Other});
- undefined ->
- throw({missing_configuration, tcp_listeners})
- end
- end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1781469a13..3908b64692 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -24,7 +24,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/0]).
+-export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
@@ -57,92 +57,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
-%% connection lifecycle
-%%
-%% all state transitions and terminations are marked with *...*
-%%
-%% The lifecycle begins with: start handshake_timeout timer, *pre-init*
-%%
-%% all states, unless specified otherwise:
-%% socket error -> *exit*
-%% socket close -> *throw*
-%% writer send failure -> *throw*
-%% forced termination -> *exit*
-%% handshake_timeout -> *throw*
-%% pre-init:
-%% receive protocol header -> send connection.start, *starting*
-%% starting:
-%% receive connection.start_ok -> *securing*
-%% securing:
-%% check authentication credentials
-%% if authentication success -> send connection.tune, *tuning*
-%% if more challenge needed -> send connection.secure,
-%% receive connection.secure_ok *securing*
-%% otherwise send close, *exit*
-%% tuning:
-%% receive connection.tune_ok -> start heartbeats, *opening*
-%% opening:
-%% receive connection.open -> send connection.open_ok, *running*
-%% running:
-%% receive connection.close ->
-%% tell channels to terminate gracefully
-%% if no channels then send connection.close_ok, start
-%% terminate_connection timer, *closed*
-%% else *closing*
-%% forced termination
-%% -> wait for channels to terminate forcefully, start
-%% terminate_connection timer, send close, *exit*
-%% channel exit with hard error
-%% -> log error, wait for channels to terminate forcefully, start
-%% terminate_connection timer, send close, *closed*
-%% channel exit with soft error
-%% -> log error, mark channel as closing, *running*
-%% handshake_timeout -> ignore, *running*
-%% heartbeat timeout -> *throw*
-%% conserve_memory=true -> *blocking*
-%% blocking:
-%% conserve_memory=true -> *blocking*
-%% conserve_memory=false -> *running*
-%% receive a method frame for a content-bearing method
-%% -> process, stop receiving, *blocked*
-%% ...rest same as 'running'
-%% blocked:
-%% conserve_memory=true -> *blocked*
-%% conserve_memory=false -> resume receiving, *running*
-%% ...rest same as 'running'
-%% closing:
-%% socket close -> *terminate*
-%% receive connection.close -> send connection.close_ok,
-%% *closing*
-%% receive frame -> ignore, *closing*
-%% handshake_timeout -> ignore, *closing*
-%% heartbeat timeout -> *throw*
-%% channel exit with hard error
-%% -> log error, wait for channels to terminate forcefully, start
-%% terminate_connection timer, send close, *closed*
-%% channel exit with soft error
-%% -> log error, mark channel as closing
-%% if last channel to exit then send connection.close_ok,
-%% start terminate_connection timer, *closed*
-%% else *closing*
-%% channel exits normally
-%% -> if last channel to exit then send connection.close_ok,
-%% start terminate_connection timer, *closed*
-%% closed:
-%% socket close -> *terminate*
-%% receive connection.close -> send connection.close_ok,
-%% *closed*
-%% receive connection.close_ok -> self() ! terminate_connection,
-%% *closed*
-%% receive frame -> ignore, *closed*
-%% terminate_connection timeout -> *terminate*
-%% handshake_timeout -> ignore, *closed*
-%% heartbeat timeout -> *throw*
-%% channel exit -> log error, *closed*
-%%
-%%
-%% TODO: refactor the code so that the above is obvious
-
-define(IS_RUNNING(State),
(State#v1.connection_state =:= running orelse
State#v1.connection_state =:= blocking orelse
@@ -160,7 +74,8 @@
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
--spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+-spec(server_properties/1 :: (rabbit_types:protocol()) ->
+ rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
@@ -219,7 +134,7 @@ conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
-server_properties() ->
+server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -230,22 +145,30 @@ server_properties() ->
%% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
%% from the config and merge them with the generated built-in properties
NormalizedConfigServerProps =
- [case X of
- {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
- longstr,
- list_to_binary(Value)};
- {BinKey, Type, Value} -> {BinKey, Type, Value}
- end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]],
+ [{<<"capabilities">>, table, server_capabilities(Protocol)} |
+ [case X of
+ {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
+ longstr,
+ list_to_binary(Value)};
+ {BinKey, Type, Value} -> {BinKey, Type, Value}
+ end || X <- RawConfigServerProps ++
+ [{product, Product},
+ {version, Version},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favor of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
+server_capabilities(rabbit_framing_amqp_0_9_1) ->
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true}];
+server_capabilities(_) ->
+ [].
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
@@ -328,6 +251,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw({inet_error, Reason});
{conserve_memory, Conserve} ->
mainloop(Deb, internal_conserve_memory(Conserve, State));
+ {channel_closing, ChPid} ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ channel_cleanup(ChPid),
+ mainloop(Deb, State);
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -435,32 +362,32 @@ close_connection(State = #v1{queue_collector = Collector,
erlang:send_after(TimeoutMillisec, self(), terminate_connection),
State#v1{connection_state = closed}.
-close_channel(Channel, State) ->
- put({channel, Channel}, closing),
- State.
-
handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- erase({ch_pid, ChPid}),
+ channel_cleanup(ChPid),
maybe_close(State);
uncontrolled ->
case channel_cleanup(ChPid) of
undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> maybe_close(
+ Channel -> rabbit_log:error(
+ "connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(
handle_exception(State, Channel, Reason))
end
end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
- undefined -> undefined;
- Channel -> erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- Channel
+ undefined -> undefined;
+ {Channel, MRef} -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
end.
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
terminate_channels() ->
NChannels =
@@ -515,8 +442,8 @@ maybe_close(State = #v1{connection_state = closing,
maybe_close(State) ->
State.
-termination_kind(normal) -> controlled;
-termination_kind(_) -> uncontrolled.
+termination_kind(normal) -> controlled;
+termination_kind(_) -> uncontrolled.
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
@@ -552,8 +479,8 @@ handle_frame(Type, Channel, Payload,
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
- {method, 'channel.close', _} ->
- erase({channel, Channel}),
+ {method, 'channel.close_ok', _} ->
+ channel_cleanup(ChPid),
State;
{method, MethodName, _} ->
case (State#v1.connection_state =:= blocking
@@ -565,25 +492,6 @@ handle_frame(Type, Channel, Payload,
_ ->
State
end;
- closing ->
- %% According to the spec, after sending a
- %% channel.close we must ignore all frames except
- %% channel.close and channel.close_ok. In the
- %% event of a channel.close, we should send back a
- %% channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erase({channel, Channel});
- {method, 'channel.close', _} ->
- %% We're already closing this channel, so
- %% there's no cleanup to do (notify
- %% queues, etc.)
- ok = rabbit_writer:internal_send_command(
- State#v1.sock, Channel,
- #'channel.close_ok'{}, Protocol);
- _ -> ok
- end,
- State;
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -655,7 +563,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
- server_properties = server_properties(),
+ server_properties = server_properties(Protocol),
mechanisms = auth_mechanisms_binary(),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
@@ -709,12 +617,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
connection = Connection,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism),
+ Capabilities =
+ case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
+ {table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
State = State0#v1{auth_mechanism = AuthMechanism,
auth_state = AuthMechanism:init(Sock),
connection_state = securing,
connection =
Connection#connection{
- client_properties = ClientProperties}},
+ client_properties = ClientProperties,
+ capabilities = Capabilities}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -947,19 +861,20 @@ cert_info(F, Sock) ->
send_to_new_channel(Channel, AnalyzedFrame, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost}} = State,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
- VHost, Collector}),
- erlang:monitor(process, ChPid),
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
+ VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
- put({ch_pid, ChPid}, Channel),
+ put({ch_pid, ChPid}, {Channel, MRef}),
State.
process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
@@ -975,29 +890,20 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
AState
end.
-log_channel_error(ConnectionState, Channel, Reason) ->
- rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), ConnectionState, Channel, Reason]).
-
-handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log_channel_error(closed, Channel, Reason),
+handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
State;
-handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
- log_channel_error(CS, Channel, Reason),
+handle_exception(State, Channel, Reason) ->
send_exception(State, Channel, Reason).
send_exception(State = #v1{connection = #connection{protocol = Protocol}},
Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} =
+ {0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- NewState = case ShouldClose of
- true -> terminate_channels(),
- close_connection(State);
- false -> close_channel(Channel, State)
- end,
+ terminate_channels(),
+ State1 = close_connection(State),
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
- NewState.
+ State1#v1.sock, 0, CloseMethod, Protocol),
+ State1.
internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 795413aa5c..9821ae7b86 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -48,7 +48,7 @@ start_link() ->
%%---------------------------------------------------------------------------
register(Class, TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e895b0edb2..992e5efd34 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,6 +35,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ passed = gm_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
passed = test_file_handle_cache(),
@@ -1019,9 +1020,9 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- user(<<"user">>), <<"/">>, self(),
- fun (_) -> {ok, self()} end),
+ {ok, Ch} = rabbit_channel:start_link(
+ 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>),
+ <<"/">>, [], self(), fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1079,9 +1080,9 @@ test_server_status() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- user(<<"guest">>), <<"/">>, self(),
- fun (_) -> {ok, self()} end),
+ {ok, Ch} = rabbit_channel:start_link(
+ 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>),
+ <<"/">>, [], self(), fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
@@ -1233,7 +1234,7 @@ must_exit(Fun) ->
end.
test_delegates_sync(SecondaryNode) ->
- Sender = fun (Pid) -> gen_server:call(Pid, invoked) end,
+ Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end,
BadSender = fun (_Pid) -> exit(exception) end,
Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
@@ -1305,7 +1306,7 @@ test_queue_cleanup(_SecondaryNode) ->
rabbit_channel:do(Ch, #'queue.declare'{ passive = true,
queue = ?CLEANUP_QUEUE_NAME }),
receive
- {channel_exit, 1, {amqp_error, not_found, _, _}} ->
+ #'channel.close'{reply_code = 404} ->
ok
after 2000 ->
throw(failed_to_receive_channel_exit)
@@ -2206,9 +2207,11 @@ test_configurable_server_properties() ->
BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
<<"copyright">>, <<"information">>],
+ Protocol = rabbit_framing_amqp_0_9_1,
+
%% Verify that the built-in properties are initially present
- ActualPropNames = [Key ||
- {Key, longstr, _} <- rabbit_reader:server_properties()],
+ ActualPropNames = [Key || {Key, longstr, _} <-
+ rabbit_reader:server_properties(Protocol)],
true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
BuiltInPropNames),
@@ -2219,9 +2222,10 @@ test_configurable_server_properties() ->
ConsProp = fun (X) -> application:set_env(rabbit,
server_properties,
[X | ServerProperties]) end,
- IsPropPresent = fun (X) -> lists:member(X,
- rabbit_reader:server_properties())
- end,
+ IsPropPresent =
+ fun (X) ->
+ lists:member(X, rabbit_reader:server_properties(Protocol))
+ end,
%% Add a wholly new property of the simplified {KeyAtom, StringValue} form
NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
@@ -2244,7 +2248,7 @@ test_configurable_server_properties() ->
{BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
list_to_binary(NewVerVal)},
ConsProp(NewVersion),
- ClobberedServerProps = rabbit_reader:server_properties(),
+ ClobberedServerProps = rabbit_reader:server_properties(Protocol),
%% Is the clobbering insert present?
true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
%% Is the clobbering insert the only thing with the clobbering key?