summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-05-08 18:01:10 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-05-08 18:01:10 +0300
commit7132664520b09cf6e15dc95e91ce73eedd4407e4 (patch)
tree34d0b51dabde73dbde00e2b928e4b52450eaaee8
parentd6fa2093375ac295b765261e34213de4a61960b9 (diff)
parent0bd4b78b3540c65648563714239d2a270c9816a4 (diff)
downloadrabbitmq-server-git-7132664520b09cf6e15dc95e91ce73eedd4407e4.tar.gz
Merge branch 'master' into rabbitmq-server-1146-full
Conflicts: Makefile
-rw-r--r--Makefile4
-rw-r--r--erlang.mk72
-rw-r--r--priv/schema/rabbitmq.schema7
-rw-r--r--scripts/rabbitmq-env.bat7
-rwxr-xr-xscripts/rabbitmq-server41
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_config.erl38
-rw-r--r--src/rabbit_disk_monitor.erl49
-rw-r--r--src/rabbit_mirror_queue_master.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl56
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_msg_store.erl8
-rw-r--r--src/rabbit_msg_store_gc.erl8
-rw-r--r--src/rabbit_peer_discovery.erl119
-rw-r--r--src/rabbit_peer_discovery_classic_config.erl13
-rw-r--r--src/rabbit_peer_discovery_dns.erl36
-rw-r--r--src/rabbit_queue_consumers.erl5
-rw-r--r--src/rabbit_queue_index.erl22
-rw-r--r--test/peer_discovery_dns_SUITE.erl34
-rw-r--r--test/rabbitmqctl_shutdown_SUITE.erl21
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl32
22 files changed, 455 insertions, 174 deletions
diff --git a/Makefile b/Makefile
index 03daeb96e1..799f9c4d65 100644
--- a/Makefile
+++ b/Makefile
@@ -116,6 +116,8 @@ define PROJECT_ENV
{background_gc_target_interval, 60000},
%% rabbitmq-server-589
{proxy_protocol, false},
+ {disk_monitor_failure_retries, 10},
+ {disk_monitor_failure_retry_interval, 120000},
%% can be stop_rabbit or give_up see rabbitmq-server-1458
{vhost_restart_strategy, stop_rabbit}
]
@@ -224,6 +226,8 @@ clean-escripts:
opt='--stringparam man.indent.verbatims=0' ; \
xsltproc --novalid $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
xmlto -vv -o $(DOCS_DIR) $$opt man $< 2>&1 | (grep -v '^Note: Writing' || :) && \
+ awk -F"'u " '/^\.HP / { print $$1; print $$2; next; } { print; }' "$@" > "$@.tmp" && \
+ mv "$@.tmp" "$@" && \
test -f $@ && \
rm $<.tmp
diff --git a/erlang.mk b/erlang.mk
index 5c290dc767..377a72025b 100644
--- a/erlang.mk
+++ b/erlang.mk
@@ -16,7 +16,7 @@
ERLANG_MK_FILENAME := $(realpath $(lastword $(MAKEFILE_LIST)))
-ERLANG_MK_VERSION = 2.0.0-pre.2-207-g9e9b7d2
+ERLANG_MK_VERSION = 2.0.0-pre.2-220-g7a200f5
# Make 3.81 and 3.82 are deprecated.
@@ -4203,17 +4203,16 @@ endif
# in practice only Makefile is needed so far.
define dep_autopatch
if [ -f $(DEPS_DIR)/$(1)/erlang.mk ]; then \
+ rm -rf $(DEPS_DIR)/$1/ebin/; \
$(call erlang,$(call dep_autopatch_appsrc.erl,$(1))); \
$(call dep_autopatch_erlang_mk,$(1)); \
elif [ -f $(DEPS_DIR)/$(1)/Makefile ]; then \
if [ 0 != `grep -c "include ../\w*\.mk" $(DEPS_DIR)/$(1)/Makefile` ]; then \
$(call dep_autopatch2,$(1)); \
- elif [ 0 != `grep -ci rebar $(DEPS_DIR)/$(1)/Makefile` ]; then \
+ elif [ 0 != `grep -ci "^[^#].*rebar" $(DEPS_DIR)/$(1)/Makefile` ]; then \
$(call dep_autopatch2,$(1)); \
- elif [ -n "`find $(DEPS_DIR)/$(1)/ -type f -name \*.mk -not -name erlang.mk -exec grep -i rebar '{}' \;`" ]; then \
+ elif [ -n "`find $(DEPS_DIR)/$(1)/ -type f -name \*.mk -not -name erlang.mk -exec grep -i "^[^#].*rebar" '{}' \;`" ]; then \
$(call dep_autopatch2,$(1)); \
- else \
- $(call erlang,$(call dep_autopatch_app.erl,$(1))); \
fi \
else \
if [ ! -d $(DEPS_DIR)/$(1)/src/ ]; then \
@@ -4225,6 +4224,8 @@ define dep_autopatch
endef
define dep_autopatch2
+ mv -n $(DEPS_DIR)/$1/ebin/$1.app $(DEPS_DIR)/$1/src/$1.app.src; \
+ rm -f $(DEPS_DIR)/$1/ebin/$1.app; \
if [ -f $(DEPS_DIR)/$1/src/$1.app.src.script ]; then \
$(call erlang,$(call dep_autopatch_appsrc_script.erl,$(1))); \
fi; \
@@ -4536,22 +4537,6 @@ define dep_autopatch_rebar.erl
halt()
endef
-define dep_autopatch_app.erl
- UpdateModules = fun(App) ->
- case filelib:is_regular(App) of
- false -> ok;
- true ->
- {ok, [{application, '$(1)', L0}]} = file:consult(App),
- Mods = filelib:fold_files("$(call core_native_path,$(DEPS_DIR)/$1/src)", "\\\\.erl$$", true,
- fun (F, Acc) -> [list_to_atom(filename:rootname(filename:basename(F)))|Acc] end, []),
- L = lists:keystore(modules, 1, L0, {modules, Mods}),
- ok = file:write_file(App, io_lib:format("~p.~n", [{application, '$(1)', L}]))
- end
- end,
- UpdateModules("$(call core_native_path,$(DEPS_DIR)/$1/ebin/$1.app)"),
- halt()
-endef
-
define dep_autopatch_appsrc_script.erl
AppSrc = "$(call core_native_path,$(DEPS_DIR)/$1/src/$1.app.src)",
AppSrcScript = AppSrc ++ ".script",
@@ -4828,6 +4813,8 @@ COMPILE_FIRST_PATHS = $(addprefix src/,$(addsuffix .erl,$(COMPILE_FIRST)))
ERLC_EXCLUDE ?=
ERLC_EXCLUDE_PATHS = $(addprefix src/,$(addsuffix .erl,$(ERLC_EXCLUDE)))
+ERLC_ASN1_OPTS ?=
+
ERLC_MIB_OPTS ?=
COMPILE_MIB_FIRST ?=
COMPILE_MIB_FIRST_PATHS = $(addprefix mibs/,$(addsuffix .mib,$(COMPILE_MIB_FIRST)))
@@ -4877,7 +4864,7 @@ endif
ifeq ($(wildcard src/$(PROJECT_MOD).erl),)
define app_file
-{application, $(PROJECT), [
+{application, '$(PROJECT)', [
{description, "$(PROJECT_DESCRIPTION)"},
{vsn, "$(PROJECT_VERSION)"},$(if $(IS_DEP),
{id$(comma)$(space)"$(1)"}$(comma))
@@ -4889,7 +4876,7 @@ define app_file
endef
else
define app_file
-{application, $(PROJECT), [
+{application, '$(PROJECT)', [
{description, "$(PROJECT_DESCRIPTION)"},
{vsn, "$(PROJECT_VERSION)"},$(if $(IS_DEP),
{id$(comma)$(space)"$(1)"}$(comma))
@@ -4920,7 +4907,7 @@ ERL_FILES += $(addprefix src/,$(patsubst %.asn1,%.erl,$(notdir $(ASN1_FILES))))
define compile_asn1
$(verbose) mkdir -p include/
- $(asn1_verbose) erlc -v -I include/ -o asn1/ +noobj $(1)
+ $(asn1_verbose) erlc -v -I include/ -o asn1/ +noobj $(ERLC_ASN1_OPTS) $(1)
$(verbose) mv asn1/*.erl src/
$(verbose) mv asn1/*.hrl include/
$(verbose) mv asn1/*.asn1db include/
@@ -5052,7 +5039,7 @@ $(ERL_FILES) $(CORE_FILES) $(ASN1_FILES) $(MIB_FILES) $(XRL_FILES) $(YRL_FILES):
ebin/$(PROJECT).app:: $(ERLANG_MK_TMP)/last-makefile-change
endif
--include $(PROJECT).d
+include $(wildcard $(PROJECT).d)
ebin/$(PROJECT).app:: ebin/
@@ -5277,6 +5264,7 @@ MAN_VERSION ?= $(PROJECT_VERSION)
define asciidoc2man.erl
try
[begin
+ io:format(" ADOC ~s~n", [F]),
ok = asciideck:to_manpage(asciideck:parse_file(F), #{
compress => gzip,
outdir => filename:dirname(F),
@@ -5285,7 +5273,8 @@ try
})
end || F <- [$(shell echo $(addprefix $(comma)\",$(addsuffix \",$1)) | sed 's/^.//')]],
halt(0)
-catch _:_ ->
+catch C:E ->
+ io:format("Exception ~p:~p~nStacktrace: ~p~n", [C, E, erlang:get_stacktrace()]),
halt(1)
end.
endef
@@ -6123,6 +6112,7 @@ CT_SUITES := $(sort $(subst _SUITE.erl,,$(notdir $(call core_find,$(TEST_DIR)/,*
endif
endif
CT_SUITES ?=
+CT_LOGS_DIR ?= $(CURDIR)/logs
# Core targets.
@@ -6145,13 +6135,13 @@ CT_RUN = ct_run \
-noinput \
-pa $(CURDIR)/ebin $(DEPS_DIR)/*/ebin $(APPS_DIR)/*/ebin $(TEST_DIR) \
-dir $(TEST_DIR) \
- -logdir $(CURDIR)/logs
+ -logdir $(CT_LOGS_DIR)
ifeq ($(CT_SUITES),)
ct: $(if $(IS_APP),,apps-ct)
else
ct: test-build $(if $(IS_APP),,apps-ct)
- $(verbose) mkdir -p $(CURDIR)/logs/
+ $(verbose) mkdir -p $(CT_LOGS_DIR)
$(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(CT_SUITES)) $(CT_OPTS)
endif
@@ -6179,14 +6169,14 @@ endif
define ct_suite_target
ct-$(1): test-build
- $(verbose) mkdir -p $(CURDIR)/logs/
+ $(verbose) mkdir -p $(CT_LOGS_DIR)
$(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(1)) $(CT_EXTRA) $(CT_OPTS)
endef
$(foreach test,$(CT_SUITES),$(eval $(call ct_suite_target,$(test))))
distclean-ct:
- $(gen_verbose) rm -rf $(CURDIR)/logs/
+ $(gen_verbose) rm -rf $(CT_LOGS_DIR)
# Copyright (c) 2013-2016, Loïc Hoguin <essen@ninenines.eu>
# This file is part of erlang.mk and subject to the terms of the ISC License.
@@ -6232,8 +6222,10 @@ define filter_opts.erl
endef
$(DIALYZER_PLT): deps app
- $(verbose) dialyzer --build_plt --apps erts kernel stdlib $(PLT_APPS) $(OTP_DEPS) $(LOCAL_DEPS) \
- `test -f $(ERLANG_MK_TMP)/deps.log && cat $(ERLANG_MK_TMP)/deps.log`
+ $(eval DEPS_LOG := $(shell test -f $(ERLANG_MK_TMP)/deps.log && \
+ while read p; do test -d $$p/ebin && echo $$p/ebin; done <$(ERLANG_MK_TMP)/deps.log))
+ $(verbose) dialyzer --build_plt --apps erts kernel stdlib \
+ $(PLT_APPS) $(OTP_DEPS) $(LOCAL_DEPS) $(DEPS_LOG)
plt: $(DIALYZER_PLT)
@@ -6319,7 +6311,7 @@ escript:: escript-zip
$(verbose) chmod +x $(ESCRIPT_FILE)
distclean-escript:
- $(gen_verbose) rm -f $(ESCRIPT_NAME)
+ $(gen_verbose) rm -f $(ESCRIPT_FILE)
# Copyright (c) 2015-2016, Loïc Hoguin <essen@ninenines.eu>
# Copyright (c) 2014, Enrique Fernandez <enrique.fernandez@erlang-solutions.com>
@@ -6495,6 +6487,20 @@ build-shell-deps: $(ALL_SHELL_DEPS_DIRS)
shell: build-shell-deps
$(gen_verbose) $(SHELL_ERL) -pa $(SHELL_PATHS) $(SHELL_OPTS)
+# Copyright (c) 2017, Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
+# This file is contributed to erlang.mk and subject to the terms of the ISC License.
+
+.PHONY: show-ERL_LIBS show-ERLC_OPTS show-TEST_ERLC_OPTS
+
+show-ERL_LIBS:
+ @echo $(ERL_LIBS)
+
+show-ERLC_OPTS:
+ @$(foreach opt,$(ERLC_OPTS) -pa ebin -I include,echo "$(opt)";)
+
+show-TEST_ERLC_OPTS:
+ @$(foreach opt,$(TEST_ERLC_OPTS) -pa ebin -I include,echo "$(opt)";)
+
# Copyright (c) 2015-2016, Loïc Hoguin <essen@ninenines.eu>
# This file is part of erlang.mk and subject to the terms of the ISC License.
diff --git a/priv/schema/rabbitmq.schema b/priv/schema/rabbitmq.schema
index 6f3f42e4ed..fab07baeb4 100644
--- a/priv/schema/rabbitmq.schema
+++ b/priv/schema/rabbitmq.schema
@@ -999,7 +999,7 @@ fun(Conf) ->
[{lager_console_backend, ConsoleLevel}];
false -> []
end,
- FileHandler = case cuttlefish:conf_get("log.file", Conf, false) of
+ FileHandler = case cuttlefish:conf_get("log.file", Conf, undefined) of
false -> [];
File ->
FileLevel = cuttlefish:conf_get("log.file.level", Conf, info),
@@ -1010,6 +1010,11 @@ fun(Conf) ->
{level, FileLevel},
{date, RotationDate},
{size, RotationSize},
+ {formatter_config,
+ [date, " ", time, " ", color,
+ "[", severity, "] ",
+ {pid, ""},
+ " ", message, "\n"]},
{count, RotationCount}]}]
end,
SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf, false) of
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index 56b2f69b2d..9d7a724318 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -417,7 +417,12 @@ set paths=
exit /b
:filter_path
-set ERL_LIBS=%ERL_LIBS%;%~dps1%~n1%~x1
+REM Ensure ERL_LIBS begins with valid path
+IF [%ERL_LIBS%] EQU [] (
+ set ERL_LIBS=%~dps1%~n1%~x1
+) else (
+ set ERL_LIBS=%ERL_LIBS%;%~dps1%~n1%~x1
+)
exit /b
:filter_paths_done
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 41d1a81332..a318b55d75 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -1,4 +1,4 @@
-#!/bin/sh -e
+#!/bin/sh
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License
@@ -12,9 +12,11 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is GoPivotal, Inc.
-## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+## Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
##
+set -e
+
# Get default settings with user overrides for (RABBITMQ_)<var_name>
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
@@ -278,21 +280,38 @@ else
# The Erlang VM should ignore SIGINT.
RABBITMQ_SERVER_START_ARGS="${RABBITMQ_SERVER_START_ARGS} ${RABBITMQ_IGNORE_SIGINT_FLAG}"
- # Signal handlers. They all stop RabbitMQ properly (using
- # rabbitmqctl stop). Depending on the signal, this script will exit
- # with a non-zero error code:
+ # Signal handlers. They all stop RabbitMQ properly, using
+ # rabbitmqctl stop. This script will exit with different exit codes:
# SIGHUP SIGTERM SIGTSTP
- # They are considered a normal process termination, so the script
- # exits with 0.
+ # Exits 0 since this is considered a normal process termination.
# SIGINT
- # They are considered an abnormal process termination, the script
- # exits with the job exit code.
+ # Exits 128 + $signal_number where $signal_number is 2 for SIGINT (see
+ # http://pubs.opengroup.org/onlinepubs/009695399/utilities/kill.html).
+ # This is considered an abnormal process termination. Normally, we
+ # don't need to specify this exit code because the shell propagates it.
+ # Unfortunately, the signal handler doesn't work as expected in Dash,
+ # thus we need to explicitely restate the exit code.
trap "stop_rabbitmq_server; exit 0" HUP TERM TSTP
- trap "stop_rabbitmq_server" INT
+ trap "stop_rabbitmq_server; exit 130" INT
start_rabbitmq_server "$@" &
+ rabbitmq_server_pid=$!
# Block until RabbitMQ exits or a signal is caught.
# Waits for last command (which is start_rabbitmq_server)
- wait $!
+ #
+ # The "|| true" is here to work around an issue with Dash. Normally
+ # in a Bourne shell, if `wait` is interrupted by a signal, the
+ # signal handlers defined above are executed and the script
+ # terminates with the exit code of `wait` (unless the signal handler
+ # overrides that).
+ # In the case of Dash, it looks like `set -e` (set at the beginning
+ # of this script) gets precedence over signal handling. Therefore,
+ # when `wait` is interrupted, its exit code is non-zero and because
+ # of `set -e`, the script terminates immediately without running the
+ # signal handler. To work around this issue, we use "|| true" to
+ # force that statement to succeed and the signal handler to properly
+ # execute. Because the statement below has an exit code of 0, the
+ # signal handler has to restate the expected exit code.
+ wait $rabbitmq_server_pid || true
fi
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b63d624293..8e6c9ead26 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,7 +22,7 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0,
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
start_fhc/0]).
--export([start/2, stop/1]).
+-export([start/2, stop/1, prep_stop/1]).
-export([start_apps/1, stop_apps/1]).
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
@@ -327,7 +327,11 @@ broker_start() ->
ToBeLoaded = Plugins ++ ?APPS,
start_apps(ToBeLoaded),
maybe_sd_notify(),
- ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())).
+ ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())),
+ %% See rabbitmq/rabbitmq-server#1202 for details.
+ rabbit_peer_discovery:maybe_inject_randomized_delay(),
+ rabbit_peer_discovery:maybe_register(),
+ ok.
%% Try to send systemd ready notification if it makes sense in the
%% current environment. standard_error is used intentionally in all
@@ -471,6 +475,8 @@ stop() ->
end,
rabbit_log:info("RabbitMQ is asked to stop...~n", []),
Apps = ?APPS ++ rabbit_plugins:active(),
+ %% this will also perform unregistration with the peer discovery backend
+ %% as needed
stop_apps(app_utils:app_dependency_order(Apps, true)),
rabbit_log:info("Successfully stopped RabbitMQ and its dependencies~n", []).
@@ -759,6 +765,10 @@ start(normal, []) ->
Error
end.
+prep_stop(State) ->
+ rabbit_peer_discovery:maybe_unregister(),
+ State.
+
stop(_State) ->
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9f91041e7..c52d329392 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -99,7 +99,7 @@
-spec info_keys() -> rabbit_types:info_keys().
-spec init_with_backing_queue_state
(rabbit_types:amqqueue(), atom(), tuple(), any(),
- [rabbit_types:delivery()], pmon:pmon(), dict:dict()) ->
+ [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
#q{}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_config.erl b/src/rabbit_config.erl
index 67e7523ec0..9e70898c82 100644
--- a/src/rabbit_config.erl
+++ b/src/rabbit_config.erl
@@ -33,10 +33,10 @@ prepare_and_use_config() ->
legacy_erlang_term_config_used() ->
case init:get_argument(config) of
error -> false;
- {ok, [Config | _]} ->
+ {ok, [Config | _]} ->
ConfigFile = Config ++ ".config",
- rabbit_file:is_file(ConfigFile)
- andalso
+ rabbit_file:is_file(ConfigFile)
+ andalso
get_advanced_config() == none
end.
@@ -66,10 +66,34 @@ prepare_config(Configs) ->
end.
update_app_config(ConfigFile) ->
- ok = application_controller:change_application_data([], [ConfigFile]).
+ RunningApps = [ App || {App, _, _} <- application:which_applications() ],
+ LoadedApps = [ App || {App, _, _} <- application:loaded_applications() ],
+ {ok, [Config]} = file:consult(ConfigFile),
+ %% For application config to be updated, applications should
+ %% be unloaded first.
+ %% If an application is already running, print an error.
+ lists:foreach(fun({App, _Config}) ->
+ case lists:member(App, RunningApps) of
+ true ->
+ io:format(standard_error,
+ "~nUnable to update config for app ~p from *.conf file."
+ " App is already running. Use advanced.config instead.~n",
+ [App]);
+ false ->
+ case lists:member(App, LoadedApps) of
+ true -> application:unload(App);
+ false -> ok
+ end
+ end
+ end,
+ Config),
+ ok = application_controller:change_application_data([], [ConfigFile]),
+ %% Make sure to load all the applications we're unloaded
+ lists:foreach(fun(App) -> application:load(App) end, LoadedApps),
+ ok.
generate_config_file(ConfFiles, ConfDir, ScriptDir) ->
- generate_config_file(ConfFiles, ConfDir, ScriptDir,
+ generate_config_file(ConfFiles, ConfDir, ScriptDir,
schema_dir(), get_advanced_config()).
@@ -145,8 +169,8 @@ config_files() ->
{ok, Files} -> [Abs(File, ".config") || [File] <- Files];
error -> case config_setting() of
none -> [];
- File -> [Abs(File, ".config")
- ++
+ File -> [Abs(File, ".config")
+ ++
" (not found)"]
end
end;
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b2548cb61a..868fc1a4aa 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -65,12 +65,17 @@
alarmed,
%% is monitoring enabled? false on unsupported
%% platforms
- enabled
+ enabled,
+ %% number of retries to enable monitoring if it fails
+ %% on start-up
+ retries,
+ %% Interval between retries
+ interval
}).
%%----------------------------------------------------------------------------
--type disk_free_limit() :: (integer() | string() | {'mem_relative', float()}).
+-type disk_free_limit() :: (integer() | string() | {'mem_relative', float() | integer()}).
-spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error().
-spec get_disk_free_limit() -> integer().
-spec set_disk_free_limit(disk_free_limit()) -> 'ok'.
@@ -114,20 +119,17 @@ start_link(Args) ->
init([Limit]) ->
Dir = dir(),
+ {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries),
+ {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval),
State = #state{dir = Dir,
min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
alarmed = false,
- enabled = true},
- case {catch get_disk_free(Dir),
- vm_memory_monitor:get_total_memory()} of
- {N1, N2} when is_integer(N1), is_integer(N2) ->
- {ok, start_timer(set_disk_limits(State, Limit))};
- Err ->
- rabbit_log:info("Disabling disk free space monitoring "
- "on unsupported platform:~n~p~n", [Err]),
- {ok, State#state{enabled = false}}
- end.
+ enabled = true,
+ limit = Limit,
+ retries = Retries,
+ interval = Interval},
+ {ok, enable(State)}.
handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) ->
{reply, Limit, State};
@@ -161,6 +163,8 @@ handle_call(_Request, _From, State) ->
handle_cast(_Request, State) ->
{noreply, State}.
+handle_info(try_enable, #state{retries = Retries} = State) ->
+ {noreply, enable(State#state{retries = Retries - 1})};
handle_info(update, State) ->
{noreply, start_timer(internal_update(State))};
@@ -233,7 +237,7 @@ parse_free_win32(CommandResult) ->
list_to_integer(lists:reverse(Free)).
interpret_limit({mem_relative, Relative})
- when is_float(Relative) ->
+ when is_number(Relative) ->
round(Relative * vm_memory_monitor:get_total_memory());
interpret_limit(Absolute) ->
case rabbit_resource_monitor_misc:parse_information_unit(Absolute) of
@@ -246,7 +250,7 @@ interpret_limit(Absolute) ->
emit_update_info(StateStr, CurrentFree, Limit) ->
rabbit_log:info(
- "Disk free space ~s. Free bytes:~p Limit:~p~n",
+ "Free disk space is ~s. Free bytes: ~p. Limit: ~p~n",
[StateStr, CurrentFree, Limit]).
start_timer(State) ->
@@ -261,3 +265,20 @@ interval(#state{limit = Limit,
max_interval = MaxInterval}) ->
IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE,
trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).
+
+enable(#state{retries = 0} = State) ->
+ State;
+enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries}
+ = State) ->
+ case {catch get_disk_free(Dir),
+ vm_memory_monitor:get_total_memory()} of
+ {N1, N2} when is_integer(N1), is_integer(N2) ->
+ rabbit_log:info("Enabling free disk space monitoring~n", []),
+ start_timer(set_disk_limits(State, Limit));
+ Err ->
+ rabbit_log:info("Free disk space monitor encountered an error "
+ "(e.g. failed to parse output from OS tools): ~p, retries left: ~s~n",
+ [Err, Retries]),
+ timer:send_after(Interval, self(), try_enable),
+ State#state{enabled = false}
+ end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 94710aed43..fefa0de1c9 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -57,13 +57,13 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- seen_status :: dict:dict(),
+ seen_status :: map(),
confirmed :: [rabbit_guid:guid()],
known_senders :: sets:set()
}.
-spec promote_backing_queue_state
(rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
- dict:dict(), [pid()]) ->
+ map(), [pid()]) ->
master_state().
-spec sender_death_fun() -> death_fun().
@@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- seen_status = dict:new(),
+ seen_status = #{},
confirmed = [],
known_senders = sets:new(),
wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
@@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
@@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow,
lists:foldl(fun ({Msg = #basic_message { id = MsgId },
MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
{[{Msg, MsgProps, true} | Pubs], %% [0]
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {[], false, 0}, Publishes),
Publishes2 = lists:reverse(Publishes1),
@@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
@@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow,
{false, MsgSizes} =
lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
{false, Sizes}) ->
- {false = dict:is_key(MsgId, SS), %% ASSERTION
+ {false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {false, 0}, Publishes),
ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
@@ -326,7 +326,7 @@ discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
ensure_monitoring(ChPid,
State #state { backing_queue_state =
@@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
lists:foldl(
fun (MsgId, {MsgIdsN, SSN}) ->
%% We will never see 'discarded' here
- case dict:find(MsgId, SSN) of
+ case maps:find(MsgId, SSN) of
error ->
{[MsgId | MsgIdsN], SSN};
{ok, published} ->
@@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
%% consequently we need to filter out the
%% confirm here. We will issue the confirm
%% when we see the publish from the channel.
- {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+ {MsgIdsN, maps:put(MsgId, confirmed, SSN)};
{ok, confirmed} ->
%% Well, confirms are racy by definition.
{[MsgId | MsgIdsN], SSN}
@@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
info(backing_queue_status,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(backing_queue_status, BQS) ++
- [ {mirror_seen, dict:size(State #state.seen_status)},
+ [ {mirror_seen, maps:size(State #state.seen_status)},
{mirror_senders, sets:size(State #state.known_senders)} ];
info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(Item, BQS).
@@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% it.
%% We will never see {published, ChPid, MsgSeqNo} here.
- case dict:find(MsgId, SS) of
+ case maps:find(MsgId, SS) of
error ->
%% We permit the underlying BQ to have a peek at it, but
%% only if we ourselves are not filtering out the msg.
@@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {true, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = dict:erase(MsgId, SS),
+ {true, State #state { seen_status = maps:remove(MsgId, SS),
confirmed = [MsgId | Confirmed] }}
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 5c1af7f4aa..ee697be501 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) ->
rate_timer_ref = undefined,
sync_timer_ref = undefined,
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
+ sender_queues = #{},
+ msg_id_ack = #{},
- msg_id_status = dict:new(),
+ msg_id_status = #{},
known_senders = pmon:new(delegate),
depth_delta = undefined
@@ -312,7 +312,7 @@ handle_cast({sync_start, Ref, Syncer},
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
S = fun({MA, TRefN, BQSN}) ->
State1#state{depth_delta = undefined,
- msg_id_ack = dict:from_list(MA),
+ msg_id_ack = maps:from_list(MA),
rate_timer_ref = TRefN,
backing_queue_state = BQSN}
end,
@@ -548,7 +548,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
id = MsgId,
is_persistent = true } },
MS, #state { q = #amqqueue { durable = true } }) ->
- dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+ maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
confirm = true,
msg_seq_no = MsgSeqNo },
@@ -561,7 +561,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
lists:foldl(
fun (MsgId, {CMsN, MSN} = Acc) ->
%% We will never see 'discarded' here
- case dict:find(MsgId, MSN) of
+ case maps:find(MsgId, MSN) of
error ->
%% If it needed confirming, it'll have
%% already been done.
@@ -569,12 +569,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, confirmed, MSN)};
+ {CMsN, maps:put(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
- dict:erase(MsgId, MSN)};
+ maps:remove(MsgId, MSN)};
{ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
@@ -674,21 +674,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Master, or MTC in queue_process.
St = [published, confirmed, discarded],
- SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
- AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
+ SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
+ AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
- MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
Deliveries = [promote_delivery(Delivery) ||
- {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+ {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+ AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
@@ -800,20 +800,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error ->
State;
{ok, {MQ, PendCh, ChStateRecord}} ->
case forget_sender(ChState, ChStateRecord) of
true ->
credit_flow:peer_down(ChPid),
- State #state { sender_queues = dict:erase(ChPid, SQ),
+ State #state { sender_queues = maps:remove(ChPid, SQ),
msg_id_status = lists:foldl(
- fun dict:erase/2,
+ fun maps:remove/2,
MS, sets:to_list(PendCh)),
known_senders = pmon:demonitor(ChPid, KS) };
false ->
- SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
State #state { sender_queues = SQ1 }
end
end.
@@ -825,32 +825,32 @@ maybe_enqueue_message(
send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
- case dict:find(MsgId, MS) of
+ case maps:find(MsgId, MS) of
error ->
{MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
- SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
- Status, Delivery, dict:erase(MsgId, MS), State1),
+ Status, Delivery, maps:remove(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
sender_queues = SQ1 }
end.
get_sender_queue(ChPid, SQ) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.
remove_from_pending_ch(MsgId, ChPid, SQ) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error ->
SQ;
{ok, {MQ, PendingCh, ChState}} ->
- dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+ maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
SQ)
end.
@@ -867,7 +867,7 @@ publish_or_discard(Status, ChPid, MsgId,
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, Status, MS)};
+ maps:put(MsgId, Status, MS)};
{{value, Delivery = #delivery {
message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
@@ -882,7 +882,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
@@ -1004,9 +1004,9 @@ msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
- case dict:find(MsgId, MA) of
+ case maps:find(MsgId, MA) of
error -> {Acc, MAN};
- {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
+ {ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
@@ -1014,7 +1014,7 @@ msg_ids_to_acktags(MsgIds, MA) ->
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
- State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
+ State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.
set_delta(0, State = #state { depth_delta = undefined }) ->
ok = record_synchronised(State#state.q),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6d3b47a405..e48eb2b91f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -119,6 +119,8 @@ init_from_config() ->
(Name, BadNames) when is_atom(Name) -> BadNames;
(Name, BadNames) -> [Name | BadNames]
end,
+ %% See rabbitmq/rabbitmq-server#1202 for details.
+ rabbit_peer_discovery:maybe_inject_randomized_delay(),
{DiscoveredNodes, NodeType} =
case rabbit_peer_discovery:discover_cluster_nodes() of
{ok, {Nodes, Type} = Config}
@@ -139,14 +141,17 @@ init_from_config() ->
{ok, _} ->
e(invalid_cluster_nodes_conf)
end,
- case DiscoveredNodes of
+ rabbit_log:info("All discovered existing cluster peers: ~p~n",
+ [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
+ Peers = nodes_excl_me(DiscoveredNodes),
+ case Peers of
[] ->
rabbit_log:info("Discovered no peer nodes to cluster with"),
init_db_and_upgrade([node()], disc, false, _Retry = true);
_ ->
- rabbit_log:info("Discovered peer nodes: ~s~n",
- [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
- join_discovered_peers(DiscoveredNodes, NodeType)
+ rabbit_log:info("Peer nodes we can cluster with: ~s~n",
+ [rabbit_peer_discovery:format_discovered_nodes(Peers)]),
+ join_discovered_peers(Peers, NodeType)
end.
%% Attempts to join discovered,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 891cdf0236..be85091948 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1337,9 +1337,11 @@ update_pending_confirms(Fun, CRef,
record_pending_confirm(CRef, MsgId, State) ->
update_pending_confirms(
fun (_MsgOnDiskFun, CTM) ->
- maps:update_with(CRef,
- fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
- gb_sets:singleton(MsgId), CTM)
+ NewMsgIds = case maps:find(CRef, CTM) of
+ error -> gb_sets:singleton(MsgId);
+ {ok, MsgIds} -> gb_sets:add(MsgId, MsgIds)
+ end,
+ maps:put(CRef, NewMsgIds, CTM)
end, CRef, State).
client_confirm(CRef, MsgIds, ActionTaken, State) ->
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 6179ef95bb..728c9652d0 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -70,7 +70,7 @@ set_maximum_since_use(Pid, Age) ->
init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- {ok, #state { pending_no_readers = dict:new(),
+ {ok, #state { pending_no_readers = #{},
on_action = [],
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -89,11 +89,11 @@ handle_cast({delete, File}, State) ->
handle_cast({no_readers, File},
State = #state { pending_no_readers = Pending }) ->
- {noreply, case dict:find(File, Pending) of
+ {noreply, case maps:find(File, Pending) of
error ->
State;
{ok, {Action, Files}} ->
- Pending1 = dict:erase(File, Pending),
+ Pending1 = maps:remove(File, Pending),
attempt_action(
Action, Files,
State #state { pending_no_readers = Pending1 })
@@ -123,7 +123,7 @@ attempt_action(Action, Files,
fun (Thunk) -> not Thunk() end,
[do_action(Action, Files, MsgStoreState) |
Thunks]) };
- [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending),
+ [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
State #state { pending_no_readers = Pending1 }
end.
diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl
index 7cdb35f6f4..088f1c88fd 100644
--- a/src/rabbit_peer_discovery.erl
+++ b/src/rabbit_peer_discovery.erl
@@ -21,7 +21,9 @@
%%
-export([discover_cluster_nodes/0, backend/0, node_type/0,
- normalize/1, format_discovered_nodes/1, log_configured_backend/0]).
+ normalize/1, format_discovered_nodes/1, log_configured_backend/0,
+ register/0, unregister/0, maybe_register/0, maybe_unregister/0,
+ maybe_inject_randomized_delay/0]).
-export([append_node_prefix/1, node_prefix/0]).
-define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config).
@@ -30,6 +32,9 @@
-define(DEFAULT_NODE_TYPE, disc).
%% default node prefix to attach to discovered hostnames
-define(DEFAULT_PREFIX, "rabbit").
+%% default randomized delay range, in seconds
+-define(DEFAULT_STARTUP_RANDOMIZED_DELAY, {5, 60}).
+
-define(NODENAME_PART_SEPARATOR, "@").
@@ -72,6 +77,117 @@ discover_cluster_nodes() ->
normalize(Backend:list_nodes()).
+-spec maybe_register() -> ok.
+
+maybe_register() ->
+ Backend = backend(),
+ case Backend:supports_registration() of
+ true ->
+ register(),
+ Backend:post_registration();
+ false ->
+ rabbit_log:info("Peer discovery backend ~s does not support registration, skipping registration.", [Backend]),
+ ok
+ end.
+
+
+-spec maybe_unregister() -> ok.
+
+maybe_unregister() ->
+ Backend = backend(),
+ case Backend:supports_registration() of
+ true ->
+ unregister();
+ false ->
+ rabbit_log:info("Peer discovery backend ~s does not support registration, skipping unregistration.", [Backend]),
+ ok
+ end.
+
+
+-spec maybe_inject_randomized_delay() -> ok.
+maybe_inject_randomized_delay() ->
+ Backend = backend(),
+ case Backend:supports_registration() of
+ true ->
+ rabbit_log:info("Peer discovery backend ~s supports registration.", [Backend]),
+ inject_randomized_delay();
+ false ->
+ rabbit_log:info("Peer discovery backend ~s does not support registration, skipping randomized startup delay.", [Backend]),
+ ok
+ end.
+
+-spec inject_randomized_delay() -> ok.
+
+inject_randomized_delay() ->
+ {Min, Max} = case randomized_delay_range_in_ms() of
+ {A, B} -> {A, B};
+ [A, B] -> {A, B}
+ end,
+ case {Min, Max} of
+ %% When the max value is set to 0, consider the delay to be disabled.
+ %% In addition, `rand:uniform/1` will fail with a "no function clause"
+ %% when the argument is 0.
+ {_, 0} ->
+ rabbit_log:info("Randomized delay range's upper bound is set to 0. Considering it disabled."),
+ ok;
+ {_, N} when is_number(N) ->
+ rand:seed(exsplus),
+ RandomVal = rand:uniform(round(N)),
+ rabbit_log:debug("Randomized startup delay: configured range is from ~p to ~p milliseconds, PRNG pick: ~p...",
+ [Min, Max, RandomVal]),
+ Effective = case RandomVal < Min of
+ true -> Min;
+ false -> RandomVal
+ end,
+ rabbit_log:info("Will wait for ~p milliseconds before proceeding with regitration...", [Effective]),
+ timer:sleep(Effective),
+ ok
+ end.
+
+-spec randomized_delay_range_in_ms() -> {integer(), integer()}.
+
+randomized_delay_range_in_ms() ->
+ {Min, Max} = case application:get_env(rabbit, autocluster) of
+ {ok, Proplist} ->
+ proplists:get_value(randomized_startup_delay_range, Proplist, ?DEFAULT_STARTUP_RANDOMIZED_DELAY);
+ undefined ->
+ ?DEFAULT_STARTUP_RANDOMIZED_DELAY
+ end,
+ {Min * 1000, Max * 1000}.
+
+
+-spec register() -> ok.
+
+register() ->
+ Backend = backend(),
+ rabbit_log:info("Will register with peer discovery backend ~s", [Backend]),
+ case Backend:register() of
+ ok -> ok;
+ {error, Error} ->
+ rabbit_log:error("Failed to register with peer discovery backend ~s: ~p",
+ [Backend, Error]),
+ ok
+ end.
+
+
+-spec unregister() -> ok.
+
+unregister() ->
+ Backend = backend(),
+ rabbit_log:info("Will unregister with peer discovery backend ~s", [Backend]),
+ case Backend:unregister() of
+ ok -> ok;
+ {error, Error} ->
+ rabbit_log:error("Failed to unregister with peer discovery backend ~s: ~p",
+ [Backend, Error]),
+ ok
+ end.
+
+
+%%
+%% Implementation
+%%
+
-spec normalize(Nodes :: list() |
{Nodes :: list(), NodeType :: rabbit_types:node_type()} |
{ok, Nodes :: list()} |
@@ -90,7 +206,6 @@ normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType)
normalize({error, Reason}) ->
{error, Reason}.
-
-spec format_discovered_nodes(Nodes :: list()) -> string().
format_discovered_nodes(Nodes) ->
diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl
index 95e5532548..da5fb6c971 100644
--- a/src/rabbit_peer_discovery_classic_config.erl
+++ b/src/rabbit_peer_discovery_classic_config.erl
@@ -19,7 +19,8 @@
-include("rabbit.hrl").
--export([list_nodes/0, register/0, unregister/0]).
+-export([list_nodes/0, supports_registration/0, register/0, unregister/0,
+ post_registration/0]).
%%
%% API
@@ -34,6 +35,11 @@ list_nodes() ->
undefined -> {[], disc}
end.
+-spec supports_registration() -> boolean().
+
+supports_registration() ->
+ false.
+
-spec register() -> ok.
register() ->
@@ -43,3 +49,8 @@ register() ->
unregister() ->
ok.
+
+-spec post_registration() -> ok.
+
+post_registration() ->
+ ok.
diff --git a/src/rabbit_peer_discovery_dns.erl b/src/rabbit_peer_discovery_dns.erl
index c8f6a7f39a..e4215cd048 100644
--- a/src/rabbit_peer_discovery_dns.erl
+++ b/src/rabbit_peer_discovery_dns.erl
@@ -19,7 +19,8 @@
-include("rabbit.hrl").
--export([list_nodes/0, register/0, unregister/0]).
+-export([list_nodes/0, supports_registration/0, register/0, unregister/0,
+ post_registration/0]).
%% for tests
-export([discover_nodes/2, discover_hostnames/2]).
@@ -48,6 +49,13 @@ list_nodes() ->
end
end.
+
+-spec supports_registration() -> boolean().
+
+supports_registration() ->
+ false.
+
+
-spec register() -> ok.
register() ->
@@ -58,6 +66,11 @@ register() ->
unregister() ->
ok.
+-spec post_registration() -> ok.
+
+post_registration() ->
+ ok.
+
%%
%% Implementation
@@ -68,14 +81,25 @@ discover_nodes(SeedHostname, LongNamesUsed) ->
H <- discover_hostnames(SeedHostname, LongNamesUsed)].
discover_hostnames(SeedHostname, LongNamesUsed) ->
- %% TODO: IPv6 support
- IPs = inet_res:lookup(SeedHostname, in, a),
- rabbit_log:info("Addresses discovered via A records of ~s: ~s",
- [SeedHostname, string:join([inet_parse:ntoa(IP) || IP <- IPs], ", ")]),
+ lookup(SeedHostname, LongNamesUsed, ipv4) ++
+ lookup(SeedHostname, LongNamesUsed, ipv6).
+
+decode_record(ipv4) ->
+ a;
+decode_record(ipv6) ->
+ aaaa.
+
+lookup(SeedHostname, LongNamesUsed, IPv) ->
+ IPs = inet_res:lookup(SeedHostname, in, decode_record(IPv)),
+ rabbit_log:info("Addresses discovered via ~s records of ~s: ~s",
+ [string:to_upper(atom_to_list(decode_record(IPv))),
+ SeedHostname,
+ string:join([inet_parse:ntoa(IP) || IP <- IPs], ", ")]),
Hosts = [extract_host(inet:gethostbyaddr(A), LongNamesUsed, A) ||
- A <- IPs],
+ A <- IPs],
lists:filter(fun(E) -> E =/= error end, Hosts).
+
%% long node names are used
extract_host({ok, {hostent, FQDN, _, _, _, _}}, true, _Address) ->
FQDN;
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 4e9715fba2..f13a46fcf3 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -64,7 +64,8 @@
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
-spec inactive(state()) -> boolean().
-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table()}].
+ non_neg_integer(), rabbit_framing:amqp_table(),
+ rabbit_types:username()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
@@ -280,7 +281,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
orddict:update_counter(CTag, 1, CTagCounts), QTail);
{{value, V}, QTail} ->
subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
- {empty, _} ->
+ {empty, _} ->
subtract_acks([], Prefix, CTagCounts, AckQ)
end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3c4f23a416..a71eaf1ff4 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -83,7 +83,7 @@
%% contains a mapping from segment numbers to state-per-segment (this
%% state is held for all segments which have been "seen": thus a
%% segment which has been read but has no pending entries in the
-%% journal is still held in this mapping. Also note that a dict is
+%% journal is still held in this mapping. Also note that a map is
%% used for this mapping, not an array because with an array, you will
%% always have entries from 0). Actions are stored directly in this
%% state. Thus at the point of flushing the journal, firstly no
@@ -236,10 +236,10 @@
unacked :: non_neg_integer()
}).
-type seq_id() :: integer().
--type seg_dict() :: {dict:dict(), [segment()]}.
+-type seg_map() :: {map(), [segment()]}.
-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
-type qistate() :: #qistate { dir :: file:filename(),
- segments :: 'undefined' | seg_dict(),
+ segments :: 'undefined' | seg_map(),
journal_handle :: hdl(),
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
@@ -1027,7 +1027,7 @@ segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) ->
{ok, Segment}; %% 2, matches tail
segment_find(Seg, {Segments, _}) -> %% no match
- dict:find(Seg, Segments).
+ maps:find(Seg, Segments).
segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
{Segments, [#segment { num = Seg } | Tail]}) ->
@@ -1036,28 +1036,28 @@ segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
{Segments, [SegmentA, #segment { num = Seg }]}) ->
{Segments, [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg }, {Segments, []}) ->
- {dict:erase(Seg, Segments), [Segment]};
+ {maps:remove(Seg, Segments), [Segment]};
segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) ->
- {dict:erase(Seg, Segments), [Segment, SegmentA]};
+ {maps:remove(Seg, Segments), [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg },
{Segments, [SegmentA, SegmentB]}) ->
- {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)),
+ {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)),
[Segment, SegmentA]}.
segment_fold(Fun, Acc, {Segments, CachedSegments}) ->
- dict:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
+ maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
lists:foldl(Fun, Acc, CachedSegments), Segments).
segment_map(Fun, {Segments, CachedSegments}) ->
- {dict:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
+ {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
lists:map(Fun, CachedSegments)}.
segment_nums({Segments, CachedSegments}) ->
lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++
- dict:fetch_keys(Segments).
+ maps:keys(Segments).
segments_new() ->
- {dict:new(), []}.
+ {#{}, []}.
entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
Initial;
diff --git a/test/peer_discovery_dns_SUITE.erl b/test/peer_discovery_dns_SUITE.erl
index 687569890f..7deb38ba97 100644
--- a/test/peer_discovery_dns_SUITE.erl
+++ b/test/peer_discovery_dns_SUITE.erl
@@ -33,7 +33,8 @@ groups() ->
hostname_discovery_with_long_node_names,
hostname_discovery_with_short_node_names,
node_discovery_with_long_node_names,
- node_discovery_with_short_node_names
+ node_discovery_with_short_node_names,
+ test_aaaa_record_hostname_discovery
]}
].
@@ -54,7 +55,9 @@ suite() ->
%% * One does not resolve to a [typically] non-reachable IP
%% * One does not support reverse lookup queries
--define(DISCOVERY_ENDPOINT, "peer_discovery.tests.rabbitmq.net").
+-define(DISCOVERY_ENDPOINT_RECORD_A, "peer_discovery.tests.rabbitmq.net").
+
+-define(DISCOVERY_ENDPOINT_RECORD_AAAA, "www.v6.facebook.com").
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
@@ -63,17 +66,26 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(test_aaaa_record, Config) ->
+ case inet_res:lookup(?DISCOVERY_ENDPOINT_RECORD_AAAA, in, aaaa) of
+ [] ->
+ {skip, "pre-configured AAAA record does not resolve, skipping"};
+ [_ | _] ->
+ Config
+ end;
+
init_per_testcase(_Testcase, Config) ->
- %% TODO: support IPv6-only environments
- case inet_res:lookup(?DISCOVERY_ENDPOINT, in, a) of
+ case inet_res:lookup(?DISCOVERY_ENDPOINT_RECORD_A, in, a) of
[] ->
{skip, "pre-configured *.rabbitmq.net record does not resolve, skipping"};
[_ | _] ->
Config
end.
+
end_per_testcase(_Testcase, Config) ->
- case inet_res:lookup(?DISCOVERY_ENDPOINT, in, a) of
+ case inet_res:lookup(?DISCOVERY_ENDPOINT_RECORD_A, in, a) of
[] ->
{skip, "pre-configured *.rabbitmq.net record does not resolve, skipping"};
[_ | _] ->
@@ -85,18 +97,22 @@ end_per_testcase(_Testcase, Config) ->
%% Test cases
%% -------------------------------------------------------------------
+test_aaaa_record_hostname_discovery(_) ->
+ Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT_RECORD_AAAA, true),
+ ?assert(string:str(lists:flatten(Result), "facebook.com") > 0).
+
hostname_discovery_with_long_node_names(_) ->
- Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT, true),
+ Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT_RECORD_A, true),
?assert(lists:member("www.rabbitmq.com", Result)).
hostname_discovery_with_short_node_names(_) ->
- Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT, false),
+ Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT_RECORD_A, false),
?assert(lists:member("www", Result)).
node_discovery_with_long_node_names(_) ->
- Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT, true),
+ Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT_RECORD_A, true),
?assert(lists:member('ct_rabbit@www.rabbitmq.com', Result)).
node_discovery_with_short_node_names(_) ->
- Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT, false),
+ Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT_RECORD_A, false),
?assert(lists:member(ct_rabbit@www, Result)).
diff --git a/test/rabbitmqctl_shutdown_SUITE.erl b/test/rabbitmqctl_shutdown_SUITE.erl
index fa04dc6504..25cc2fdef8 100644
--- a/test/rabbitmqctl_shutdown_SUITE.erl
+++ b/test/rabbitmqctl_shutdown_SUITE.erl
@@ -29,8 +29,7 @@ all() ->
groups() ->
[
{running_node, [], [
- successful_shutdown,
- error_during_shutdown
+ successful_shutdown
]},
{non_running_node, [], [
nothing_to_shutdown
@@ -91,16 +90,6 @@ successful_shutdown(Config) ->
false = erlang_pid_is_running(Pid),
false = node_is_running(Node).
-error_during_shutdown(Config) ->
- Node = ?config(node, Config),
- ok = rabbit_ct_broker_helpers:control_action(stop_app, Node, []),
- ok = rpc:call(Node, application, unload, [os_mon]),
-
- {badrpc,
- {'EXIT', {
- {error, {badmatch, {error,{edge,{bad_vertex,os_mon},os_mon,rabbit}}}},
- _}}} = shutdown_error(Node).
-
nothing_to_shutdown(Config) ->
Node = ?config(node, Config),
@@ -119,14 +108,6 @@ erlang_pid_is_running(Pid) ->
node_is_running(Node) ->
net_adm:ping(Node) == pong.
-shutdown_error(Node) ->
- %% Start a command
- {stream, Stream} = rabbit_ct_broker_helpers:control_action(shutdown, Node, []),
- %% Execute command steps. The last one should be error
- Lines = 'Elixir.Enum':to_list(Stream),
- {error, Err} = lists:last(Lines),
- Err.
-
shutdown_ok(Node) ->
%% Start a command
{stream, Stream} = rabbit_ct_broker_helpers:control_action(shutdown, Node, []),
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index 93f107de6b..2af6368f34 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -35,6 +35,7 @@ groups() ->
app_management, %% Restart RabbitMQ.
channel_statistics, %% Expect specific statistics.
disk_monitor, %% Replace rabbit_misc module.
+ disk_monitor_enable,
file_handle_cache, %% Change FHC limit.
head_message_timestamp_statistics, %% Expect specific statistics.
log_management, %% Check log files.
@@ -631,6 +632,37 @@ disk_monitor1(_Config) ->
meck:unload(rabbit_misc),
passed.
+disk_monitor_enable(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, disk_monitor_enable1, [Config]).
+
+disk_monitor_enable1(_Config) ->
+ case os:type() of
+ {unix, _} ->
+ disk_monitor_enable1();
+ _ ->
+ %% skip windows testing
+ skipped
+ end.
+
+disk_monitor_enable1() ->
+ ok = meck:new(rabbit_misc, [passthrough]),
+ ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end),
+ application:set_env(rabbit, disk_monitor_failure_retries, 20000),
+ application:set_env(rabbit, disk_monitor_failure_retry_interval, 100),
+ ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup),
+ ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]),
+ undefined = rabbit_disk_monitor:get_disk_free(),
+ Cmd = "Filesystem 1024-blocks Used Available Capacity iused ifree %iused Mounted on\n/dev/disk1 975798272 234783364 740758908 25% 58759839 185189727 24% /\n",
+ ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> Cmd end),
+ timer:sleep(1000),
+ Bytes = 740758908 * 1024,
+ Bytes = rabbit_disk_monitor:get_disk_free(),
+ meck:unload(rabbit_misc),
+ application:set_env(rabbit, disk_monitor_failure_retries, 10),
+ application:set_env(rabbit, disk_monitor_failure_retry_interval, 120000),
+ passed.
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------