diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-09-12 13:30:23 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-09-12 13:30:23 +0300 |
| commit | 60c2a6163fe1fbc360763f24c1456261fa7d22c2 (patch) | |
| tree | d32fb536c2c2e9abc595a02eb2ccc2a033f053a8 | |
| parent | acc39e857742cfb2ebbeb6675fd233913edf9552 (diff) | |
| parent | dc8a921682342e2b44a3dd59a76b08643ff3c0b5 (diff) | |
| download | rabbitmq-server-git-60c2a6163fe1fbc360763f24c1456261fa7d22c2.tar.gz | |
Merge branch 'stable' into rabbitmq-server-950
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | packaging/Makefile | 9 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/Makefile | 16 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.service | 16 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 81 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.tmpfiles | 1 | ||||
| -rw-r--r-- | rabbitmq-components.mk | 14 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server-ha.ocf | 1 | ||||
| -rw-r--r-- | src/mochinum.erl | 358 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 77 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 19 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 154 |
14 files changed, 359 insertions, 410 deletions
@@ -5,6 +5,7 @@ VERSION ?= $(call get_app_version,src/$(PROJECT).app.src) PACKAGES_DIR ?= $(abspath PACKAGES) DEPS = ranch $(PLUGINS) +TEST_DEPS = amqp_client meck proper define usage_xml_to_erl $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1)))) @@ -68,9 +69,6 @@ DEPS += $(DISTRIBUTED_DEPS) endif endif -# FIXME: Remove rabbitmq_test as TEST_DEPS from here for now. -TEST_DEPS := amqp_client meck $(filter-out rabbitmq_test,$(TEST_DEPS)) - include erlang.mk # -------------------------------------------------------------------- diff --git a/packaging/Makefile b/packaging/Makefile index 02820bdb23..64a1243b51 100644 --- a/packaging/Makefile +++ b/packaging/Makefile @@ -35,6 +35,7 @@ all: packages .PHONY: packages package-deb \ package-rpm package-rpm-fedora package-rpm-suse \ + package-rpm-rhel6 package-rpm-rhel7 \ package-windows package-standalone-macosx \ package-generic-unix @@ -70,12 +71,18 @@ packages: package-deb package-rpm package-windows package-generic-unix package-deb: $(SOURCE_DIST_FILE) $(gen_verbose) $(MAKE) -C debs/Debian $(VARS) all $(DO_CLEAN) -package-rpm: package-rpm-fedora package-rpm-suse +package-rpm: package-rpm-rhel6 package-rpm-rhel7 package-rpm-suse @: package-rpm-fedora: $(SOURCE_DIST_FILE) $(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) all $(DO_CLEAN) +package-rpm-rhel6: $(SOURCE_DIST_FILE) + $(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) RPM_OS=rhel6 all $(DO_CLEAN) + +package-rpm-rhel7: $(SOURCE_DIST_FILE) + $(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) RPM_OS=rhel7 all $(DO_CLEAN) + package-rpm-suse: $(SOURCE_DIST_FILE) $(gen_verbose) $(MAKE) -C RPMS/Fedora $(VARS) RPM_OS=suse all $(DO_CLEAN) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 5763cadf04..11ecaff652 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -5,7 +5,7 @@ ifeq ($(SOURCE_DIST_FILE),) $(error Cannot find source archive; please specify SOURCE_DIST_FILE) endif ifneq ($(words $(SOURCE_DIST_FILE)),1) -$(error Multile source archives found; please specify SOURCE_DIST_FILE) +$(error Multiple source archives found; please specify SOURCE_DIST_FILE) endif VERSION ?= $(patsubst rabbitmq-server-%.tar.xz,%,$(notdir $(SOURCE_DIST_FILE))) @@ -34,7 +34,13 @@ else FUNCTION_LIBRARY=\# Source function library.\n. /etc/init.d/functions REQUIRES=chkconfig initscripts OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' +ifeq "$(RPM_OS)" "rhel6" +SPEC_DEFINES=--define 'group_tag Development/Libraries' --define 'dist .el6' --define 'rhel 6' +else ifeq "$(RPM_OS)" "rhel7" +SPEC_DEFINES=--define 'group_tag Development/Libraries' --define '_unitdir /usr/lib/systemd/system' --define 'dist .el7' --define 'rhel 7' +else SPEC_DEFINES=--define 'group_tag Development/Libraries' +endif START_PROG=daemon endif @@ -53,17 +59,13 @@ prepare: sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec + cp rabbitmq-server.service SOURCES/rabbitmq-server.service + cp rabbitmq-server.tmpfiles SOURCES/rabbitmq-server.tmpfiles cp rabbitmq-server.init SOURCES/rabbitmq-server.init sed -i \ -e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \ -e 's|^@FUNCTION_LIBRARY@|$(FUNCTION_LIBRARY)|' \ SOURCES/rabbitmq-server.init -ifeq "$(RPM_OS)" "fedora" -# Fedora says that only vital services should have Default-Start - sed -i -e '/^# Default-Start:/d;/^# Default-Stop:/d' \ - SOURCES/rabbitmq-server.init -endif - cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/rabbitmq-server.service b/packaging/RPMS/Fedora/rabbitmq-server.service new file mode 100644 index 0000000000..7fe15fe36c --- /dev/null +++ b/packaging/RPMS/Fedora/rabbitmq-server.service @@ -0,0 +1,16 @@ +[Unit] +Description=RabbitMQ broker +After=syslog.target network.target + +[Service] +Type=notify +User=rabbitmq +Group=rabbitmq +WorkingDirectory=/var/lib/rabbitmq +ExecStart=/usr/sbin/rabbitmq-server +ExecStop=/usr/sbin/rabbitmqctl stop +NotifyAccess=all +TimeoutStartSec=3600 + +[Install] +WantedBy=multi-user.target diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 83b9678ec8..9ad05e59fb 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,4 +1,5 @@ %define debug_package %{nil} +%define erlang_minver R16B-03 Name: rabbitmq-server Version: %%VERSION%% @@ -8,14 +9,28 @@ Group: %{group_tag} Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.xz Source1: rabbitmq-server.init Source2: rabbitmq-server.logrotate +Source3: rabbitmq-server.service +Source4: rabbitmq-server.tmpfiles URL: http://www.rabbitmq.com/ BuildArch: noarch -BuildRequires: erlang >= R16B-03, python-simplejson, xmlto, libxslt, gzip, sed, zip, rsync -Requires: erlang >= R16B-03, logrotate, socat +BuildRequires: erlang >= %{erlang_minver}, python-simplejson, xmlto, libxslt, gzip, sed, zip, rsync + +%if 0%{?fedora} || 0%{?rhel} >= 7 +BuildRequires: systemd +%endif + +Requires: erlang >= %{erlang_minver}, logrotate, socat BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server + +%if 0%{?fedora} || 0%{?rhel} >= 7 +Requires(pre): systemd +Requires(post): systemd +Requires(preun): systemd +%else Requires(post): %%REQUIRES%% Requires(pre): %%REQUIRES%% +%endif %description RabbitMQ is an open source multi-protocol messaging broker. @@ -47,7 +62,13 @@ mkdir -p %{buildroot}%{_localstatedir}/lib/rabbitmq/mnesia mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq #Copy all necessary lib files etc. + +%if 0%{?fedora} || 0%{?rhel} >= 7 +install -p -D -m 0644 %{S:3} %{buildroot}%{_unitdir}/%{name}.service +%else install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server +%endif + install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0755 %{_rabbit_server_ha_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha install -p -D -m 0644 %{S:2} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server @@ -65,6 +86,10 @@ for script in rabbitmq-server rabbitmq-plugins; do \ %{buildroot}%{_sbindir}/$script; \ done +%if 0%{?fedora} > 14 || 0%{?rhel} >= 7 +install -D -p -m 0644 %{SOURCE4} %{buildroot}%{_prefix}/lib/tmpfiles.d/%{name}.conf +%endif + rm %{_maindir}/LICENSE* %{_maindir}/INSTALL #Build the list of files @@ -74,7 +99,8 @@ find %{buildroot} -path %{buildroot}%{_sysconfdir} -prune -o '!' -type d -printf %pre if [ $1 -gt 1 ]; then - # Upgrade - stop previous instance of rabbitmq-server init.d script + # Upgrade - stop previous instance of rabbitmq-server init.d (this + # will also activate systemd if it was used) script. /sbin/service rabbitmq-server stop fi @@ -90,7 +116,19 @@ if ! getent passwd rabbitmq >/dev/null; then fi %post + +%if 0%{?fedora} || 0%{?rhel} >= 7 +# %%systemd_post %%{name}.service +# manual expansion of systemd_post as this doesn't appear to +# expand correctly on debian machines +if [ $1 -eq 1 ] ; then + # Initial installation + systemctl preset %{name}.service >/dev/null 2>&1 || : +fi +/bin/systemctl daemon-reload +%else /sbin/chkconfig --add %{name} +%endif if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf fi @@ -99,8 +137,12 @@ chmod -R o-rwx,g-w %{_localstatedir}/lib/rabbitmq/mnesia %preun if [ $1 = 0 ]; then #Complete uninstall +%if 0%{?fedora} || 0%{?rhel} >= 7 + systemctl stop rabbitmq-server +%else /sbin/service rabbitmq-server stop /sbin/chkconfig --del rabbitmq-server +%endif # We do not remove /var/log and /var/lib directories # Leave rabbitmq user and group @@ -112,13 +154,46 @@ for ext in rel script boot ; do rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext done +%postun +%if 0%{?fedora} || 0%{?rhel} >= 7 +# %%systemd_postun_with_restart %%{name}.service +# manual expansion of systemd_postun_with_restart as this doesn't appear to +# expand correctly on debian machines +if [ $1 -ge 1 ] ; then + # Package upgrade, not uninstall + systemctl try-restart %{name}.service >/dev/null 2>&1 || : +fi +%else +if [ $1 -gt 1 ]; then + /sbin/service %{name} try-restart +fi +%endif + +%if 0%{?fedora} > 17 || 0%{?rhel} >= 7 +# For prior versions older than this, do a conversion +# from sysv to systemd +%triggerun -- %{name} < 3.6.5 +# Save the current service runlevel info +# User must manually run systemd-sysv-convert --apply opensips +# to migrate them to systemd targets +/usr/bin/systemd-sysv-convert --save %{name} >/dev/null 2>&1 ||: + +# Run these because the SysV package being removed won't do them +/sbin/chkconfig --del %{name} >/dev/null 2>&1 || : +/bin/systemctl try-restart %{name}.service >/dev/null 2>&1 || : +%endif + %files -f ../%{name}.files %defattr(-,root,root,-) %attr(0755, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq/mnesia %attr(0755, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq %dir %{_sysconfdir}/rabbitmq + +%if 0%{?rhel} < 7 %{_initrddir}/rabbitmq-server +%endif + %config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server %doc LICENSE* %doc README diff --git a/packaging/RPMS/Fedora/rabbitmq-server.tmpfiles b/packaging/RPMS/Fedora/rabbitmq-server.tmpfiles new file mode 100644 index 0000000000..c2681827e0 --- /dev/null +++ b/packaging/RPMS/Fedora/rabbitmq-server.tmpfiles @@ -0,0 +1 @@ +D /var/run/rabbitmq 0755 rabbitmq rabbitmq - diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index eb9e9e3e03..e3417856f8 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -123,7 +123,6 @@ RABBITMQ_COMPONENTS = amqp_client \ rabbitmq_shovel \ rabbitmq_shovel_management \ rabbitmq_stomp \ - rabbitmq_test \ rabbitmq_toke \ rabbitmq_top \ rabbitmq_tracing \ @@ -262,10 +261,13 @@ prepare-dist:: ifneq ($(PROJECT),rabbit) ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS)),) RUN_RMQ_TARGETS = run-broker \ + run-tls-broker \ run-background-broker \ run-node \ run-background-node \ - start-background-node + start-background-node \ + start-background-broker \ + start-rabbit-on-node ifneq ($(filter $(RUN_RMQ_TARGETS),$(MAKECMDGOALS)),) BUILD_DEPS += rabbit @@ -273,18 +275,12 @@ endif endif ifeq ($(filter rabbit,$(DEPS) $(BUILD_DEPS) $(TEST_DEPS)),) -ifneq ($(filter check tests tests-with-broker test,$(MAKECMDGOALS)),) +ifneq ($(filter check tests,$(MAKECMDGOALS)),) TEST_DEPS += rabbit endif endif endif -ifeq ($(filter rabbit_public_umbrella amqp_client rabbit_common rabbitmq_test,$(PROJECT)),) -ifeq ($(filter rabbitmq_test,$(DEPS) $(BUILD_DEPS) $(TEST_DEPS)),) -TEST_DEPS += rabbitmq_test -endif -endif - # -------------------------------------------------------------------- # rabbitmq-components.mk checks. # -------------------------------------------------------------------- diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index c74525c936..63e5f700d9 100755 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -332,7 +332,6 @@ $EXTENDED_OCF_PARAMS <action name="status" timeout="20" /> <action name="monitor" depth="0" timeout="30" interval="5" /> <action name="monitor" depth="0" timeout="30" interval="3" role="Master"/> -<action name="monitor" depth="30" timeout="60" interval="103" /> <action name="promote" timeout="30" /> <action name="demote" timeout="30" /> <action name="notify" timeout="20" /> diff --git a/src/mochinum.erl b/src/mochinum.erl deleted file mode 100644 index 4ea7a22acf..0000000000 --- a/src/mochinum.erl +++ /dev/null @@ -1,358 +0,0 @@ -%% This file is a copy of `mochijson2.erl' from mochiweb, revision -%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see -%% `LICENSE-MIT-Mochi'. - -%% @copyright 2007 Mochi Media, Inc. -%% @author Bob Ippolito <bob@mochimedia.com> - -%% @doc Useful numeric algorithms for floats that cover some deficiencies -%% in the math module. More interesting is digits/1, which implements -%% the algorithm from: -%% http://www.cs.indiana.edu/~burger/fp/index.html -%% See also "Printing Floating-Point Numbers Quickly and Accurately" -%% in Proceedings of the SIGPLAN '96 Conference on Programming Language -%% Design and Implementation. - --module(mochinum). --author("Bob Ippolito <bob@mochimedia.com>"). --export([digits/1, frexp/1, int_pow/2, int_ceil/1]). - -%% IEEE 754 Float exponent bias --define(FLOAT_BIAS, 1022). --define(MIN_EXP, -1074). --define(BIG_POW, 4503599627370496). - -%% External API - -%% @spec digits(number()) -> string() -%% @doc Returns a string that accurately represents the given integer or float -%% using a conservative amount of digits. Great for generating -%% human-readable output, or compact ASCII serializations for floats. -digits(N) when is_integer(N) -> - integer_to_list(N); -digits(0.0) -> - "0.0"; -digits(Float) -> - {Frac1, Exp1} = frexp_int(Float), - [Place0 | Digits0] = digits1(Float, Exp1, Frac1), - {Place, Digits} = transform_digits(Place0, Digits0), - R = insert_decimal(Place, Digits), - case Float < 0 of - true -> - [$- | R]; - _ -> - R - end. - -%% @spec frexp(F::float()) -> {Frac::float(), Exp::float()} -%% @doc Return the fractional and exponent part of an IEEE 754 double, -%% equivalent to the libc function of the same name. -%% F = Frac * pow(2, Exp). -frexp(F) -> - frexp1(unpack(F)). - -%% @spec int_pow(X::integer(), N::integer()) -> Y::integer() -%% @doc Moderately efficient way to exponentiate integers. -%% int_pow(10, 2) = 100. -int_pow(_X, 0) -> - 1; -int_pow(X, N) when N > 0 -> - int_pow(X, N, 1). - -%% @spec int_ceil(F::float()) -> integer() -%% @doc Return the ceiling of F as an integer. The ceiling is defined as -%% F when F == trunc(F); -%% trunc(F) when F < 0; -%% trunc(F) + 1 when F > 0. -int_ceil(X) -> - T = trunc(X), - case (X - T) of - Pos when Pos > 0 -> T + 1; - _ -> T - end. - - -%% Internal API - -int_pow(X, N, R) when N < 2 -> - R * X; -int_pow(X, N, R) -> - int_pow(X * X, N bsr 1, case N band 1 of 1 -> R * X; 0 -> R end). - -insert_decimal(0, S) -> - "0." ++ S; -insert_decimal(Place, S) when Place > 0 -> - L = length(S), - case Place - L of - 0 -> - S ++ ".0"; - N when N < 0 -> - {S0, S1} = lists:split(L + N, S), - S0 ++ "." ++ S1; - N when N < 6 -> - %% More places than digits - S ++ lists:duplicate(N, $0) ++ ".0"; - _ -> - insert_decimal_exp(Place, S) - end; -insert_decimal(Place, S) when Place > -6 -> - "0." ++ lists:duplicate(abs(Place), $0) ++ S; -insert_decimal(Place, S) -> - insert_decimal_exp(Place, S). - -insert_decimal_exp(Place, S) -> - [C | S0] = S, - S1 = case S0 of - [] -> - "0"; - _ -> - S0 - end, - Exp = case Place < 0 of - true -> - "e-"; - false -> - "e+" - end, - [C] ++ "." ++ S1 ++ Exp ++ integer_to_list(abs(Place - 1)). - - -digits1(Float, Exp, Frac) -> - Round = ((Frac band 1) =:= 0), - case Exp >= 0 of - true -> - BExp = 1 bsl Exp, - case (Frac =/= ?BIG_POW) of - true -> - scale((Frac * BExp * 2), 2, BExp, BExp, - Round, Round, Float); - false -> - scale((Frac * BExp * 4), 4, (BExp * 2), BExp, - Round, Round, Float) - end; - false -> - case (Exp =:= ?MIN_EXP) orelse (Frac =/= ?BIG_POW) of - true -> - scale((Frac * 2), 1 bsl (1 - Exp), 1, 1, - Round, Round, Float); - false -> - scale((Frac * 4), 1 bsl (2 - Exp), 2, 1, - Round, Round, Float) - end - end. - -scale(R, S, MPlus, MMinus, LowOk, HighOk, Float) -> - Est = int_ceil(math:log10(abs(Float)) - 1.0e-10), - %% Note that the scheme implementation uses a 326 element look-up table - %% for int_pow(10, N) where we do not. - case Est >= 0 of - true -> - fixup(R, S * int_pow(10, Est), MPlus, MMinus, Est, - LowOk, HighOk); - false -> - Scale = int_pow(10, -Est), - fixup(R * Scale, S, MPlus * Scale, MMinus * Scale, Est, - LowOk, HighOk) - end. - -fixup(R, S, MPlus, MMinus, K, LowOk, HighOk) -> - TooLow = case HighOk of - true -> - (R + MPlus) >= S; - false -> - (R + MPlus) > S - end, - case TooLow of - true -> - [(K + 1) | generate(R, S, MPlus, MMinus, LowOk, HighOk)]; - false -> - [K | generate(R * 10, S, MPlus * 10, MMinus * 10, LowOk, HighOk)] - end. - -generate(R0, S, MPlus, MMinus, LowOk, HighOk) -> - D = R0 div S, - R = R0 rem S, - TC1 = case LowOk of - true -> - R =< MMinus; - false -> - R < MMinus - end, - TC2 = case HighOk of - true -> - (R + MPlus) >= S; - false -> - (R + MPlus) > S - end, - case TC1 of - false -> - case TC2 of - false -> - [D | generate(R * 10, S, MPlus * 10, MMinus * 10, - LowOk, HighOk)]; - true -> - [D + 1] - end; - true -> - case TC2 of - false -> - [D]; - true -> - case R * 2 < S of - true -> - [D]; - false -> - [D + 1] - end - end - end. - -unpack(Float) -> - <<Sign:1, Exp:11, Frac:52>> = <<Float:64/float>>, - {Sign, Exp, Frac}. - -frexp1({_Sign, 0, 0}) -> - {0.0, 0}; -frexp1({Sign, 0, Frac}) -> - Exp = log2floor(Frac), - <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, (Frac-1):52>>, - {Frac1, -(?FLOAT_BIAS) - 52 + Exp}; -frexp1({Sign, Exp, Frac}) -> - <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, Frac:52>>, - {Frac1, Exp - ?FLOAT_BIAS}. - -log2floor(Int) -> - log2floor(Int, 0). - -log2floor(0, N) -> - N; -log2floor(Int, N) -> - log2floor(Int bsr 1, 1 + N). - - -transform_digits(Place, [0 | Rest]) -> - transform_digits(Place, Rest); -transform_digits(Place, Digits) -> - {Place, [$0 + D || D <- Digits]}. - - -frexp_int(F) -> - case unpack(F) of - {_Sign, 0, Frac} -> - {Frac, ?MIN_EXP}; - {_Sign, Exp, Frac} -> - {Frac + (1 bsl 52), Exp - 53 - ?FLOAT_BIAS} - end. - -%% -%% Tests -%% --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -int_ceil_test() -> - ?assertEqual(1, int_ceil(0.0001)), - ?assertEqual(0, int_ceil(0.0)), - ?assertEqual(1, int_ceil(0.99)), - ?assertEqual(1, int_ceil(1.0)), - ?assertEqual(-1, int_ceil(-1.5)), - ?assertEqual(-2, int_ceil(-2.0)), - ok. - -int_pow_test() -> - ?assertEqual(1, int_pow(1, 1)), - ?assertEqual(1, int_pow(1, 0)), - ?assertEqual(1, int_pow(10, 0)), - ?assertEqual(10, int_pow(10, 1)), - ?assertEqual(100, int_pow(10, 2)), - ?assertEqual(1000, int_pow(10, 3)), - ok. - -digits_test() -> - ?assertEqual("0", - digits(0)), - ?assertEqual("0.0", - digits(0.0)), - ?assertEqual("1.0", - digits(1.0)), - ?assertEqual("-1.0", - digits(-1.0)), - ?assertEqual("0.1", - digits(0.1)), - ?assertEqual("0.01", - digits(0.01)), - ?assertEqual("0.001", - digits(0.001)), - ?assertEqual("1.0e+6", - digits(1000000.0)), - ?assertEqual("0.5", - digits(0.5)), - ?assertEqual("4503599627370496.0", - digits(4503599627370496.0)), - %% small denormalized number - %% 4.94065645841246544177e-324 =:= 5.0e-324 - <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>, - ?assertEqual("5.0e-324", - digits(SmallDenorm)), - ?assertEqual(SmallDenorm, - list_to_float(digits(SmallDenorm))), - %% large denormalized number - %% 2.22507385850720088902e-308 - <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>, - ?assertEqual("2.225073858507201e-308", - digits(BigDenorm)), - ?assertEqual(BigDenorm, - list_to_float(digits(BigDenorm))), - %% small normalized number - %% 2.22507385850720138309e-308 - <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>, - ?assertEqual("2.2250738585072014e-308", - digits(SmallNorm)), - ?assertEqual(SmallNorm, - list_to_float(digits(SmallNorm))), - %% large normalized number - %% 1.79769313486231570815e+308 - <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>, - ?assertEqual("1.7976931348623157e+308", - digits(LargeNorm)), - ?assertEqual(LargeNorm, - list_to_float(digits(LargeNorm))), - %% issue #10 - mochinum:frexp(math:pow(2, -1074)). - ?assertEqual("5.0e-324", - digits(math:pow(2, -1074))), - ok. - -frexp_test() -> - %% zero - ?assertEqual({0.0, 0}, frexp(0.0)), - %% one - ?assertEqual({0.5, 1}, frexp(1.0)), - %% negative one - ?assertEqual({-0.5, 1}, frexp(-1.0)), - %% small denormalized number - %% 4.94065645841246544177e-324 - <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>, - ?assertEqual({0.5, -1073}, frexp(SmallDenorm)), - %% large denormalized number - %% 2.22507385850720088902e-308 - <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>, - ?assertEqual( - {0.99999999999999978, -1022}, - frexp(BigDenorm)), - %% small normalized number - %% 2.22507385850720138309e-308 - <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>, - ?assertEqual({0.5, -1021}, frexp(SmallNorm)), - %% large normalized number - %% 1.79769313486231570815e+308 - <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>, - ?assertEqual( - {0.99999999999999989, 1024}, - frexp(LargeNorm)), - %% issue #10 - mochinum:frexp(math:pow(2, -1074)). - ?assertEqual( - {0.5, -1073}, - frexp(math:pow(2, -1074))), - ok. - --endif. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 66df42987c..29f8e53f08 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -85,6 +85,7 @@ %% e.g. message expiration messages from previously set up timers %% that may or may not be still valid args_policy_version, + mirroring_policy_version = 0, %% running | flow | idle status }). @@ -1225,22 +1226,15 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = BQ:stop_mirroring(BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); +handle_cast(update_mirroring, State = #q{q = Q, + mirroring_policy_version = Version}) -> + case needs_update_mirroring(Q, Version) of + false -> + noreply(State); + {Policy, NewVersion} -> + State1 = State#q{mirroring_policy_version = NewVersion}, + noreply(update_mirroring(Policy, State1)) + end; handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{consumers = Consumers, @@ -1383,3 +1377,54 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + +needs_update_mirroring(Q, Version) -> + {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name), + DBVersion = UpQ#amqqueue.policy_version, + case DBVersion > Version of + true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion}; + false -> false + end. + +update_mirroring(Policy, State = #q{backing_queue = BQ}) -> + case update_to(Policy, BQ) of + start_mirroring -> + start_mirroring(State); + stop_mirroring -> + stop_mirroring(State); + ignore -> + State; + update_ha_mode -> + update_ha_mode(State) + end. + +update_to(undefined, rabbit_mirror_queue_master) -> + stop_mirroring; +update_to(_, rabbit_mirror_queue_master) -> + update_ha_mode; +update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master -> + ignore; +update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master -> + start_mirroring. + +start_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. + +stop_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. + +update_ha_mode(State) -> + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + ok = rabbit_mirror_queue_misc:update_mirrors(Q), + State. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4205fabb83..375a0366dd 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1, + is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). @@ -402,15 +402,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; - {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> update_mirrors0(OldQ, NewQ) + _ -> rabbit_amqqueue:update_mirroring(QPid) end. -update_mirrors0(OldQ = #amqqueue{name = QName}, - NewQ = #amqqueue{name = QName}) -> - {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), - {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), +update_mirrors(Q = #amqqueue{name = QName}) -> + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + {NewMNode, NewSNodes} = suggested_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], %% When a mirror dies, remove_from_queue/2 might have to add new @@ -424,7 +421,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. - maybe_auto_sync(NewQ), + maybe_auto_sync(Q), ok. %% The arrival of a newly synced slave may cause the master to die if diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index eb8cf63327..a9caadf972 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -276,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> NewPolicy -> case rabbit_amqqueue:update( QName, fun(Q1) -> rabbit_queue_decorator:set( - Q1#amqqueue{policy = NewPolicy}) + Q1#amqqueue{policy = NewPolicy, + policy_version = + Q1#amqqueue.policy_version + 1 }) end) of #amqqueue{} = Q1 -> {Q, Q1}; not_found -> {Q, Q } diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 67c2a84a0e..8609a0e424 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -51,6 +51,7 @@ -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). -rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}). +-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). %% ------------------------------------------------------------------- @@ -433,6 +434,24 @@ recoverable_slaves(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state]). +policy_version() -> + ok = policy_version(rabbit_queue), + ok = policy_version(rabbit_durable_queue). + +policy_version(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, 0} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, + policy_version]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 5872d97d4c..bba7fad707 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -31,6 +31,7 @@ %% The first two are change_policy, the last two are change_cluster -include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -61,6 +62,10 @@ groups() -> {cluster_size_3, [], [ change_policy, rapid_change + % FIXME: Re-enable those tests when the know issues are + % fixed. + %failing_random_policies, + %random_policy ]} ]} ]. @@ -137,7 +142,7 @@ change_policy(Config) -> assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]), %% Clear the policy, and we go back to non-mirrored - rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), assert_slaves(A, ?QNAME, {A, ''}), %% Test switching "away" from an unmirrored node @@ -206,7 +211,7 @@ rapid_loop(Config, Node, MRef) -> after 0 -> rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY, <<"all">>), - rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), rapid_loop(Config, Node, MRef) end. @@ -253,6 +258,23 @@ promote_on_shutdown(Config) -> durable = true}), ok. +random_policy(Config) -> + run_proper(fun prop_random_policy/1, [Config]). + +failing_random_policies(Config) -> + [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + %% Those set of policies were found as failing by PropEr in the + %% `random_policy` test above. We add them explicitely here to make + %% sure they get tested. + ?assertEqual(true, test_random_policy(Config, Nodes, + [{nodes, [A, B]}, {nodes, [A]}])), + ?assertEqual(true, test_random_policy(Config, Nodes, + [{exactly, 3}, undefined, all, {nodes, [B]}])), + ?assertEqual(true, test_random_policy(Config, Nodes, + [all, undefined, {exactly, 2}, all, {exactly, 3}, {exactly, 3}, + undefined, {exactly, 3}, all])). + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> @@ -327,3 +349,131 @@ get_stacktrace() -> _:e -> erlang:get_stacktrace() end. + +%%---------------------------------------------------------------------------- +run_proper(Fun, Args) -> + ?assertEqual(true, + proper:counterexample(erlang:apply(Fun, Args), + [{numtests, 25}, + {on_output, fun(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). + +prop_random_policy(Config) -> + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + ?FORALL( + Policies, non_empty(list(policy_gen(Nodes))), + test_random_policy(Config, Nodes, Policies)). + +test_random_policy(Config, Nodes, Policies) -> + [NodeA | _] = Nodes, + Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA), + amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}), + %% Add some load so mirrors can be busy synchronising + rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000), + %% Apply policies in parallel on all nodes + apply_in_parallel(Config, Nodes, Policies), + %% Give it some time to generate all internal notifications + timer:sleep(2000), + %% Check the result + Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30), + %% Cleanup + amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), + _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY), + Result. + +apply_in_parallel(Config, Nodes, Policies) -> + Self = self(), + [spawn_link(fun() -> + [begin + apply_policy(Config, N, Policy) + end || Policy <- Policies], + Self ! parallel_task_done + end) || N <- Nodes], + [receive + parallel_task_done -> + ok + end || _ <- Nodes]. + +%% Proper generators +policy_gen(Nodes) -> + %% Stop mirroring needs to be called often to trigger rabbitmq-server#803 + frequency([{3, undefined}, + {1, all}, + {1, {nodes, nodes_gen(Nodes)}}, + {1, {exactly, choose(1, 3)}} + ]). + +nodes_gen(Nodes) -> + ?LET(List, non_empty(list(oneof(Nodes))), + sets:to_list(sets:from_list(List))). + +%% Checks +wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) -> + %% Ensure the owner/master is able to process a call request, + %% which means that all pending casts have been processed. + %% Use the information returned by owner/master to verify the + %% test result + Info = find_queue(QueueName, NodeA), + Pid = proplists:get_value(pid, Info), + Node = node(Pid), + %% Gets owner/master + case rpc:call(Node, gen_server, call, [Pid, info], 5000) of + {badrpc, _} -> + %% The queue is probably being migrated to another node. + %% Let's wait a bit longer. + timer:sleep(1000), + wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1); + FinalInfo -> + %% The last policy is the final state + LastPolicy = lists:last(TestedPolicies), + case verify_policy(LastPolicy, FinalInfo) of + true -> + true; + false when Tries =:= 1 -> + Policies = rpc:call(Node, rabbit_policy, list, [], 5000), + ct:pal( + "Last policy not applied:~n" + " Queue node: ~s (~p)~n" + " Queue info: ~p~n" + " Configured policies: ~p~n" + " Tested policies: ~p", + [Node, Pid, FinalInfo, Policies, TestedPolicies]), + false; + false -> + timer:sleep(1000), + wait_for_last_policy(QueueName, NodeA, TestedPolicies, + Tries - 1) + end + end. + +verify_policy(undefined, Info) -> + %% If the queue is not mirrored, it returns '' + '' == proplists:get_value(slave_pids, Info); +verify_policy(all, Info) -> + 2 == length(proplists:get_value(slave_pids, Info)); +verify_policy({exactly, 1}, Info) -> + %% If the queue is mirrored, it returns a list + [] == proplists:get_value(slave_pids, Info); +verify_policy({exactly, N}, Info) -> + (N - 1) == length(proplists:get_value(slave_pids, Info)); +verify_policy({nodes, Nodes}, Info) -> + Master = node(proplists:get_value(pid, Info)), + Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)], + lists:sort(Nodes) == lists:sort([Master | Slaves]). + +%% Policies +apply_policy(Config, N, undefined) -> + _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY); +apply_policy(Config, N, all) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {nodes, Nodes}) -> + NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes], + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"nodes">>, NNodes}, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {exactly, Exactly}) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"exactly">>, Exactly}, + [{<<"ha-sync-mode">>, <<"automatic">>}]). |
