summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-09-12 13:30:23 +0300
committerMichael Klishin <mklishin@pivotal.io>2016-09-12 13:30:23 +0300
commit60c2a6163fe1fbc360763f24c1456261fa7d22c2 (patch)
treed32fb536c2c2e9abc595a02eb2ccc2a033f053a8
parentacc39e857742cfb2ebbeb6675fd233913edf9552 (diff)
parentdc8a921682342e2b44a3dd59a76b08643ff3c0b5 (diff)
downloadrabbitmq-server-git-60c2a6163fe1fbc360763f24c1456261fa7d22c2.tar.gz
Merge branch 'stable' into rabbitmq-server-950
-rw-r--r--Makefile4
-rw-r--r--packaging/Makefile9
-rw-r--r--packaging/RPMS/Fedora/Makefile16
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.service16
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec81
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.tmpfiles1
-rw-r--r--rabbitmq-components.mk14
-rwxr-xr-xscripts/rabbitmq-server-ha.ocf1
-rw-r--r--src/mochinum.erl358
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_mirror_queue_misc.erl15
-rw-r--r--src/rabbit_policy.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl19
-rw-r--r--test/dynamic_ha_SUITE.erl154
14 files changed, 359 insertions, 410 deletions
diff --git a/Makefile b/Makefile
index e211234bf6..0885e5d48d 100644
--- a/Makefile
+++ b/Makefile
@@ -5,6 +5,7 @@ VERSION ?= $(call get_app_version,src/$(PROJECT).app.src)
PACKAGES_DIR ?= $(abspath PACKAGES)
DEPS = ranch $(PLUGINS)
+TEST_DEPS = amqp_client meck proper
define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
@@ -68,9 +69,6 @@ DEPS += $(DISTRIBUTED_DEPS)
endif
endif
-# FIXME: Remove rabbitmq_test as TEST_DEPS from here for now.
-TEST_DEPS := amqp_client meck $(filter-out rabbitmq_test,$(TEST_DEPS))
-
include erlang.mk
# --------------------------------------------------------------------
diff --git a/packaging/Makefile b/packaging/Makefile
index 02820bdb23..64a1243b51 100644
--- a/packaging/Makefile
+++ b/packaging/Makefile
@@ -35,6 +35,7 @@ all: packages
.PHONY: packages package-deb \
package-rpm package-rpm-fedora package-rpm-suse \
+ package-rpm-rhel6 package-rpm-rhel7 \
package-windows package-standalone-macosx \
package-generic-unix
@@ -70,12 +71,18 @@ packages: package-deb package-rpm package-windows package-generic-unix
package-deb: $(SOURCE_DIST_FILE)
$(gen_verbose) $(MAKE) -C debs/Debian $(VARS) all $(DO_CLEAN)
-package-rpm: package-rpm-fedora package-rpm-suse
+package-rpm: package-rpm-rhel6 package-rpm-rhel7 package-rpm-suse
@:
package-rpm-fedora: $(SOURCE_DIST_FILE)
$(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) all $(DO_CLEAN)
+package-rpm-rhel6: $(SOURCE_DIST_FILE)
+ $(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) RPM_OS=rhel6 all $(DO_CLEAN)
+
+package-rpm-rhel7: $(SOURCE_DIST_FILE)
+ $(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) RPM_OS=rhel7 all $(DO_CLEAN)
+
package-rpm-suse: $(SOURCE_DIST_FILE)
$(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) RPM_OS=suse all $(DO_CLEAN)
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 5763cadf04..11ecaff652 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -5,7 +5,7 @@ ifeq ($(SOURCE_DIST_FILE),)
$(error Cannot find source archive; please specify SOURCE_DIST_FILE)
endif
ifneq ($(words $(SOURCE_DIST_FILE)),1)
-$(error Multile source archives found; please specify SOURCE_DIST_FILE)
+$(error Multiple source archives found; please specify SOURCE_DIST_FILE)
endif
VERSION ?= $(patsubst rabbitmq-server-%.tar.xz,%,$(notdir $(SOURCE_DIST_FILE)))
@@ -34,7 +34,13 @@ else
FUNCTION_LIBRARY=\# Source function library.\n. /etc/init.d/functions
REQUIRES=chkconfig initscripts
OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
+ifeq "$(RPM_OS)" "rhel6"
+SPEC_DEFINES=--define 'group_tag Development/Libraries' --define 'dist .el6' --define 'rhel 6'
+else ifeq "$(RPM_OS)" "rhel7"
+SPEC_DEFINES=--define 'group_tag Development/Libraries' --define '_unitdir /usr/lib/systemd/system' --define 'dist .el7' --define 'rhel 7'
+else
SPEC_DEFINES=--define 'group_tag Development/Libraries'
+endif
START_PROG=daemon
endif
@@ -53,17 +59,13 @@ prepare:
sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
+ cp rabbitmq-server.service SOURCES/rabbitmq-server.service
+ cp rabbitmq-server.tmpfiles SOURCES/rabbitmq-server.tmpfiles
cp rabbitmq-server.init SOURCES/rabbitmq-server.init
sed -i \
-e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \
-e 's|^@FUNCTION_LIBRARY@|$(FUNCTION_LIBRARY)|' \
SOURCES/rabbitmq-server.init
-ifeq "$(RPM_OS)" "fedora"
-# Fedora says that only vital services should have Default-Start
- sed -i -e '/^# Default-Start:/d;/^# Default-Stop:/d' \
- SOURCES/rabbitmq-server.init
-endif
-
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.service b/packaging/RPMS/Fedora/rabbitmq-server.service
new file mode 100644
index 0000000000..7fe15fe36c
--- /dev/null
+++ b/packaging/RPMS/Fedora/rabbitmq-server.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=RabbitMQ broker
+After=syslog.target network.target
+
+[Service]
+Type=notify
+User=rabbitmq
+Group=rabbitmq
+WorkingDirectory=/var/lib/rabbitmq
+ExecStart=/usr/sbin/rabbitmq-server
+ExecStop=/usr/sbin/rabbitmqctl stop
+NotifyAccess=all
+TimeoutStartSec=3600
+
+[Install]
+WantedBy=multi-user.target
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 83b9678ec8..9ad05e59fb 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -1,4 +1,5 @@
%define debug_package %{nil}
+%define erlang_minver R16B-03
Name: rabbitmq-server
Version: %%VERSION%%
@@ -8,14 +9,28 @@ Group: %{group_tag}
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.xz
Source1: rabbitmq-server.init
Source2: rabbitmq-server.logrotate
+Source3: rabbitmq-server.service
+Source4: rabbitmq-server.tmpfiles
URL: http://www.rabbitmq.com/
BuildArch: noarch
-BuildRequires: erlang >= R16B-03, python-simplejson, xmlto, libxslt, gzip, sed, zip, rsync
-Requires: erlang >= R16B-03, logrotate, socat
+BuildRequires: erlang >= %{erlang_minver}, python-simplejson, xmlto, libxslt, gzip, sed, zip, rsync
+
+%if 0%{?fedora} || 0%{?rhel} >= 7
+BuildRequires: systemd
+%endif
+
+Requires: erlang >= %{erlang_minver}, logrotate, socat
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
+
+%if 0%{?fedora} || 0%{?rhel} >= 7
+Requires(pre): systemd
+Requires(post): systemd
+Requires(preun): systemd
+%else
Requires(post): %%REQUIRES%%
Requires(pre): %%REQUIRES%%
+%endif
%description
RabbitMQ is an open source multi-protocol messaging broker.
@@ -47,7 +62,13 @@ mkdir -p %{buildroot}%{_localstatedir}/lib/rabbitmq/mnesia
mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq
#Copy all necessary lib files etc.
+
+%if 0%{?fedora} || 0%{?rhel} >= 7
+install -p -D -m 0644 %{S:3} %{buildroot}%{_unitdir}/%{name}.service
+%else
install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
+%endif
+
install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0755 %{_rabbit_server_ha_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha
install -p -D -m 0644 %{S:2} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -65,6 +86,10 @@ for script in rabbitmq-server rabbitmq-plugins; do \
%{buildroot}%{_sbindir}/$script; \
done
+%if 0%{?fedora} > 14 || 0%{?rhel} >= 7
+install -D -p -m 0644 %{SOURCE4} %{buildroot}%{_prefix}/lib/tmpfiles.d/%{name}.conf
+%endif
+
rm %{_maindir}/LICENSE* %{_maindir}/INSTALL
#Build the list of files
@@ -74,7 +99,8 @@ find %{buildroot} -path %{buildroot}%{_sysconfdir} -prune -o '!' -type d -printf
%pre
if [ $1 -gt 1 ]; then
- # Upgrade - stop previous instance of rabbitmq-server init.d script
+ # Upgrade - stop previous instance of rabbitmq-server init.d (this
+ # will also activate systemd if it was used) script.
/sbin/service rabbitmq-server stop
fi
@@ -90,7 +116,19 @@ if ! getent passwd rabbitmq >/dev/null; then
fi
%post
+
+%if 0%{?fedora} || 0%{?rhel} >= 7
+# %%systemd_post %%{name}.service
+# manual expansion of systemd_post as this doesn't appear to
+# expand correctly on debian machines
+if [ $1 -eq 1 ] ; then
+ # Initial installation
+ systemctl preset %{name}.service >/dev/null 2>&1 || :
+fi
+/bin/systemctl daemon-reload
+%else
/sbin/chkconfig --add %{name}
+%endif
if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then
mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf
fi
@@ -99,8 +137,12 @@ chmod -R o-rwx,g-w %{_localstatedir}/lib/rabbitmq/mnesia
%preun
if [ $1 = 0 ]; then
#Complete uninstall
+%if 0%{?fedora} || 0%{?rhel} >= 7
+ systemctl stop rabbitmq-server
+%else
/sbin/service rabbitmq-server stop
/sbin/chkconfig --del rabbitmq-server
+%endif
# We do not remove /var/log and /var/lib directories
# Leave rabbitmq user and group
@@ -112,13 +154,46 @@ for ext in rel script boot ; do
rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext
done
+%postun
+%if 0%{?fedora} || 0%{?rhel} >= 7
+# %%systemd_postun_with_restart %%{name}.service
+# manual expansion of systemd_postun_with_restart as this doesn't appear to
+# expand correctly on debian machines
+if [ $1 -ge 1 ] ; then
+ # Package upgrade, not uninstall
+ systemctl try-restart %{name}.service >/dev/null 2>&1 || :
+fi
+%else
+if [ $1 -gt 1 ]; then
+ /sbin/service %{name} try-restart
+fi
+%endif
+
+%if 0%{?fedora} > 17 || 0%{?rhel} >= 7
+# For prior versions older than this, do a conversion
+# from sysv to systemd
+%triggerun -- %{name} < 3.6.5
+# Save the current service runlevel info
+# User must manually run systemd-sysv-convert --apply opensips
+# to migrate them to systemd targets
+/usr/bin/systemd-sysv-convert --save %{name} >/dev/null 2>&1 ||:
+
+# Run these because the SysV package being removed won't do them
+/sbin/chkconfig --del %{name} >/dev/null 2>&1 || :
+/bin/systemctl try-restart %{name}.service >/dev/null 2>&1 || :
+%endif
+
%files -f ../%{name}.files
%defattr(-,root,root,-)
%attr(0755, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq/mnesia
%attr(0755, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
%dir %{_sysconfdir}/rabbitmq
+
+%if 0%{?rhel} < 7
%{_initrddir}/rabbitmq-server
+%endif
+
%config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server
%doc LICENSE*
%doc README
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.tmpfiles b/packaging/RPMS/Fedora/rabbitmq-server.tmpfiles
new file mode 100644
index 0000000000..c2681827e0
--- /dev/null
+++ b/packaging/RPMS/Fedora/rabbitmq-server.tmpfiles
@@ -0,0 +1 @@
+D /var/run/rabbitmq 0755 rabbitmq rabbitmq -
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index eb9e9e3e03..e3417856f8 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -123,7 +123,6 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_shovel \
rabbitmq_shovel_management \
rabbitmq_stomp \
- rabbitmq_test \
rabbitmq_toke \
rabbitmq_top \
rabbitmq_tracing \
@@ -262,10 +261,13 @@ prepare-dist::
ifneq ($(PROJECT),rabbit)
ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS)),)
RUN_RMQ_TARGETS = run-broker \
+ run-tls-broker \
run-background-broker \
run-node \
run-background-node \
- start-background-node
+ start-background-node \
+ start-background-broker \
+ start-rabbit-on-node
ifneq ($(filter $(RUN_RMQ_TARGETS),$(MAKECMDGOALS)),)
BUILD_DEPS += rabbit
@@ -273,18 +275,12 @@ endif
endif
ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS) $(TEST_DEPS)),)
-ifneq ($(filter check tests tests-with-broker test,$(MAKECMDGOALS)),)
+ifneq ($(filter check tests,$(MAKECMDGOALS)),)
TEST_DEPS += rabbit
endif
endif
endif
-ifeq ($(filter rabbit_public_umbrella amqp_client rabbit_common rabbitmq_test,$(PROJECT)),)
-ifeq ($(filter rabbitmq_test,$(DEPS) $(BUILD_DEPS) $(TEST_DEPS)),)
-TEST_DEPS += rabbitmq_test
-endif
-endif
-
# --------------------------------------------------------------------
# rabbitmq-components.mk checks.
# --------------------------------------------------------------------
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf
index c74525c936..63e5f700d9 100755
--- a/scripts/rabbitmq-server-ha.ocf
+++ b/scripts/rabbitmq-server-ha.ocf
@@ -332,7 +332,6 @@ $EXTENDED_OCF_PARAMS
<action name="status" timeout="20" />
<action name="monitor" depth="0" timeout="30" interval="5" />
<action name="monitor" depth="0" timeout="30" interval="3" role="Master"/>
-<action name="monitor" depth="30" timeout="60" interval="103" />
<action name="promote" timeout="30" />
<action name="demote" timeout="30" />
<action name="notify" timeout="20" />
diff --git a/src/mochinum.erl b/src/mochinum.erl
deleted file mode 100644
index 4ea7a22acf..0000000000
--- a/src/mochinum.erl
+++ /dev/null
@@ -1,358 +0,0 @@
-%% This file is a copy of `mochijson2.erl' from mochiweb, revision
-%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see
-%% `LICENSE-MIT-Mochi'.
-
-%% @copyright 2007 Mochi Media, Inc.
-%% @author Bob Ippolito <bob@mochimedia.com>
-
-%% @doc Useful numeric algorithms for floats that cover some deficiencies
-%% in the math module. More interesting is digits/1, which implements
-%% the algorithm from:
-%% http://www.cs.indiana.edu/~burger/fp/index.html
-%% See also "Printing Floating-Point Numbers Quickly and Accurately"
-%% in Proceedings of the SIGPLAN '96 Conference on Programming Language
-%% Design and Implementation.
-
--module(mochinum).
--author("Bob Ippolito <bob@mochimedia.com>").
--export([digits/1, frexp/1, int_pow/2, int_ceil/1]).
-
-%% IEEE 754 Float exponent bias
--define(FLOAT_BIAS, 1022).
--define(MIN_EXP, -1074).
--define(BIG_POW, 4503599627370496).
-
-%% External API
-
-%% @spec digits(number()) -> string()
-%% @doc Returns a string that accurately represents the given integer or float
-%% using a conservative amount of digits. Great for generating
-%% human-readable output, or compact ASCII serializations for floats.
-digits(N) when is_integer(N) ->
- integer_to_list(N);
-digits(0.0) ->
- "0.0";
-digits(Float) ->
- {Frac1, Exp1} = frexp_int(Float),
- [Place0 | Digits0] = digits1(Float, Exp1, Frac1),
- {Place, Digits} = transform_digits(Place0, Digits0),
- R = insert_decimal(Place, Digits),
- case Float < 0 of
- true ->
- [$- | R];
- _ ->
- R
- end.
-
-%% @spec frexp(F::float()) -> {Frac::float(), Exp::float()}
-%% @doc Return the fractional and exponent part of an IEEE 754 double,
-%% equivalent to the libc function of the same name.
-%% F = Frac * pow(2, Exp).
-frexp(F) ->
- frexp1(unpack(F)).
-
-%% @spec int_pow(X::integer(), N::integer()) -> Y::integer()
-%% @doc Moderately efficient way to exponentiate integers.
-%% int_pow(10, 2) = 100.
-int_pow(_X, 0) ->
- 1;
-int_pow(X, N) when N > 0 ->
- int_pow(X, N, 1).
-
-%% @spec int_ceil(F::float()) -> integer()
-%% @doc Return the ceiling of F as an integer. The ceiling is defined as
-%% F when F == trunc(F);
-%% trunc(F) when F &lt; 0;
-%% trunc(F) + 1 when F &gt; 0.
-int_ceil(X) ->
- T = trunc(X),
- case (X - T) of
- Pos when Pos > 0 -> T + 1;
- _ -> T
- end.
-
-
-%% Internal API
-
-int_pow(X, N, R) when N < 2 ->
- R * X;
-int_pow(X, N, R) ->
- int_pow(X * X, N bsr 1, case N band 1 of 1 -> R * X; 0 -> R end).
-
-insert_decimal(0, S) ->
- "0." ++ S;
-insert_decimal(Place, S) when Place > 0 ->
- L = length(S),
- case Place - L of
- 0 ->
- S ++ ".0";
- N when N < 0 ->
- {S0, S1} = lists:split(L + N, S),
- S0 ++ "." ++ S1;
- N when N < 6 ->
- %% More places than digits
- S ++ lists:duplicate(N, $0) ++ ".0";
- _ ->
- insert_decimal_exp(Place, S)
- end;
-insert_decimal(Place, S) when Place > -6 ->
- "0." ++ lists:duplicate(abs(Place), $0) ++ S;
-insert_decimal(Place, S) ->
- insert_decimal_exp(Place, S).
-
-insert_decimal_exp(Place, S) ->
- [C | S0] = S,
- S1 = case S0 of
- [] ->
- "0";
- _ ->
- S0
- end,
- Exp = case Place < 0 of
- true ->
- "e-";
- false ->
- "e+"
- end,
- [C] ++ "." ++ S1 ++ Exp ++ integer_to_list(abs(Place - 1)).
-
-
-digits1(Float, Exp, Frac) ->
- Round = ((Frac band 1) =:= 0),
- case Exp >= 0 of
- true ->
- BExp = 1 bsl Exp,
- case (Frac =/= ?BIG_POW) of
- true ->
- scale((Frac * BExp * 2), 2, BExp, BExp,
- Round, Round, Float);
- false ->
- scale((Frac * BExp * 4), 4, (BExp * 2), BExp,
- Round, Round, Float)
- end;
- false ->
- case (Exp =:= ?MIN_EXP) orelse (Frac =/= ?BIG_POW) of
- true ->
- scale((Frac * 2), 1 bsl (1 - Exp), 1, 1,
- Round, Round, Float);
- false ->
- scale((Frac * 4), 1 bsl (2 - Exp), 2, 1,
- Round, Round, Float)
- end
- end.
-
-scale(R, S, MPlus, MMinus, LowOk, HighOk, Float) ->
- Est = int_ceil(math:log10(abs(Float)) - 1.0e-10),
- %% Note that the scheme implementation uses a 326 element look-up table
- %% for int_pow(10, N) where we do not.
- case Est >= 0 of
- true ->
- fixup(R, S * int_pow(10, Est), MPlus, MMinus, Est,
- LowOk, HighOk);
- false ->
- Scale = int_pow(10, -Est),
- fixup(R * Scale, S, MPlus * Scale, MMinus * Scale, Est,
- LowOk, HighOk)
- end.
-
-fixup(R, S, MPlus, MMinus, K, LowOk, HighOk) ->
- TooLow = case HighOk of
- true ->
- (R + MPlus) >= S;
- false ->
- (R + MPlus) > S
- end,
- case TooLow of
- true ->
- [(K + 1) | generate(R, S, MPlus, MMinus, LowOk, HighOk)];
- false ->
- [K | generate(R * 10, S, MPlus * 10, MMinus * 10, LowOk, HighOk)]
- end.
-
-generate(R0, S, MPlus, MMinus, LowOk, HighOk) ->
- D = R0 div S,
- R = R0 rem S,
- TC1 = case LowOk of
- true ->
- R =< MMinus;
- false ->
- R < MMinus
- end,
- TC2 = case HighOk of
- true ->
- (R + MPlus) >= S;
- false ->
- (R + MPlus) > S
- end,
- case TC1 of
- false ->
- case TC2 of
- false ->
- [D | generate(R * 10, S, MPlus * 10, MMinus * 10,
- LowOk, HighOk)];
- true ->
- [D + 1]
- end;
- true ->
- case TC2 of
- false ->
- [D];
- true ->
- case R * 2 < S of
- true ->
- [D];
- false ->
- [D + 1]
- end
- end
- end.
-
-unpack(Float) ->
- <<Sign:1, Exp:11, Frac:52>> = <<Float:64/float>>,
- {Sign, Exp, Frac}.
-
-frexp1({_Sign, 0, 0}) ->
- {0.0, 0};
-frexp1({Sign, 0, Frac}) ->
- Exp = log2floor(Frac),
- <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, (Frac-1):52>>,
- {Frac1, -(?FLOAT_BIAS) - 52 + Exp};
-frexp1({Sign, Exp, Frac}) ->
- <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, Frac:52>>,
- {Frac1, Exp - ?FLOAT_BIAS}.
-
-log2floor(Int) ->
- log2floor(Int, 0).
-
-log2floor(0, N) ->
- N;
-log2floor(Int, N) ->
- log2floor(Int bsr 1, 1 + N).
-
-
-transform_digits(Place, [0 | Rest]) ->
- transform_digits(Place, Rest);
-transform_digits(Place, Digits) ->
- {Place, [$0 + D || D <- Digits]}.
-
-
-frexp_int(F) ->
- case unpack(F) of
- {_Sign, 0, Frac} ->
- {Frac, ?MIN_EXP};
- {_Sign, Exp, Frac} ->
- {Frac + (1 bsl 52), Exp - 53 - ?FLOAT_BIAS}
- end.
-
-%%
-%% Tests
-%%
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-int_ceil_test() ->
- ?assertEqual(1, int_ceil(0.0001)),
- ?assertEqual(0, int_ceil(0.0)),
- ?assertEqual(1, int_ceil(0.99)),
- ?assertEqual(1, int_ceil(1.0)),
- ?assertEqual(-1, int_ceil(-1.5)),
- ?assertEqual(-2, int_ceil(-2.0)),
- ok.
-
-int_pow_test() ->
- ?assertEqual(1, int_pow(1, 1)),
- ?assertEqual(1, int_pow(1, 0)),
- ?assertEqual(1, int_pow(10, 0)),
- ?assertEqual(10, int_pow(10, 1)),
- ?assertEqual(100, int_pow(10, 2)),
- ?assertEqual(1000, int_pow(10, 3)),
- ok.
-
-digits_test() ->
- ?assertEqual("0",
- digits(0)),
- ?assertEqual("0.0",
- digits(0.0)),
- ?assertEqual("1.0",
- digits(1.0)),
- ?assertEqual("-1.0",
- digits(-1.0)),
- ?assertEqual("0.1",
- digits(0.1)),
- ?assertEqual("0.01",
- digits(0.01)),
- ?assertEqual("0.001",
- digits(0.001)),
- ?assertEqual("1.0e+6",
- digits(1000000.0)),
- ?assertEqual("0.5",
- digits(0.5)),
- ?assertEqual("4503599627370496.0",
- digits(4503599627370496.0)),
- %% small denormalized number
- %% 4.94065645841246544177e-324 =:= 5.0e-324
- <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
- ?assertEqual("5.0e-324",
- digits(SmallDenorm)),
- ?assertEqual(SmallDenorm,
- list_to_float(digits(SmallDenorm))),
- %% large denormalized number
- %% 2.22507385850720088902e-308
- <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
- ?assertEqual("2.225073858507201e-308",
- digits(BigDenorm)),
- ?assertEqual(BigDenorm,
- list_to_float(digits(BigDenorm))),
- %% small normalized number
- %% 2.22507385850720138309e-308
- <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
- ?assertEqual("2.2250738585072014e-308",
- digits(SmallNorm)),
- ?assertEqual(SmallNorm,
- list_to_float(digits(SmallNorm))),
- %% large normalized number
- %% 1.79769313486231570815e+308
- <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
- ?assertEqual("1.7976931348623157e+308",
- digits(LargeNorm)),
- ?assertEqual(LargeNorm,
- list_to_float(digits(LargeNorm))),
- %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
- ?assertEqual("5.0e-324",
- digits(math:pow(2, -1074))),
- ok.
-
-frexp_test() ->
- %% zero
- ?assertEqual({0.0, 0}, frexp(0.0)),
- %% one
- ?assertEqual({0.5, 1}, frexp(1.0)),
- %% negative one
- ?assertEqual({-0.5, 1}, frexp(-1.0)),
- %% small denormalized number
- %% 4.94065645841246544177e-324
- <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
- ?assertEqual({0.5, -1073}, frexp(SmallDenorm)),
- %% large denormalized number
- %% 2.22507385850720088902e-308
- <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
- ?assertEqual(
- {0.99999999999999978, -1022},
- frexp(BigDenorm)),
- %% small normalized number
- %% 2.22507385850720138309e-308
- <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
- ?assertEqual({0.5, -1021}, frexp(SmallNorm)),
- %% large normalized number
- %% 1.79769313486231570815e+308
- <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
- ?assertEqual(
- {0.99999999999999989, 1024},
- frexp(LargeNorm)),
- %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
- ?assertEqual(
- {0.5, -1073},
- frexp(math:pow(2, -1074))),
- ok.
-
--endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 66df42987c..29f8e53f08 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,6 +85,7 @@
%% e.g. message expiration messages from previously set up timers
%% that may or may not be still valid
args_policy_version,
+ mirroring_policy_version = 0,
%% running | flow | idle
status
}).
@@ -1225,22 +1226,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+handle_cast(update_mirroring, State = #q{q = Q,
+ mirroring_policy_version = Version}) ->
+ case needs_update_mirroring(Q, Version) of
+ false ->
+ noreply(State);
+ {Policy, NewVersion} ->
+ State1 = State#q{mirroring_policy_version = NewVersion},
+ noreply(update_mirroring(Policy, State1))
+ end;
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{consumers = Consumers,
@@ -1383,3 +1377,54 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
+needs_update_mirroring(Q, Version) ->
+ {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name),
+ DBVersion = UpQ#amqqueue.policy_version,
+ case DBVersion > Version of
+ true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion};
+ false -> false
+ end.
+
+update_mirroring(Policy, State = #q{backing_queue = BQ}) ->
+ case update_to(Policy, BQ) of
+ start_mirroring ->
+ start_mirroring(State);
+ stop_mirroring ->
+ stop_mirroring(State);
+ ignore ->
+ State;
+ update_ha_mode ->
+ update_ha_mode(State)
+ end.
+
+update_to(undefined, rabbit_mirror_queue_master) ->
+ stop_mirroring;
+update_to(_, rabbit_mirror_queue_master) ->
+ update_ha_mode;
+update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ ignore;
+update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ start_mirroring.
+
+start_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+stop_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+update_ha_mode(State) ->
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ ok = rabbit_mirror_queue_misc:update_mirrors(Q),
+ State.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4205fabb83..375a0366dd 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -20,7 +20,7 @@
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1,
+ is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
@@ -402,15 +402,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
- {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
- {true, true} -> update_mirrors0(OldQ, NewQ)
+ _ -> rabbit_amqqueue:update_mirroring(QPid)
end.
-update_mirrors0(OldQ = #amqqueue{name = QName},
- NewQ = #amqqueue{name = QName}) ->
- {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
- {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+update_mirrors(Q = #amqqueue{name = QName}) ->
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
%% When a mirror dies, remove_from_queue/2 might have to add new
@@ -424,7 +421,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
drop_mirrors(QName, OldNodes -- NewNodes),
%% This is for the case where no extra nodes were added but we changed to
%% a policy requiring auto-sync.
- maybe_auto_sync(NewQ),
+ maybe_auto_sync(Q),
ok.
%% The arrival of a newly synced slave may cause the master to die if
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index eb8cf63327..a9caadf972 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -276,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
NewPolicy -> case rabbit_amqqueue:update(
QName, fun(Q1) ->
rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy})
+ Q1#amqqueue{policy = NewPolicy,
+ policy_version =
+ Q1#amqqueue.policy_version + 1 })
end) of
#amqqueue{} = Q1 -> {Q, Q1};
not_found -> {Q, Q }
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 67c2a84a0e..8609a0e424 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -51,6 +51,7 @@
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
+-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -433,6 +434,24 @@ recoverable_slaves(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators,
state]).
+policy_version() ->
+ ok = policy_version(rabbit_queue),
+ ok = policy_version(rabbit_durable_queue).
+
+policy_version(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, 0}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+ policy_version]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 5872d97d4c..bba7fad707 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -31,6 +31,7 @@
%% The first two are change_policy, the last two are change_cluster
-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -61,6 +62,10 @@ groups() ->
{cluster_size_3, [], [
change_policy,
rapid_change
+ % FIXME: Re-enable those tests when the know issues are
+ % fixed.
+ %failing_random_policies,
+ %random_policy
]}
]}
].
@@ -137,7 +142,7 @@ change_policy(Config) ->
assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]),
%% Clear the policy, and we go back to non-mirrored
- rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
assert_slaves(A, ?QNAME, {A, ''}),
%% Test switching "away" from an unmirrored node
@@ -206,7 +211,7 @@ rapid_loop(Config, Node, MRef) ->
after 0 ->
rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY,
<<"all">>),
- rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
rapid_loop(Config, Node, MRef)
end.
@@ -253,6 +258,23 @@ promote_on_shutdown(Config) ->
durable = true}),
ok.
+random_policy(Config) ->
+ run_proper(fun prop_random_policy/1, [Config]).
+
+failing_random_policies(Config) ->
+ [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ %% Those set of policies were found as failing by PropEr in the
+ %% `random_policy` test above. We add them explicitely here to make
+ %% sure they get tested.
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{nodes, [A, B]}, {nodes, [A]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{exactly, 3}, undefined, all, {nodes, [B]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [all, undefined, {exactly, 2}, all, {exactly, 3}, {exactly, 3},
+ undefined, {exactly, 3}, all])).
+
%%----------------------------------------------------------------------------
assert_slaves(RPCNode, QName, Exp) ->
@@ -327,3 +349,131 @@ get_stacktrace() ->
_:e ->
erlang:get_stacktrace()
end.
+
+%%----------------------------------------------------------------------------
+run_proper(Fun, Args) ->
+ ?assertEqual(true,
+ proper:counterexample(erlang:apply(Fun, Args),
+ [{numtests, 25},
+ {on_output, fun(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])).
+
+prop_random_policy(Config) ->
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ ?FORALL(
+ Policies, non_empty(list(policy_gen(Nodes))),
+ test_random_policy(Config, Nodes, Policies)).
+
+test_random_policy(Config, Nodes, Policies) ->
+ [NodeA | _] = Nodes,
+ Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
+ %% Add some load so mirrors can be busy synchronising
+ rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000),
+ %% Apply policies in parallel on all nodes
+ apply_in_parallel(Config, Nodes, Policies),
+ %% Give it some time to generate all internal notifications
+ timer:sleep(2000),
+ %% Check the result
+ Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30),
+ %% Cleanup
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY),
+ Result.
+
+apply_in_parallel(Config, Nodes, Policies) ->
+ Self = self(),
+ [spawn_link(fun() ->
+ [begin
+ apply_policy(Config, N, Policy)
+ end || Policy <- Policies],
+ Self ! parallel_task_done
+ end) || N <- Nodes],
+ [receive
+ parallel_task_done ->
+ ok
+ end || _ <- Nodes].
+
+%% Proper generators
+policy_gen(Nodes) ->
+ %% Stop mirroring needs to be called often to trigger rabbitmq-server#803
+ frequency([{3, undefined},
+ {1, all},
+ {1, {nodes, nodes_gen(Nodes)}},
+ {1, {exactly, choose(1, 3)}}
+ ]).
+
+nodes_gen(Nodes) ->
+ ?LET(List, non_empty(list(oneof(Nodes))),
+ sets:to_list(sets:from_list(List))).
+
+%% Checks
+wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) ->
+ %% Ensure the owner/master is able to process a call request,
+ %% which means that all pending casts have been processed.
+ %% Use the information returned by owner/master to verify the
+ %% test result
+ Info = find_queue(QueueName, NodeA),
+ Pid = proplists:get_value(pid, Info),
+ Node = node(Pid),
+ %% Gets owner/master
+ case rpc:call(Node, gen_server, call, [Pid, info], 5000) of
+ {badrpc, _} ->
+ %% The queue is probably being migrated to another node.
+ %% Let's wait a bit longer.
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1);
+ FinalInfo ->
+ %% The last policy is the final state
+ LastPolicy = lists:last(TestedPolicies),
+ case verify_policy(LastPolicy, FinalInfo) of
+ true ->
+ true;
+ false when Tries =:= 1 ->
+ Policies = rpc:call(Node, rabbit_policy, list, [], 5000),
+ ct:pal(
+ "Last policy not applied:~n"
+ " Queue node: ~s (~p)~n"
+ " Queue info: ~p~n"
+ " Configured policies: ~p~n"
+ " Tested policies: ~p",
+ [Node, Pid, FinalInfo, Policies, TestedPolicies]),
+ false;
+ false ->
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies,
+ Tries - 1)
+ end
+ end.
+
+verify_policy(undefined, Info) ->
+ %% If the queue is not mirrored, it returns ''
+ '' == proplists:get_value(slave_pids, Info);
+verify_policy(all, Info) ->
+ 2 == length(proplists:get_value(slave_pids, Info));
+verify_policy({exactly, 1}, Info) ->
+ %% If the queue is mirrored, it returns a list
+ [] == proplists:get_value(slave_pids, Info);
+verify_policy({exactly, N}, Info) ->
+ (N - 1) == length(proplists:get_value(slave_pids, Info));
+verify_policy({nodes, Nodes}, Info) ->
+ Master = node(proplists:get_value(pid, Info)),
+ Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)],
+ lists:sort(Nodes) == lists:sort([Master | Slaves]).
+
+%% Policies
+apply_policy(Config, N, undefined) ->
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY);
+apply_policy(Config, N, all) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {nodes, Nodes}) ->
+ NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes],
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"nodes">>, NNodes},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {exactly, Exactly}) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"exactly">>, Exactly},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]).