summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-11-10 19:45:20 +0300
committerMichael Klishin <mklishin@pivotal.io>2016-11-10 19:45:20 +0300
commit4837af6cd04ccb87b5e2399495eff468fa99c1a3 (patch)
tree6ce7d0f7fb9dedd25a400d5a016f1ccb3b0d36ac
parent008c4eb7f0a9b14838439db934eb98dbe7801dbb (diff)
parent5c6d8e523349bb7df418cfff52b99f173a7cc4c1 (diff)
downloadrabbitmq-server-git-4837af6cd04ccb87b5e2399495eff468fa99c1a3.tar.gz
Merge branch 'master' into rabbitmq-server-567
-rw-r--r--.gitignore2
-rw-r--r--LICENSE2
-rw-r--r--Makefile15
-rw-r--r--README.md15
-rw-r--r--docs/rabbitmq.conf.example7
-rw-r--r--docs/rabbitmq.config.example7
-rw-r--r--priv/schema/rabbitmq.schema12
-rw-r--r--rabbitmq.conf.d/rabbitmq.conf7
-rwxr-xr-xscripts/rabbitmq-diagnostics7
-rw-r--r--scripts/rabbitmq-diagnostics.bat61
-rwxr-xr-xscripts/rabbitmq-env6
-rwxr-xr-xscripts/rabbitmq-plugins39
-rw-r--r--scripts/rabbitmq-plugins.bat21
-rwxr-xr-xscripts/rabbitmqctl42
-rw-r--r--scripts/rabbitmqctl.bat21
-rw-r--r--src/gm.erl19
-rw-r--r--src/rabbit.app.src5
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_diagnostics.erl1
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
-rw-r--r--src/rabbit_mnesia.erl30
-rw-r--r--src/rabbit_mnesia_rename.erl2
-rw-r--r--src/rabbit_policy.erl15
-rw-r--r--src/rabbit_runtime_parameters.erl15
-rw-r--r--src/rabbit_table.erl65
-rw-r--r--src/rabbit_variable_queue.erl111
-rw-r--r--src/rabbit_vhost_limit.erl7
-rw-r--r--src/rabbit_vm.erl3
-rw-r--r--test/clustering_management_SUITE.erl5
30 files changed, 335 insertions, 228 deletions
diff --git a/.gitignore b/.gitignore
index 7af1ad4260..f7a1da4397 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,7 +16,7 @@
/test/ct.cover.spec
/test/config_schema_SUITE_data/schema/**
/xrefr
-
+/escript
rabbit.d
# Generated sources files.
diff --git a/LICENSE b/LICENSE
index 9feeceac32..86b189fe04 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,7 +1,7 @@
This package, the RabbitMQ server is licensed under the MPL. For the
MPL, please see LICENSE-MPL-RabbitMQ.
-The files `mochijson2.erl' and `mochinum.erl' are (c) 2007 Mochi Media, Inc and
+The files `mochinum.erl' and `mochiweb_util.erl` are (c) 2007 Mochi Media, Inc and
licensed under a MIT license, see LICENSE-MIT-Mochi.
If you have any questions regarding licensing, please contact us at
diff --git a/Makefile b/Makefile
index 588b7149f7..f7a0c361bc 100644
--- a/Makefile
+++ b/Makefile
@@ -1,9 +1,11 @@
PROJECT = rabbit
VERSION ?= $(call get_app_version,src/$(PROJECT).app.src)
-DEPS = ranch lager rabbit_common
+DEPS = ranch lager rabbit_common rabbitmq_cli
TEST_DEPS = rabbitmq_ct_helpers amqp_client meck proper
+dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) rabbitmq-cli-integration
+
define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
endef
@@ -19,6 +21,10 @@ EXTRA_SOURCES += $(USAGES_ERL)
.DEFAULT_GOAL = all
$(PROJECT).d:: $(EXTRA_SOURCES)
+copy-escripts:
+ cp -r ${DEPS_DIR}/rabbitmq_cli/escript ./
+
+
DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk \
rabbit_common/mk/rabbitmq-run.mk \
rabbit_common/mk/rabbitmq-dist.mk \
@@ -56,11 +62,14 @@ USE_PROPER_QC := $(shell $(ERL) -eval 'io:format({module, proper} =:= code:ensur
RMQ_ERLC_OPTS += $(if $(filter true,$(USE_PROPER_QC)),-Duse_proper_qc)
endif
-clean:: clean-extra-sources
+clean:: clean-extra-sources clean-escripts
clean-extra-sources:
$(gen_verbose) rm -f $(EXTRA_SOURCES)
+clean-escripts:
+ $(gen_verbose) rm -rf escript
+
# --------------------------------------------------------------------
# Documentation.
# --------------------------------------------------------------------
@@ -116,3 +125,5 @@ distclean:: distclean-manpages
distclean-manpages::
$(gen_verbose) rm -f $(MANPAGES) $(WEB_MANPAGES)
+
+app-build: copy-escripts
diff --git a/README.md b/README.md
index 9e2a046dff..16b56d0b33 100644
--- a/README.md
+++ b/README.md
@@ -17,14 +17,10 @@
* [RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html)
* [Documentation guides](http://www.rabbitmq.com/documentation.html)
+ * [Documentation Source Code](https://github.com/rabbitmq/rabbitmq-website/)
* [Client libraries and tools](http://www.rabbitmq.com/devtools.html)
-
-## Building From Source and Packaging
-
- * [Building RabbitMQ Server From Source](http://www.rabbitmq.com/build-server.html)
- * [Building RabbitMQ Server Packages](http://www.rabbitmq.com/build-server.html)
-
-
+ * [Tutorials Source Code](https://github.com/rabbitmq/rabbitmq-tutorials/)
+
## Getting Help
* [RabbitMQ mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users)
@@ -42,9 +38,10 @@ See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview](
RabbitMQ server is [licensed under the MPL](LICENSE-MPL-RabbitMQ).
-## Building From Source
+## Building From Source and Packaging
-See [building RabbitMQ server from source](http://www.rabbitmq.com/build-server.html).
+ * [Building RabbitMQ Server From Source](http://www.rabbitmq.com/build-server.html)
+ * [Building RabbitMQ Server Packages](http://www.rabbitmq.com/build-server.html)
## Copyright
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index f03145b447..db65572c2a 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -354,7 +354,12 @@
## Timeout used when waiting for Mnesia tables in a cluster to
## become available.
##
-# mnesia_table_loading_timeout = 30000
+# mnesia_table_loading_retry_timeout = 30000
+
+## Retries when waiting for Mnesia tables in the cluster startup. Note that
+## this setting is not applied to Mnesia upgrades or node deletions.
+##
+# mnesia_table_loading_retry_limit = 10
## Size in bytes below which to embed messages in the queue index. See
## http://www.rabbitmq.com/persistence-conf.html
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index b31ec4d673..3e1137aa8b 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -321,7 +321,12 @@
%% Timeout used when waiting for Mnesia tables in a cluster to
%% become available.
%%
- %% {mnesia_table_loading_timeout, 30000},
+ %% {mnesia_table_loading_retry_timeout, 30000},
+
+ %% Retries when waiting for Mnesia tables in the cluster startup. Note that
+ %% this setting is not applied to Mnesia upgrades or node deletions.
+ %%
+ %% {mnesia_table_loading_retry_limit, 10},
%% Size in bytes below which to embed messages in the queue index. See
%% http://www.rabbitmq.com/persistence-conf.html
diff --git a/priv/schema/rabbitmq.schema b/priv/schema/rabbitmq.schema
index 687101dd74..150a26b60d 100644
--- a/priv/schema/rabbitmq.schema
+++ b/priv/schema/rabbitmq.schema
@@ -845,9 +845,17 @@ end}.
%% Timeout used when waiting for Mnesia tables in a cluster to
%% become available.
%%
-%% {mnesia_table_loading_timeout, 30000},
+%% {mnesia_table_loading_retry_timeout, 30000},
-{mapping, "mnesia_table_loading_timeout", "rabbit.mnesia_table_loading_timeout",
+{mapping, "mnesia_table_loading_retry_timeout", "rabbit.mnesia_table_loading_retry_timeout",
+ [{datatype, integer}]}.
+
+%% Retries when waiting for Mnesia tables in the cluster startup. Note that
+%% this setting is not applied to Mnesia upgrades or node deletions.
+%%
+%% {mnesia_table_loading_retry_limit, 10},
+
+{mapping, "mnesia_table_loading_retry_limit", "rabbit.mnesia_table_loading_retry_limit",
[{datatype, integer}]}.
%% Size in bytes below which to embed messages in the queue index. See
diff --git a/rabbitmq.conf.d/rabbitmq.conf b/rabbitmq.conf.d/rabbitmq.conf
index e702ec08b4..6d43dc9f7f 100644
--- a/rabbitmq.conf.d/rabbitmq.conf
+++ b/rabbitmq.conf.d/rabbitmq.conf
@@ -344,7 +344,12 @@ hipe_compile = false
## Timeout used when waiting for Mnesia tables in a cluster to
## become available.
##
-mnesia_table_loading_timeout = 30000
+mnesia_table_loading_retry_timeout = 30000
+
+## Retries when waiting for Mnesia tables in the cluster startup. Note that
+## this setting is not applied to Mnesia upgrades or node deletions.
+##
+## mnesia_table_loading_retry_limit = 10
## Size in bytes below which to embed messages in the queue index. See
## http://www.rabbitmq.com/persistence-conf.html
diff --git a/scripts/rabbitmq-diagnostics b/scripts/rabbitmq-diagnostics
new file mode 100755
index 0000000000..40e9ec0177
--- /dev/null
+++ b/scripts/rabbitmq-diagnostics
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+set -a
+
+. `dirname $0`/rabbitmq-env
+
+escript $ESCRIPT_DIR/rabbitmq-diagnostics "$@" \ No newline at end of file
diff --git a/scripts/rabbitmq-diagnostics.bat b/scripts/rabbitmq-diagnostics.bat
new file mode 100644
index 0000000000..e2032ad048
--- /dev/null
+++ b/scripts/rabbitmq-diagnostics.bat
@@ -0,0 +1,61 @@
+@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License
+REM at http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+REM the License for the specific language governing rights and
+REM limitations under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developer of the Original Code is GoPivotal, Inc.
+REM Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+REM
+
+REM Scopes the variables to the current batch file
+setlocal
+
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+REM Get default settings with user overrides for (RABBITMQ_)<var_name>
+REM Non-empty defaults should be set in rabbitmq-env
+call "%TDP0%\rabbitmq-env.bat" %~n0
+
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B 1
+)
+
+REM Disable erl_crash.dump by default for control scripts.
+if not defined ERL_CRASH_DUMP_SECONDS (
+ set ERL_CRASH_DUMP_SECONDS=0
+)
+
+"!ERLANG_HOME!\bin\escript.exe" ^
+"%RABBITMQ_HOME%\escript\rabbitmq-diagnostics" !STAR!
+rem -pa "!RABBITMQ_HOME!\ebin" ^
+rem -noinput ^
+rem -hidden ^
+rem !RABBITMQ_CTL_ERL_ARGS! ^
+rem -sasl errlog_type error ^
+rem -mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+rem -s rabbit_control_main ^
+rem -nodename !RABBITMQ_NODENAME! ^
+rem -extra !STAR!
+
+endlocal
+endlocal
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 206bdd0c20..f1962e2b7b 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -58,6 +58,7 @@ rmq_realpath() {
}
RABBITMQ_HOME="$(rmq_realpath "${RABBITMQ_SCRIPTS_DIR}/..")"
+ESCRIPT_DIR="${RABBITMQ_HOME}/escript"
## Set defaults
. ${RABBITMQ_SCRIPTS_DIR}/rabbitmq-defaults
@@ -244,9 +245,8 @@ rmq_check_if_shared_with_mnesia \
## Development-specific environment.
if [ "${RABBITMQ_DEV_ENV}" ]; then
- if [ "$(basename "$0")" = 'rabbitmq-plugins' -a \( \
- "$RABBITMQ_PLUGINS_DIR_source" != 'environment' -o \
- "$RABBITMQ_ENABLED_PLUGINS_FILE_source" != 'environment' \) ]; then
+ if [ "$RABBITMQ_PLUGINS_DIR_source" != 'environment' -o \
+ "$RABBITMQ_ENABLED_PLUGINS_FILE_source" != 'environment' ]; then
# We need to query the running node for the plugins directory
# and the "enabled plugins" file.
eval $( (${RABBITMQ_SCRIPTS_DIR}/rabbitmqctl eval \
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index d72df8ad86..c9b6408ea0 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -1,38 +1,7 @@
-#!/bin/sh -e
-## The contents of this file are subject to the Mozilla Public License
-## Version 1.1 (the "License"); you may not use this file except in
-## compliance with the License. You may obtain a copy of the License
-## at http://www.mozilla.org/MPL/
-##
-## Software distributed under the License is distributed on an "AS IS"
-## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-## the License for the specific language governing rights and
-## limitations under the License.
-##
-## The Original Code is RabbitMQ.
-##
-## The Initial Developer of the Original Code is GoPivotal, Inc.
-## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
-##
+#!/usr/bin/env bash
-# Get default settings with user overrides for (RABBITMQ_)<var_name>
-# Non-empty defaults should be set in rabbitmq-env
-. `dirname $0`/rabbitmq-env
+set -a
-# Disable erl_crash.dump by default for control scripts.
-if [ -z "$ERL_CRASH_DUMP_SECONDS" ]; then
- export ERL_CRASH_DUMP_SECONDS=0
-fi
+. `dirname $0`/rabbitmq-env
-RABBITMQ_USE_LONGNAME=${RABBITMQ_USE_LONGNAME} \
-exec ${ERL_DIR}erl \
- -pa "${RABBITMQ_HOME}/ebin" \
- -noinput \
- -hidden \
- ${RABBITMQ_CTL_ERL_ARGS} \
- -boot "${CLEAN_BOOT_FILE}" \
- -s rabbit_plugins_main \
- -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
- -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
- -nodename $RABBITMQ_NODENAME \
- -extra "$@"
+escript $ESCRIPT_DIR/rabbitmq-plugins --formatter=plugins -q "$@" \ No newline at end of file
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index c270d5d945..3cd8a7273a 100644
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -45,16 +45,17 @@ if not defined ERL_CRASH_DUMP_SECONDS (
set ERL_CRASH_DUMP_SECONDS=0
)
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_HOME!\ebin" ^
--noinput ^
--hidden ^
-!RABBITMQ_CTL_ERL_ARGS! ^
--s rabbit_plugins_main ^
--enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
--plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" ^
--nodename !RABBITMQ_NODENAME! ^
--extra !STAR!
+"!ERLANG_HOME!\bin\escript.exe" ^
+"%RABBITMQ_HOME%\escript\rabbitmq-plugins" --formatter=plugins !STAR!
+rem -pa "!RABBITMQ_HOME!\ebin" ^
+rem -noinput ^
+rem -hidden ^
+rem !RABBITMQ_CTL_ERL_ARGS! ^
+rem -s rabbit_plugins_main ^
+rem -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+rem -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+rem -nodename !RABBITMQ_NODENAME! ^
+rem -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 2336c3d466..048062b00d 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -1,41 +1,7 @@
-#!/bin/sh -e
-## The contents of this file are subject to the Mozilla Public License
-## Version 1.1 (the "License"); you may not use this file except in
-## compliance with the License. You may obtain a copy of the License
-## at http://www.mozilla.org/MPL/
-##
-## Software distributed under the License is distributed on an "AS IS"
-## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-## the License for the specific language governing rights and
-## limitations under the License.
-##
-## The Original Code is RabbitMQ.
-##
-## The Initial Developer of the Original Code is GoPivotal, Inc.
-## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
-##
+#!/usr/bin/env bash
-# Get default settings with user overrides for (RABBITMQ_)<var_name>
-# Non-empty defaults should be set in rabbitmq-env
-. `dirname $0`/rabbitmq-env
+set -a
-# Disable erl_crash.dump by default for control scripts.
-if [ -z "$ERL_CRASH_DUMP_SECONDS" ]; then
- export ERL_CRASH_DUMP_SECONDS=0
-fi
+. `dirname $0`/rabbitmq-env
-# We specify Mnesia dir and sasl error logger since some actions
-# (e.g. forget_cluster_node --offline) require us to impersonate the
-# real node.
-RABBITMQ_USE_LONGNAME=${RABBITMQ_USE_LONGNAME} \
-exec ${ERL_DIR}erl \
- -pa "${RABBITMQ_HOME}/ebin" \
- -noinput +B \
- -hidden \
- ${RABBITMQ_CTL_ERL_ARGS} \
- -boot "${CLEAN_BOOT_FILE}" \
- -sasl errlog_type error \
- -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
- -s rabbit_control_main \
- -nodename $RABBITMQ_NODENAME \
- -extra "$@"
+escript $ESCRIPT_DIR/rabbitmqctl "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 56e856fb43..01e10195d2 100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -45,16 +45,17 @@ if not defined ERL_CRASH_DUMP_SECONDS (
set ERL_CRASH_DUMP_SECONDS=0
)
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_HOME!\ebin" ^
--noinput ^
--hidden ^
-!RABBITMQ_CTL_ERL_ARGS! ^
--sasl errlog_type error ^
--mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
--s rabbit_control_main ^
--nodename !RABBITMQ_NODENAME! ^
--extra !STAR!
+"!ERLANG_HOME!\bin\escript.exe" ^
+"%RABBITMQ_HOME%\escript\rabbitmqctl" !STAR!
+rem -pa "!RABBITMQ_HOME!\ebin" ^
+rem -noinput ^
+rem -hidden ^
+rem !RABBITMQ_CTL_ERL_ARGS! ^
+rem -sasl errlog_type error ^
+rem -mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+rem -s rabbit_control_main ^
+rem -nodename !RABBITMQ_NODENAME! ^
+rem -extra !STAR!
endlocal
endlocal
diff --git a/src/gm.erl b/src/gm.erl
index aa4ffcf511..3a1459fea4 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1175,11 +1175,20 @@ record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
try
Group = #gm_group { members = Members, version = Ver } =
check_membership(Left, read_group(GroupName)),
- {Prefix, [Left | Suffix]} =
- lists:splitwith(fun (M) -> M =/= Left end, Members),
- write_group(Group #gm_group {
- members = Prefix ++ [Left, NewMember | Suffix],
- version = Ver + 1 })
+ case lists:member(NewMember, Members) of
+ true ->
+ %% This avois duplicates during partial partitions,
+ %% as inconsistent views might happen during them
+ rabbit_log:warning("(~p) GM avoiding duplicate of ~p",
+ [self(), NewMember]),
+ Group;
+ false ->
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ write_group(Group #gm_group {
+ members = Prefix ++ [Left, NewMember | Suffix],
+ version = Ver + 1 })
+ end
catch
lost_membership ->
%% The transaction must not be abruptly crashed, but
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index c06f7630fa..250b14bc32 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -12,7 +12,7 @@
rabbit_direct_client_sup]},
%% FIXME: Remove goldrush, once rabbit_plugins.erl knows how to ignore
%% indirect dependencies of rabbit.
- {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl]},
+ {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl, jsx]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
@@ -47,7 +47,8 @@
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
- {mnesia_table_loading_timeout, 30000},
+ {mnesia_table_loading_retry_timeout, 30000},
+ {mnesia_table_loading_retry_limit, 10},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2581fd49f6..6dde2aca88 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -19,7 +19,7 @@
-behaviour(application).
-export([start/0, boot/0, stop/0,
- stop_and_halt/0, await_startup/0, status/0, is_running/0,
+ stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0,
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
start_fhc/0]).
-export([start/2, stop/1, prep_stop/1]).
@@ -1069,9 +1069,9 @@ ensure_working_fhc() ->
end,
TestPid = spawn_link(TestFun),
%% Because we are waiting for the test fun, abuse the
- %% 'mnesia_table_loading_timeout' parameter to find a sane timeout
+ %% 'mnesia_table_loading_retry_timeout' parameter to find a sane timeout
%% value.
- Timeout = rabbit_table:wait_timeout(),
+ Timeout = rabbit_table:retry_timeout(),
receive
fhc_ok -> ok;
{'EXIT', TestPid, Exception} -> throw({ensure_working_fhc, Exception})
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3e6d961d5f..25555156d6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1334,7 +1334,13 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
%% This also has the side effect of waking us up so we emit a
%% stats event - so event consumers see the changed policy.
{ok, Q} = rabbit_amqqueue:lookup(Name),
- noreply(process_args_policy(State#q{q = Q})).
+ noreply(process_args_policy(State#q{q = Q}));
+
+handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) ->
+ %% Only a slave should receive this, it means we are a duplicated master
+ rabbit_mirror_queue_misc:log_warning(
+ Name, "Stopping after receiving sync_start from another master", []),
+ stop(State).
handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) ->
case is_unused(State) of
diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl
index d28bb9ffd7..e5df1d5baf 100644
--- a/src/rabbit_diagnostics.erl
+++ b/src/rabbit_diagnostics.erl
@@ -64,7 +64,6 @@ maybe_stuck_stacktrace({prim_inet, accept0, _}) -> false;
maybe_stuck_stacktrace({prim_inet, recv0, _}) -> false;
maybe_stuck_stacktrace({rabbit_heartbeat, heartbeater, _}) -> false;
maybe_stuck_stacktrace({rabbit_net, recv, _}) -> false;
-maybe_stuck_stacktrace({mochiweb_http, request, _}) -> false;
maybe_stuck_stacktrace({group, _, _}) -> false;
maybe_stuck_stacktrace({shell, _, _}) -> false;
maybe_stuck_stacktrace({io, _, _}) -> false;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 29ba21d374..61623c9441 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -336,7 +336,12 @@ handle_cast({set_ram_duration_target, Duration},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- noreply(State #state { backing_queue_state = BQS1 }).
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(policy_changed, State) ->
+ %% During partial partitions, we might end up receiving messages expected by a master
+ %% Ignore them
+ noreply(State).
handle_info(update_ram_duration, State = #state{backing_queue = BQ,
backing_queue_state = BQS}) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d66f5b8fab..1ec9a46880 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -107,7 +107,7 @@ init() ->
false ->
NodeType = node_type(),
init_db_and_upgrade(cluster_nodes(all), NodeType,
- NodeType =:= ram)
+ NodeType =:= ram, _Retry = true)
end,
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case -
@@ -141,7 +141,7 @@ init_from_config() ->
e(invalid_cluster_nodes_conf)
end,
case DiscoveredNodes of
- [] -> init_db_and_upgrade([node()], disc, false);
+ [] -> init_db_and_upgrade([node()], disc, false, _Retry = true);
_ ->
rabbit_log:info("Discovered peer nodes: ~s~n",
[rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
@@ -153,14 +153,14 @@ auto_cluster(TryNodes, NodeType) ->
{ok, Node} ->
rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
- init_db_and_upgrade(DiscNodes, NodeType, true),
+ init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true),
rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning(
"Could not find any node for auto-clustering from: ~p~n"
"Starting blank node...~n", [TryNodes]),
- init_db_and_upgrade([node()], disc, false)
+ init_db_and_upgrade([node()], disc, false, _Retry = true)
end.
%% Make the node join a cluster. The node will be reset automatically
@@ -200,7 +200,7 @@ join_cluster(DiscoveryNode, NodeType) ->
rabbit_log:info("Clustering with ~p as ~p node~n",
[ClusterNodes, NodeType]),
ok = init_db_with_mnesia(ClusterNodes, NodeType,
- true, true),
+ true, true, _Retry = true),
rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster(),
ok;
@@ -240,7 +240,7 @@ reset_gracefully() ->
%% need to check for consistency because we are resetting.
%% Force=true here so that reset still works when clustered with a
%% node which is down.
- init_db_with_mnesia(AllNodes, node_type(), false, false),
+ init_db_with_mnesia(AllNodes, node_type(), false, false, _Retry = false),
case is_only_clustered_disc_node() of
true -> e(resetting_only_disc_node);
false -> ok
@@ -289,7 +289,7 @@ update_cluster_nodes(DiscoveryNode) ->
rabbit_node_monitor:write_cluster_status(Status),
rabbit_log:info("Updating cluster nodes from ~p~n",
[DiscoveryNode]),
- init_db_with_mnesia(AllNodes, node_type(), true, true);
+ init_db_with_mnesia(AllNodes, node_type(), true, true, _Retry = false);
false ->
e(inconsistent_cluster)
end,
@@ -339,7 +339,7 @@ remove_node_offline_node(Node) ->
%% is by force loading the table, and making sure that
%% they are loaded.
rabbit_table:force_load(),
- rabbit_table:wait_for_replicated(),
+ rabbit_table:wait_for_replicated(_Retry = false),
%% We skip the 'node_deleted' event because the
%% application is stopped and thus, rabbit_event is not
%% enabled.
@@ -487,7 +487,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
{[_ | _], _, _} ->
%% Subsequent node in cluster, catch up
maybe_force_load(),
- ok = rabbit_table:wait_for_replicated(),
+ ok = rabbit_table:wait_for_replicated(_Retry = true),
ok = rabbit_table:create_local_copy(NodeType)
end,
ensure_schema_integrity(),
@@ -497,7 +497,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
init_db_unchecked(ClusterNodes, NodeType) ->
init_db(ClusterNodes, NodeType, false).
-init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
+init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes, Retry) ->
ok = init_db(ClusterNodes, NodeType, CheckOtherNodes),
ok = case rabbit_upgrade:maybe_upgrade_local() of
ok -> ok;
@@ -512,14 +512,14 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
disc -> ok
end,
%% ...and all nodes will need to wait for tables
- rabbit_table:wait_for_replicated(),
+ rabbit_table:wait_for_replicated(Retry),
ok.
init_db_with_mnesia(ClusterNodes, NodeType,
- CheckOtherNodes, CheckConsistency) ->
+ CheckOtherNodes, CheckConsistency, Retry) ->
start_mnesia(CheckConsistency),
try
- init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes)
+ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes, Retry)
after
stop_mnesia()
end.
@@ -556,7 +556,7 @@ ensure_mnesia_not_running() ->
end.
ensure_schema_integrity() ->
- case rabbit_table:check_schema_integrity() of
+ case rabbit_table:check_schema_integrity(_Retry = true) of
ok ->
ok;
{error, Reason} ->
@@ -687,7 +687,7 @@ discover_cluster0(Node) ->
rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []).
schema_ok_or_move() ->
- case rabbit_table:check_schema_integrity() of
+ case rabbit_table:check_schema_integrity(_Retry = false) of
ok ->
ok;
{error, Reason} ->
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index 0c3e7c2366..2d7e0f56b6 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -193,7 +193,7 @@ delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]).
start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
rabbit_table:force_load(),
- rabbit_table:wait_for_replicated().
+ rabbit_table:wait_for_replicated(_Retry = false).
stop_mnesia() -> stopped = mnesia:stop().
convert_backup(NodeMap, FromBackup, ToBackup) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7e39164882..cfbf116cbd 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -212,11 +212,12 @@ parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
end.
parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
- case rabbit_misc:json_decode(Defn) of
- {ok, JSON} ->
+ Definition = rabbit_data_coercion:to_binary(Defn),
+ case rabbit_json:try_decode(Definition) of
+ {ok, Term} ->
set0(Type, VHost, Name,
[{<<"pattern">>, list_to_binary(Pattern)},
- {<<"definition">>, rabbit_misc:json_to_term(JSON)},
+ {<<"definition">>, maps:to_list(Term)},
{<<"priority">>, Priority},
{<<"apply-to">>, ApplyTo}]);
error ->
@@ -270,7 +271,7 @@ list_op(VHost) ->
list0_op(VHost, fun ident/1).
list_formatted_op(VHost) ->
- order_policies(list0_op(VHost, fun format/1)).
+ order_policies(list0_op(VHost, fun rabbit_json:encode/1)).
list_formatted_op(VHost, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(AggregatorPid, Ref,
@@ -288,7 +289,7 @@ list(VHost) ->
list0(VHost, fun ident/1).
list_formatted(VHost) ->
- order_policies(list0(VHost, fun format/1)).
+ order_policies(list0(VHost, fun rabbit_json:encode/1)).
list_formatted(VHost, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(AggregatorPid, Ref,
@@ -309,10 +310,6 @@ p(Parameter, DefnFun) ->
{definition, DefnFun(pget(<<"definition">>, Value))},
{priority, pget(<<"priority">>, Value)}].
-format(Term) ->
- {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
- list_to_binary(JSON).
-
ident(X) -> X.
info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 97f78da8ba..072a48be3d 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -97,9 +97,10 @@
parse_set(_, <<"policy">>, _, _, _) ->
{error_string, "policies may not be set using this method"};
parse_set(VHost, Component, Name, String, User) ->
- case rabbit_misc:json_decode(String) of
- {ok, JSON} -> set(VHost, Component, Name,
- rabbit_misc:json_to_term(JSON), User);
+ Definition = rabbit_data_coercion:to_binary(String),
+ case rabbit_json:try_decode(Definition) of
+ {ok, Term} when is_map(Term) -> set(VHost, Component, Name, maps:to_list(Term), User);
+ {ok, Term} -> set(VHost, Component, Name, Term, User);
error -> {error_string, "JSON decoding error"}
end.
@@ -235,12 +236,12 @@ list(VHost, Component) ->
end).
list_formatted(VHost) ->
- [pset(value, format(pget(value, P)), P) || P <- list(VHost)].
+ [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list(VHost)].
list_formatted(VHost, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
- fun(P) -> pset(value, format(pget(value, P)), P) end, list(VHost)).
+ fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list(VHost)).
lookup(VHost, Component, Name) ->
case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of
@@ -303,10 +304,6 @@ lookup_component(Component) ->
{ok, Module} -> {ok, Module}
end.
-format(Term) ->
- {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
- list_to_binary(JSON).
-
flatten_errors(L) ->
case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
[] -> ok;
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 1bb19b23da..c8946e179d 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -16,24 +16,25 @@
-module(rabbit_table).
--export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1,
+-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
- check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]).
+ check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0]).
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
+-type retry() :: boolean().
-spec create() -> 'ok'.
-spec create_local_copy('disc' | 'ram') -> 'ok'.
--spec wait_for_replicated() -> 'ok'.
+-spec wait_for_replicated(retry()) -> 'ok'.
-spec wait([atom()]) -> 'ok'.
--spec wait_timeout() -> non_neg_integer() | infinity.
+-spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}.
-spec force_load() -> 'ok'.
-spec is_present() -> boolean().
-spec is_empty() -> boolean().
-spec needs_default_data() -> boolean().
--spec check_schema_integrity() -> rabbit_types:ok_or_error(any()).
+-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()).
-spec clear_ram_only_tables() -> 'ok'.
%%----------------------------------------------------------------------------
@@ -75,25 +76,53 @@ create_local_copy(ram) ->
create_local_copies(ram),
create_local_copy(schema, ram_copies).
-wait_for_replicated() ->
+wait_for_replicated(Retry) ->
wait([Tab || {Tab, TabDef} <- definitions(),
- not lists:member({local_content, true}, TabDef)]).
+ not lists:member({local_content, true}, TabDef)], Retry).
wait(TableNames) ->
+ wait(TableNames, _Retry = false).
+
+wait(TableNames, Retry) ->
+ {Timeout, Retries} = retry_timeout(Retry),
+ wait(TableNames, Timeout, Retries).
+
+wait(TableNames, Timeout, Retries) ->
%% We might be in ctl here for offline ops, in which case we can't
%% get_env() for the rabbit app.
- Timeout = wait_timeout(),
- case mnesia:wait_for_tables(TableNames, Timeout) of
- ok ->
+ rabbit_log:info("Waiting for Mnesia tables for ~p ms, ~p retries left~n",
+ [Timeout, Retries - 1]),
+ Result = case mnesia:wait_for_tables(TableNames, Timeout) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ {error, {timeout_waiting_for_tables, BadTabs}};
+ {error, Reason} ->
+ {error, {failed_waiting_for_tables, Reason}}
+ end,
+ case {Retries, Result} of
+ {_, ok} ->
ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
+ {1, {error, _} = Error} ->
+ throw(Error);
+ {_, {error, Error}} ->
+ rabbit_log:warning("Error while waiting for Mnesia tables: ~p~n", [Error]),
+ wait(TableNames, Timeout, Retries - 1);
+ _ ->
+ wait(TableNames, Timeout, Retries - 1)
end.
-wait_timeout() ->
- case application:get_env(rabbit, mnesia_table_loading_timeout) of
+retry_timeout(_Retry = false) ->
+ {retry_timeout(), 1};
+retry_timeout(_Retry = true) ->
+ Retries = case application:get_env(rabbit, mnesia_table_loading_retry_limit) of
+ {ok, T} -> T;
+ undefined -> 10
+ end,
+ {retry_timeout(), Retries}.
+
+retry_timeout() ->
+ case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of
{ok, T} -> T;
undefined -> 30000
end.
@@ -110,7 +139,7 @@ is_empty(Names) ->
lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
Names).
-check_schema_integrity() ->
+check_schema_integrity(Retry) ->
Tables = mnesia:system_info(tables),
case check(fun (Tab, TabDef) ->
case lists:member(Tab, Tables) of
@@ -118,7 +147,7 @@ check_schema_integrity() ->
true -> check_attributes(Tab, TabDef)
end
end) of
- ok -> ok = wait(names()),
+ ok -> wait(names(), Retry),
check(fun check_content/2);
Other -> Other
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 961246058e..95604d08f4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -682,25 +682,28 @@ ack([], State) ->
%% optimisation: this head is essentially a partial evaluation of the
%% general case below, for the single-ack case.
ack([SeqId], State) ->
- {#msg_status { msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- ack_out_counter = AckOutCount }} =
- remove_pending_ack(true, SeqId, State),
- IndexState1 = case IndexOnDisk of
- true -> rabbit_queue_index:ack([SeqId], IndexState);
- false -> IndexState
- end,
- case MsgInStore of
- true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
- false -> ok
- end,
- {[MsgId],
- a(State1 #vqstate { index_state = IndexState1,
- ack_out_counter = AckOutCount + 1 })};
+ case remove_pending_ack(true, SeqId, State) of
+ {none, _} ->
+ State;
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ ack_out_counter = AckOutCount }} ->
+ IndexState1 = case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState);
+ false -> IndexState
+ end,
+ case MsgInStore of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
+ {[MsgId],
+ a(State1 #vqstate { index_state = IndexState1,
+ ack_out_counter = AckOutCount + 1 })}
+ end;
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
@@ -708,8 +711,12 @@ ack(AckTags, State) ->
ack_out_counter = AckOutCount }} =
lists:foldl(
fun (SeqId, {Acc, State2}) ->
- {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2),
- {accumulate_ack(MsgStatus, Acc), State3}
+ case remove_pending_ack(true, SeqId, State2) of
+ {none, _} ->
+ {Acc, State2};
+ {MsgStatus, State3} ->
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
remove_msgs_by_id(MsgIdsByStore, MSCState),
@@ -2030,8 +2037,12 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
%% First parameter = UpdateStats
remove_pending_ack(true, SeqId, State) ->
- {MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
- {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)};
+ case remove_pending_ack(false, SeqId, State) of
+ {none, _} ->
+ {none, State};
+ {MsgStatus, State1} ->
+ {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}
+ end;
remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA}) ->
@@ -2043,9 +2054,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
DPA1 = gb_trees:delete(SeqId, DPA),
{V, State#vqstate{disk_pending_ack = DPA1}};
none ->
- QPA1 = gb_trees:delete(SeqId, QPA),
- {gb_trees:get(SeqId, QPA),
- State#vqstate{qi_pending_ack = QPA1}}
+ case gb_trees:lookup(SeqId, QPA) of
+ {value, V} ->
+ QPA1 = gb_trees:delete(SeqId, QPA),
+ {V, State#vqstate{qi_pending_ack = QPA1}};
+ none ->
+ {none, State}
+ end
end
end.
@@ -2196,11 +2211,15 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
Limit, PubFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
- {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
- {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
- PubFun(MsgStatus, State1),
- queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
+ case msg_from_pending_ack(SeqId, State) of
+ {none, _} ->
+ queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State);
+ {MsgStatus, State1} ->
+ {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
+ PubFun(MsgStatus, State1),
+ queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, PubFun, State2)
+ end
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -2209,22 +2228,28 @@ queue_merge(SeqIds, Q, Front, MsgIds,
delta_merge([], Delta, MsgIds, State) ->
{Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, State) ->
- lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
- {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
- msg_from_pending_ack(SeqId, State0),
- {_MsgStatus, State2} =
- maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- stats({1, -1}, {MsgStatus, none}, State2)}
+ lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) ->
+ case msg_from_pending_ack(SeqId, State0) of
+ {none, _} ->
+ Acc;
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} ->
+ {_MsgStatus, State2} =
+ maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+ stats({1, -1}, {MsgStatus, none}, State2)}
+ end
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, State) ->
- {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
- remove_pending_ack(false, SeqId, State),
- {MsgStatus #msg_status {
- msg_props = MsgProps #message_properties { needs_confirming = false } },
- State1}.
+ case remove_pending_ack(false, SeqId, State) of
+ {none, _} ->
+ {none, State};
+ {#msg_status { msg_props = MsgProps } = MsgStatus, State1} ->
+ {MsgStatus #msg_status {
+ msg_props = MsgProps #message_properties { needs_confirming = false } },
+ State1}
+ end.
beta_limit(Q) ->
case ?QUEUE:peek(Q) of
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index b933c31402..287488b28b 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -124,9 +124,10 @@ is_over_queue_limit(VirtualHost) ->
%%----------------------------------------------------------------------------
parse_set(VHost, Defn) ->
- case rabbit_misc:json_decode(Defn) of
- {ok, JSON} ->
- set(VHost, rabbit_misc:json_to_term(JSON));
+ Definition = rabbit_data_coercion:to_binary(Defn),
+ case rabbit_json:try_decode(Definition) of
+ {ok, Term} ->
+ set(VHost, maps:to_list(Term));
error ->
{error_string, "JSON decoding error"}
end.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 1e3e65abc4..7a6e290490 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -18,8 +18,7 @@
-export([memory/0, binary/0, ets_tables_memory/1]).
--define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs",
- "rfc4627_jsonrpc"]).
+-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]).
%%----------------------------------------------------------------------------
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index c40d624ddf..eac5fa3683 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -72,7 +72,10 @@ suite() ->
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
+ Config1 = rabbit_ct_helpers:merge_app_env(
+ Config,
+ {rabbit, [{mnesia_table_loading_retry_limit, 1}]}),
+ rabbit_ct_helpers:run_setup_steps(Config1).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).