summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-26 17:43:34 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-26 17:43:34 +0100
commitb3bb364725f16585433cb9b5c8019a98a5a7a2b2 (patch)
tree658c8873c44323172df9c6742628ea59aaf570d7
parent7e671d094da670ca0b519ee8936cce9da016b0c9 (diff)
parent9637b0056fb1112022bda43e5451bee9237d7f63 (diff)
downloadrabbitmq-server-git-b3bb364725f16585433cb9b5c8019a98a5a7a2b2.tar.gz
Merging default into bug19844
-rw-r--r--Makefile7
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec6
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf362
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/rules1
-rw-r--r--packaging/macports/Portfile.in2
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_binary_generator.erl53
-rw-r--r--src/rabbit_guid.erl14
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_persister.erl51
13 files changed, 440 insertions, 85 deletions
diff --git a/Makefile b/Makefile
index e058a6fa1a..3f5821ee87 100644
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,6 @@ RABBITMQ_NODENAME ?= rabbit
RABBITMQ_SERVER_START_ARGS ?=
RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia
RABBITMQ_LOG_BASE ?= $(TMPDIR)
-RABBITMQ_CONFIG_FILE ?= $(CURDIR)/rabbitmq
DEPS_FILE=deps.mk
SOURCE_DIR=src
@@ -131,7 +130,6 @@ run: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_ALLOW_INPUT=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
- RABBITMQ_CONFIG_FILE="$(RABBITMQ_CONFIG_FILE)" \
./scripts/rabbitmq-server
run-node: all
@@ -139,7 +137,6 @@ run-node: all
RABBITMQ_NODE_ONLY=true \
RABBITMQ_ALLOW_INPUT=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
- RABBITMQ_CONFIG_FILE="$(RABBITMQ_CONFIG_FILE)" \
./scripts/rabbitmq-server
run-tests: all
@@ -149,7 +146,6 @@ start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
- RABBITMQ_CONFIG_FILE="$(RABBITMQ_CONFIG_FILE)" \
./scripts/rabbitmq-server ; sleep 1
start-rabbit-on-node: all
@@ -169,7 +165,8 @@ COVER_DIR=.
SECONDARY_NODENAME=hare
start-cover: all
- echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
+ echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL)
+ echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
start-secondary-cover:
echo "rabbit_misc:enable_cover_node(\"$(SECONDARY_NODENAME)\")." | $(ERL_CALL)
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 3616fcbf6c..ad8e35492b 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -18,6 +18,8 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
+ {persister_max_wrap_entries, 500},
+ {persister_hibernate_after, 10000},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 552aec6726..a4abc1ffb2 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -87,7 +87,7 @@
-type(file_path() :: string()).
%% this is really an abstract type, but dialyzer does not support them
--type(guid() :: any()).
+-type(guid() :: binary()).
-type(txn() :: guid()).
-type(pkey() :: guid()).
-type(r(Kind) ::
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index c318a96c22..b052d889b3 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -10,9 +10,10 @@ Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
Source4: rabbitmq-asroot-script-wrapper
+Source5: rabbitmq-server.ocf
URL: http://www.rabbitmq.com/
BuildArch: noarch
-BuildRequires: erlang, python-simplejson
+BuildRequires: erlang, python-simplejson, xmlto, libxslt
Requires: erlang, logrotate
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
@@ -29,6 +30,7 @@ scalable implementation of an AMQP broker.
%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version}
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
+%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -38,6 +40,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
cp %{S:4} %{_rabbit_asroot_wrapper}
+cp %{S:5} %{_rabbit_server_ocf}
make %{?_smp_mflags}
%install
@@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins
+install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
new file mode 100755
index 0000000000..97c58ea2f8
--- /dev/null
+++ b/packaging/common/rabbitmq-server.ocf
@@ -0,0 +1,362 @@
+#!/bin/sh
+##
+## OCF Resource Agent compliant rabbitmq-server resource script.
+##
+
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2010 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+## OCF instance parameters
+## OCF_RESKEY_multi
+## OCF_RESKEY_ctl
+## OCF_RESKEY_nodename
+## OCF_RESKEY_ip
+## OCF_RESKEY_port
+## OCF_RESKEY_cluster_config_file
+## OCF_RESKEY_config_file
+## OCF_RESKEY_log_base
+## OCF_RESKEY_mnesia_base
+## OCF_RESKEY_server_start_args
+
+#######################################################################
+# Initialization:
+
+. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs
+
+#######################################################################
+
+OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi"
+OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
+OCF_RESKEY_nodename_default="rabbit@localhost"
+OCF_RESKEY_log_base_default="/var/log/rabbitmq"
+: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}}
+: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}}
+: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}}
+: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
+
+meta_data() {
+ cat <<END
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="rabbitmq-server">
+<version>1.0</version>
+
+<longdesc lang="en">
+Resource agent for RabbitMQ-server
+</longdesc>
+
+<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc>
+
+<parameters>
+<parameter name="multi" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmq-multi script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmq-multi</shortdesc>
+<content type="string" default="${OCF_RESKEY_multi_default}" />
+</parameter>
+
+<parameter name="ctl" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmqctl script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmqctl</shortdesc>
+<content type="string" default="${OCF_RESKEY_ctl_default}" />
+</parameter>
+
+<parameter name="nodename" unique="0" required="0">
+<longdesc lang="en">
+The node name for rabbitmq-server
+</longdesc>
+<shortdesc lang="en">Node name</shortdesc>
+<content type="string" default="${OCF_RESKEY_nodename_default}" />
+</parameter>
+
+<parameter name="ip" unique="0" required="0">
+<longdesc lang="en">
+The IP address for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Address</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="port" unique="0" required="0">
+<longdesc lang="en">
+The IP Port for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Port</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="cluster_config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the cluster config file
+</longdesc>
+<shortdesc lang="en">Cluster config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the config file
+</longdesc>
+<shortdesc lang="en">Config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="log_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which logs will be created
+</longdesc>
+<shortdesc lang="en">Log base path</shortdesc>
+<content type="string" default="${OCF_RESKEY_log_base_default}" />
+</parameter>
+
+<parameter name="mnesia_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which mnesia will store data
+</longdesc>
+<shortdesc lang="en">Mnesia base path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="server_start_args" unique="0" required="0">
+<longdesc lang="en">
+Additional arguments provided to the server on startup
+</longdesc>
+<shortdesc lang="en">Server start arguments</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+</parameters>
+
+<actions>
+<action name="start" timeout="600" />
+<action name="stop" timeout="120" />
+<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" />
+<action name="validate-all" timeout="30" />
+<action name="meta-data" timeout="5" />
+</actions>
+</resource-agent>
+END
+}
+
+rabbit_usage() {
+ cat <<END
+usage: $0 {start|stop|monitor|validate-all|meta-data}
+
+Expects to have a fully populated OCF RA-compliant environment set.
+END
+}
+
+RABBITMQ_MULTI=$OCF_RESKEY_multi
+RABBITMQ_CTL=$OCF_RESKEY_ctl
+RABBITMQ_NODENAME=$OCF_RESKEY_nodename
+RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip
+RABBITMQ_NODE_PORT=$OCF_RESKEY_port
+RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file
+RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file
+RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base
+RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base
+RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args
+[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME"
+[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME
+
+export_vars() {
+ [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS
+ [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT
+ [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE
+ [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE
+ [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE
+ [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE
+ [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS
+}
+
+rabbit_validate_partial() {
+ if [ ! -x $RABBITMQ_MULTI ]; then
+ ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -x $RABBITMQ_CTL ]; then
+ ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+}
+
+rabbit_validate_full() {
+ if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then
+ ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then
+ ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ rabbit_validate_partial
+
+ return $OCF_SUCCESS
+}
+
+rabbit_status() {
+ local rc
+ $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
+ rc=$?
+ case "$rc" in
+ 0)
+ return $OCF_SUCCESS
+ ;;
+ 2)
+ return $OCF_NOT_RUNNING
+ ;;
+ *)
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ return $OCF_ERR_GENERIC
+ esac
+}
+
+rabbit_start() {
+ local rc
+
+ rabbit_validate_full
+ rc=$?
+ if [ "$rc" != $OCF_SUCCESS ]; then
+ return $rc
+ fi
+
+ export_vars
+
+ $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to come up.
+ # Let the CRM/LRM time us out if required
+ start_wait=1
+ while [ $start_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_SUCCESS ]; then
+ start_wait=0
+
+ elif [ "$rc" != $OCF_NOT_RUNNING ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_stop() {
+ local rc
+ $RABBITMQ_MULTI stop_all &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to shut down.
+ # Let the CRM/LRM time us out if required
+ stop_wait=1
+ while [ $stop_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_NOT_RUNNING ]; then
+ stop_wait=0
+ break
+ elif [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server stop failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_monitor() {
+ rabbit_status
+ return $?
+}
+
+case $__OCF_ACTION in
+ meta-data)
+ meta_data
+ exit $OCF_SUCCESS
+ ;;
+ usage|help)
+ rabbit_usage
+ exit $OCF_SUCCESS
+ ;;
+esac
+
+rabbit_validate_partial || exit
+
+case $__OCF_ACTION in
+ start)
+ rabbit_start
+ ;;
+ stop)
+ rabbit_stop
+ ;;
+ monitor)
+ rabbit_monitor
+ ;;
+ validate-all)
+ exit $OCF_SUCCESS
+ ;;
+ *)
+ rabbit_usage
+ exit $OCF_ERR_UNIMPLEMENTED
+ ;;
+esac
+
+exit $? \ No newline at end of file
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index d4e2cd1763..479c356829 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc
Standards-Version: 3.8.0
Package: rabbitmq-server
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 3799c4387a..45659602f5 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -21,3 +21,4 @@ install/rabbitmq-server::
install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
+ install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index e1f582124a..153727be9a 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -23,7 +23,7 @@ checksums \
sha1 @sha1@ \
rmd160 @rmd160@
-depends_build port:erlang
+depends_build port:erlang port:xmlto port:libxslt
depends_run port:erlang
platform darwin 7 {
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e7bff27082..26f244bc6d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -91,6 +91,13 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_guid]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
{mfa, {rabbit_sup, start_child,
@@ -128,13 +135,6 @@
[rabbit_persister]}},
{requires, queue_sup_queue_recovery}]}).
--rabbit_boot_step({guid_generator,
- [{description, "guid generator"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_guid]}},
- {requires, persister},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"}]}).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 1d47d7640e..ed84373585 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -95,33 +95,36 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
maybe_encode_properties(ContentProperties, none) ->
rabbit_framing:encode_properties(ContentProperties).
-build_content_frames(FragmentsRev, FrameMax, ChannelInt) ->
- BodyPayloadMax = if
- FrameMax == 0 ->
- none;
- true ->
+build_content_frames(FragsRev, FrameMax, ChannelInt) ->
+ BodyPayloadMax = if FrameMax == 0 ->
+ iolist_size(FragsRev);
+ true ->
FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
end,
- build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt).
-
-build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) ->
- {SizeAcc, FragmentAcc};
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt)
- when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) ->
- <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment,
- build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev],
- BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc + size(Fragment),
- [create_frame(3, ChannelInt, Fragment) | FragmentAcc],
- FragmentsRev,
- BodyPayloadMax,
- ChannelInt).
+ build_content_frames(0, [], BodyPayloadMax, [],
+ lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
+
+build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [],
+ [], _BodyPayloadMax, _ChannelInt) ->
+ {SizeAcc, lists:reverse(FramesAcc)};
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ Frags, BodyPayloadMax, ChannelInt)
+ when FragSizeRem == 0 orelse Frags == [] ->
+ Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)),
+ FrameSize = BodyPayloadMax - FragSizeRem,
+ build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc],
+ BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt);
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ [Frag | Frags], BodyPayloadMax, ChannelInt) ->
+ Size = size(Frag),
+ {NewFragSizeRem, NewFragAcc, NewFrags} =
+ case Size =< FragSizeRem of
+ true -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
+ end,
+ build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
+ NewFrags, BodyPayloadMax, ChannelInt).
build_heartbeat_frame() ->
create_frame(?FRAME_HEARTBEAT, 0, <<>>).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2fa531a7ce..1ae8f7dac4 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -67,7 +67,7 @@ update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
Serial = case rabbit_misc:read_term_file(Filename) of
{ok, [Num]} -> Num;
- {error, enoent} -> rabbit_persister:serial();
+ {error, enoent} -> 0;
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
@@ -78,7 +78,7 @@ update_disk_serial() ->
end,
Serial.
-%% generate a guid that is monotonically increasing per process.
+%% generate a GUID.
%%
%% The id is only unique within a single cluster and as long as the
%% serial store hasn't been deleted.
@@ -92,20 +92,18 @@ guid() ->
%% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
- %% us a GUID that is monotonically increasing per process.
+ %% us a GUID.
G = case get(guid) of
undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
0};
{S, I} -> {S, I+1}
end,
put(guid, G),
- G.
+ erlang:md5(term_to_binary(G)).
-%% generate a readable string representation of a guid. Note that any
-%% monotonicity of the guid is not preserved in the encoding.
+%% generate a readable string representation of a GUID.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(
- erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ base64:encode_to_string(guid()).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d35c0a2512..119808af35 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -43,7 +43,7 @@
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
--export([enable_cover_node/1]).
+-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -98,6 +98,7 @@
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
+-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
-spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
-spec(report_cover/1 :: (file_path()) -> 'ok').
@@ -230,9 +231,9 @@ enable_cover(Root) ->
_ -> ok
end.
-enable_cover_node(NodeS) ->
- Node = makenode(NodeS),
- {ok, _} = cover:start([Node]).
+start_cover(NodesS) ->
+ {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ ok.
report_cover() ->
report_cover(".").
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 53335a6fab..a9e0cab928 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -40,7 +40,7 @@
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
+ force_snapshot/0]).
-include("rabbit.hrl").
@@ -49,11 +49,7 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
--define(HIBERNATE_AFTER, 10000).
-
--define(MAX_WRAP_ENTRIES, 500).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -64,7 +60,7 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues, next_seq_id}).
+-record(psnapshot, {transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
@@ -83,7 +79,6 @@
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
-endif.
@@ -116,17 +111,13 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
-
%%--------------------------------------------------------------------
init(_Args) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
+ Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
queues = ets:new(queues, []),
next_seq_id = 0},
@@ -144,9 +135,7 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
+ {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -169,9 +158,6 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -282,12 +268,15 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
log_handle = LH,
- snapshot = Snapshot})
- when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
-maybe_take_snapshot(_Force, State) ->
- State.
+ snapshot = Snapshot}) ->
+ {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries),
+ if
+ Force orelse EntryCount >= MaxWrapEntries ->
+ ok = take_snapshot(LH, Snapshot),
+ State#pstate{entry_count = 0};
+ true ->
+ State
+ end.
later_ms(DeltaMilliSec) ->
{MegaSec, Sec, MicroSec} = now(),
@@ -304,7 +293,8 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- ?HIBERNATE_AFTER;
+ {ok, HibernateAfter} = application:get_env(persister_hibernate_after),
+ HibernateAfter;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -343,8 +333,7 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions = Ts,
+current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
messages = Messages,
queues = Queues,
next_seq_id = NextSeqId}) ->
@@ -354,8 +343,7 @@ current_snapshot(_Snapshot = #psnapshot{serial = Serial,
fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
+ InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
{next_seq_id, NextSeqId}},
@@ -382,13 +370,12 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
+ {{txns, Ts}, {messages, Ms}, {queues, Qs},
{next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
- serial = Serial,
transactions = Ts,
next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),