diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-05-08 18:01:10 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-05-08 18:01:10 +0300 |
| commit | 7132664520b09cf6e15dc95e91ce73eedd4407e4 (patch) | |
| tree | 34d0b51dabde73dbde00e2b928e4b52450eaaee8 | |
| parent | d6fa2093375ac295b765261e34213de4a61960b9 (diff) | |
| parent | 0bd4b78b3540c65648563714239d2a270c9816a4 (diff) | |
| download | rabbitmq-server-git-7132664520b09cf6e15dc95e91ce73eedd4407e4.tar.gz | |
Merge branch 'master' into rabbitmq-server-1146-full
Conflicts:
Makefile
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | erlang.mk | 72 | ||||
| -rw-r--r-- | priv/schema/rabbitmq.schema | 7 | ||||
| -rw-r--r-- | scripts/rabbitmq-env.bat | 7 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 41 | ||||
| -rw-r--r-- | src/rabbit.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_config.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 56 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery.erl | 119 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery_classic_config.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_peer_discovery_dns.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 22 | ||||
| -rw-r--r-- | test/peer_discovery_dns_SUITE.erl | 34 | ||||
| -rw-r--r-- | test/rabbitmqctl_shutdown_SUITE.erl | 21 | ||||
| -rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 32 |
22 files changed, 455 insertions, 174 deletions
@@ -116,6 +116,8 @@ define PROJECT_ENV {background_gc_target_interval, 60000}, %% rabbitmq-server-589 {proxy_protocol, false}, + {disk_monitor_failure_retries, 10}, + {disk_monitor_failure_retry_interval, 120000}, %% can be stop_rabbit or give_up see rabbitmq-server-1458 {vhost_restart_strategy, stop_rabbit} ] @@ -224,6 +226,8 @@ clean-escripts: opt='--stringparam man.indent.verbatims=0' ; \ xsltproc --novalid $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ xmlto -vv -o $(DOCS_DIR) $$opt man $< 2>&1 | (grep -v '^Note: Writing' || :) && \ + awk -F"'u " '/^\.HP / { print $$1; print $$2; next; } { print; }' "$@" > "$@.tmp" && \ + mv "$@.tmp" "$@" && \ test -f $@ && \ rm $<.tmp @@ -16,7 +16,7 @@ ERLANG_MK_FILENAME := $(realpath $(lastword $(MAKEFILE_LIST))) -ERLANG_MK_VERSION = 2.0.0-pre.2-207-g9e9b7d2 +ERLANG_MK_VERSION = 2.0.0-pre.2-220-g7a200f5 # Make 3.81 and 3.82 are deprecated. @@ -4203,17 +4203,16 @@ endif # in practice only Makefile is needed so far. define dep_autopatch if [ -f $(DEPS_DIR)/$(1)/erlang.mk ]; then \ + rm -rf $(DEPS_DIR)/$1/ebin/; \ $(call erlang,$(call dep_autopatch_appsrc.erl,$(1))); \ $(call dep_autopatch_erlang_mk,$(1)); \ elif [ -f $(DEPS_DIR)/$(1)/Makefile ]; then \ if [ 0 != `grep -c "include ../\w*\.mk" $(DEPS_DIR)/$(1)/Makefile` ]; then \ $(call dep_autopatch2,$(1)); \ - elif [ 0 != `grep -ci rebar $(DEPS_DIR)/$(1)/Makefile` ]; then \ + elif [ 0 != `grep -ci "^[^#].*rebar" $(DEPS_DIR)/$(1)/Makefile` ]; then \ $(call dep_autopatch2,$(1)); \ - elif [ -n "`find $(DEPS_DIR)/$(1)/ -type f -name \*.mk -not -name erlang.mk -exec grep -i rebar '{}' \;`" ]; then \ + elif [ -n "`find $(DEPS_DIR)/$(1)/ -type f -name \*.mk -not -name erlang.mk -exec grep -i "^[^#].*rebar" '{}' \;`" ]; then \ $(call dep_autopatch2,$(1)); \ - else \ - $(call erlang,$(call dep_autopatch_app.erl,$(1))); \ fi \ else \ if [ ! 
-d $(DEPS_DIR)/$(1)/src/ ]; then \ @@ -4225,6 +4224,8 @@ define dep_autopatch endef define dep_autopatch2 + mv -n $(DEPS_DIR)/$1/ebin/$1.app $(DEPS_DIR)/$1/src/$1.app.src; \ + rm -f $(DEPS_DIR)/$1/ebin/$1.app; \ if [ -f $(DEPS_DIR)/$1/src/$1.app.src.script ]; then \ $(call erlang,$(call dep_autopatch_appsrc_script.erl,$(1))); \ fi; \ @@ -4536,22 +4537,6 @@ define dep_autopatch_rebar.erl halt() endef -define dep_autopatch_app.erl - UpdateModules = fun(App) -> - case filelib:is_regular(App) of - false -> ok; - true -> - {ok, [{application, '$(1)', L0}]} = file:consult(App), - Mods = filelib:fold_files("$(call core_native_path,$(DEPS_DIR)/$1/src)", "\\\\.erl$$", true, - fun (F, Acc) -> [list_to_atom(filename:rootname(filename:basename(F)))|Acc] end, []), - L = lists:keystore(modules, 1, L0, {modules, Mods}), - ok = file:write_file(App, io_lib:format("~p.~n", [{application, '$(1)', L}])) - end - end, - UpdateModules("$(call core_native_path,$(DEPS_DIR)/$1/ebin/$1.app)"), - halt() -endef - define dep_autopatch_appsrc_script.erl AppSrc = "$(call core_native_path,$(DEPS_DIR)/$1/src/$1.app.src)", AppSrcScript = AppSrc ++ ".script", @@ -4828,6 +4813,8 @@ COMPILE_FIRST_PATHS = $(addprefix src/,$(addsuffix .erl,$(COMPILE_FIRST))) ERLC_EXCLUDE ?= ERLC_EXCLUDE_PATHS = $(addprefix src/,$(addsuffix .erl,$(ERLC_EXCLUDE))) +ERLC_ASN1_OPTS ?= + ERLC_MIB_OPTS ?= COMPILE_MIB_FIRST ?= COMPILE_MIB_FIRST_PATHS = $(addprefix mibs/,$(addsuffix .mib,$(COMPILE_MIB_FIRST))) @@ -4877,7 +4864,7 @@ endif ifeq ($(wildcard src/$(PROJECT_MOD).erl),) define app_file -{application, $(PROJECT), [ +{application, '$(PROJECT)', [ {description, "$(PROJECT_DESCRIPTION)"}, {vsn, "$(PROJECT_VERSION)"},$(if $(IS_DEP), {id$(comma)$(space)"$(1)"}$(comma)) @@ -4889,7 +4876,7 @@ define app_file endef else define app_file -{application, $(PROJECT), [ +{application, '$(PROJECT)', [ {description, "$(PROJECT_DESCRIPTION)"}, {vsn, "$(PROJECT_VERSION)"},$(if $(IS_DEP), {id$(comma)$(space)"$(1)"}$(comma)) @@ -4920,7 
+4907,7 @@ ERL_FILES += $(addprefix src/,$(patsubst %.asn1,%.erl,$(notdir $(ASN1_FILES)))) define compile_asn1 $(verbose) mkdir -p include/ - $(asn1_verbose) erlc -v -I include/ -o asn1/ +noobj $(1) + $(asn1_verbose) erlc -v -I include/ -o asn1/ +noobj $(ERLC_ASN1_OPTS) $(1) $(verbose) mv asn1/*.erl src/ $(verbose) mv asn1/*.hrl include/ $(verbose) mv asn1/*.asn1db include/ @@ -5052,7 +5039,7 @@ $(ERL_FILES) $(CORE_FILES) $(ASN1_FILES) $(MIB_FILES) $(XRL_FILES) $(YRL_FILES): ebin/$(PROJECT).app:: $(ERLANG_MK_TMP)/last-makefile-change endif --include $(PROJECT).d +include $(wildcard $(PROJECT).d) ebin/$(PROJECT).app:: ebin/ @@ -5277,6 +5264,7 @@ MAN_VERSION ?= $(PROJECT_VERSION) define asciidoc2man.erl try [begin + io:format(" ADOC ~s~n", [F]), ok = asciideck:to_manpage(asciideck:parse_file(F), #{ compress => gzip, outdir => filename:dirname(F), @@ -5285,7 +5273,8 @@ try }) end || F <- [$(shell echo $(addprefix $(comma)\",$(addsuffix \",$1)) | sed 's/^.//')]], halt(0) -catch _:_ -> +catch C:E -> + io:format("Exception ~p:~p~nStacktrace: ~p~n", [C, E, erlang:get_stacktrace()]), halt(1) end. endef @@ -6123,6 +6112,7 @@ CT_SUITES := $(sort $(subst _SUITE.erl,,$(notdir $(call core_find,$(TEST_DIR)/,* endif endif CT_SUITES ?= +CT_LOGS_DIR ?= $(CURDIR)/logs # Core targets. 
@@ -6145,13 +6135,13 @@ CT_RUN = ct_run \ -noinput \ -pa $(CURDIR)/ebin $(DEPS_DIR)/*/ebin $(APPS_DIR)/*/ebin $(TEST_DIR) \ -dir $(TEST_DIR) \ - -logdir $(CURDIR)/logs + -logdir $(CT_LOGS_DIR) ifeq ($(CT_SUITES),) ct: $(if $(IS_APP),,apps-ct) else ct: test-build $(if $(IS_APP),,apps-ct) - $(verbose) mkdir -p $(CURDIR)/logs/ + $(verbose) mkdir -p $(CT_LOGS_DIR) $(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(CT_SUITES)) $(CT_OPTS) endif @@ -6179,14 +6169,14 @@ endif define ct_suite_target ct-$(1): test-build - $(verbose) mkdir -p $(CURDIR)/logs/ + $(verbose) mkdir -p $(CT_LOGS_DIR) $(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(1)) $(CT_EXTRA) $(CT_OPTS) endef $(foreach test,$(CT_SUITES),$(eval $(call ct_suite_target,$(test)))) distclean-ct: - $(gen_verbose) rm -rf $(CURDIR)/logs/ + $(gen_verbose) rm -rf $(CT_LOGS_DIR) # Copyright (c) 2013-2016, Loïc Hoguin <essen@ninenines.eu> # This file is part of erlang.mk and subject to the terms of the ISC License. 
@@ -6232,8 +6222,10 @@ define filter_opts.erl endef $(DIALYZER_PLT): deps app - $(verbose) dialyzer --build_plt --apps erts kernel stdlib $(PLT_APPS) $(OTP_DEPS) $(LOCAL_DEPS) \ - `test -f $(ERLANG_MK_TMP)/deps.log && cat $(ERLANG_MK_TMP)/deps.log` + $(eval DEPS_LOG := $(shell test -f $(ERLANG_MK_TMP)/deps.log && \ + while read p; do test -d $$p/ebin && echo $$p/ebin; done <$(ERLANG_MK_TMP)/deps.log)) + $(verbose) dialyzer --build_plt --apps erts kernel stdlib \ + $(PLT_APPS) $(OTP_DEPS) $(LOCAL_DEPS) $(DEPS_LOG) plt: $(DIALYZER_PLT) @@ -6319,7 +6311,7 @@ escript:: escript-zip $(verbose) chmod +x $(ESCRIPT_FILE) distclean-escript: - $(gen_verbose) rm -f $(ESCRIPT_NAME) + $(gen_verbose) rm -f $(ESCRIPT_FILE) # Copyright (c) 2015-2016, Loïc Hoguin <essen@ninenines.eu> # Copyright (c) 2014, Enrique Fernandez <enrique.fernandez@erlang-solutions.com> @@ -6495,6 +6487,20 @@ build-shell-deps: $(ALL_SHELL_DEPS_DIRS) shell: build-shell-deps $(gen_verbose) $(SHELL_ERL) -pa $(SHELL_PATHS) $(SHELL_OPTS) +# Copyright (c) 2017, Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> +# This file is contributed to erlang.mk and subject to the terms of the ISC License. + +.PHONY: show-ERL_LIBS show-ERLC_OPTS show-TEST_ERLC_OPTS + +show-ERL_LIBS: + @echo $(ERL_LIBS) + +show-ERLC_OPTS: + @$(foreach opt,$(ERLC_OPTS) -pa ebin -I include,echo "$(opt)";) + +show-TEST_ERLC_OPTS: + @$(foreach opt,$(TEST_ERLC_OPTS) -pa ebin -I include,echo "$(opt)";) + # Copyright (c) 2015-2016, Loïc Hoguin <essen@ninenines.eu> # This file is part of erlang.mk and subject to the terms of the ISC License. 
diff --git a/priv/schema/rabbitmq.schema b/priv/schema/rabbitmq.schema index 6f3f42e4ed..fab07baeb4 100644 --- a/priv/schema/rabbitmq.schema +++ b/priv/schema/rabbitmq.schema @@ -999,7 +999,7 @@ fun(Conf) -> [{lager_console_backend, ConsoleLevel}]; false -> [] end, - FileHandler = case cuttlefish:conf_get("log.file", Conf, false) of + FileHandler = case cuttlefish:conf_get("log.file", Conf, undefined) of false -> []; File -> FileLevel = cuttlefish:conf_get("log.file.level", Conf, info), @@ -1010,6 +1010,11 @@ fun(Conf) -> {level, FileLevel}, {date, RotationDate}, {size, RotationSize}, + {formatter_config, + [date, " ", time, " ", color, + "[", severity, "] ", + {pid, ""}, + " ", message, "\n"]}, {count, RotationCount}]}] end, SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf, false) of diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat index 56b2f69b2d..9d7a724318 100644 --- a/scripts/rabbitmq-env.bat +++ b/scripts/rabbitmq-env.bat @@ -417,7 +417,12 @@ set paths= exit /b
:filter_path
-set ERL_LIBS=%ERL_LIBS%;%~dps1%~n1%~x1
+REM Ensure ERL_LIBS begins with valid path
+IF [%ERL_LIBS%] EQU [] (
+ set ERL_LIBS=%~dps1%~n1%~x1
+) else (
+ set ERL_LIBS=%ERL_LIBS%;%~dps1%~n1%~x1
+)
exit /b
:filter_paths_done
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 41d1a81332..a318b55d75 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -1,4 +1,4 @@ -#!/bin/sh -e +#!/bin/sh ## The contents of this file are subject to the Mozilla Public License ## Version 1.1 (the "License"); you may not use this file except in ## compliance with the License. You may obtain a copy of the License @@ -12,9 +12,11 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is GoPivotal, Inc. -## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +## Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. ## +set -e + # Get default settings with user overrides for (RABBITMQ_)<var_name> # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env @@ -278,21 +280,38 @@ else # The Erlang VM should ignore SIGINT. RABBITMQ_SERVER_START_ARGS="${RABBITMQ_SERVER_START_ARGS} ${RABBITMQ_IGNORE_SIGINT_FLAG}" - # Signal handlers. They all stop RabbitMQ properly (using - # rabbitmqctl stop). Depending on the signal, this script will exit - # with a non-zero error code: + # Signal handlers. They all stop RabbitMQ properly, using + # rabbitmqctl stop. This script will exit with different exit codes: # SIGHUP SIGTERM SIGTSTP - # They are considered a normal process termination, so the script - # exits with 0. + # Exits 0 since this is considered a normal process termination. # SIGINT - # They are considered an abnormal process termination, the script - # exits with the job exit code. + # Exits 128 + $signal_number where $signal_number is 2 for SIGINT (see + # http://pubs.opengroup.org/onlinepubs/009695399/utilities/kill.html). + # This is considered an abnormal process termination. Normally, we + # don't need to specify this exit code because the shell propagates it. + # Unfortunately, the signal handler doesn't work as expected in Dash, + # thus we need to explicitely restate the exit code. 
trap "stop_rabbitmq_server; exit 0" HUP TERM TSTP - trap "stop_rabbitmq_server" INT + trap "stop_rabbitmq_server; exit 130" INT start_rabbitmq_server "$@" & + rabbitmq_server_pid=$! # Block until RabbitMQ exits or a signal is caught. # Waits for last command (which is start_rabbitmq_server) - wait $! + # + # The "|| true" is here to work around an issue with Dash. Normally + # in a Bourne shell, if `wait` is interrupted by a signal, the + # signal handlers defined above are executed and the script + # terminates with the exit code of `wait` (unless the signal handler + # overrides that). + # In the case of Dash, it looks like `set -e` (set at the beginning + # of this script) gets precedence over signal handling. Therefore, + # when `wait` is interrupted, its exit code is non-zero and because + # of `set -e`, the script terminates immediately without running the + # signal handler. To work around this issue, we use "|| true" to + # force that statement to succeed and the signal handler to properly + # execute. Because the statement below has an exit code of 0, the + # signal handler has to restate the expected exit code. + wait $rabbitmq_server_pid || true fi diff --git a/src/rabbit.erl b/src/rabbit.erl index b63d624293..8e6c9ead26 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -22,7 +22,7 @@ stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0, is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). --export([start/2, stop/1]). +-export([start/2, stop/1, prep_stop/1]). -export([start_apps/1, stop_apps/1]). -export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent @@ -327,7 +327,11 @@ broker_start() -> ToBeLoaded = Plugins ++ ?APPS, start_apps(ToBeLoaded), maybe_sd_notify(), - ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())). 
+ ok = log_broker_started(rabbit_plugins:strictly_plugins(rabbit_plugins:active())), + %% See rabbitmq/rabbitmq-server#1202 for details. + rabbit_peer_discovery:maybe_inject_randomized_delay(), + rabbit_peer_discovery:maybe_register(), + ok. %% Try to send systemd ready notification if it makes sense in the %% current environment. standard_error is used intentionally in all @@ -471,6 +475,8 @@ stop() -> end, rabbit_log:info("RabbitMQ is asked to stop...~n", []), Apps = ?APPS ++ rabbit_plugins:active(), + %% this will also perform unregistration with the peer discovery backend + %% as needed stop_apps(app_utils:app_dependency_order(Apps, true)), rabbit_log:info("Successfully stopped RabbitMQ and its dependencies~n", []). @@ -759,6 +765,10 @@ start(normal, []) -> Error end. +prep_stop(State) -> + rabbit_peer_discovery:maybe_unregister(), + State. + stop(_State) -> ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9f91041e7..c52d329392 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -99,7 +99,7 @@ -spec info_keys() -> rabbit_types:info_keys(). -spec init_with_backing_queue_state (rabbit_types:amqqueue(), atom(), tuple(), any(), - [rabbit_types:delivery()], pmon:pmon(), dict:dict()) -> + [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> #q{}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_config.erl b/src/rabbit_config.erl index 67e7523ec0..9e70898c82 100644 --- a/src/rabbit_config.erl +++ b/src/rabbit_config.erl @@ -33,10 +33,10 @@ prepare_and_use_config() -> legacy_erlang_term_config_used() -> case init:get_argument(config) of error -> false; - {ok, [Config | _]} -> + {ok, [Config | _]} -> ConfigFile = Config ++ ".config", - rabbit_file:is_file(ConfigFile) - andalso + rabbit_file:is_file(ConfigFile) + andalso get_advanced_config() == none end. 
@@ -66,10 +66,34 @@ prepare_config(Configs) -> end. update_app_config(ConfigFile) -> - ok = application_controller:change_application_data([], [ConfigFile]). + RunningApps = [ App || {App, _, _} <- application:which_applications() ], + LoadedApps = [ App || {App, _, _} <- application:loaded_applications() ], + {ok, [Config]} = file:consult(ConfigFile), + %% For application config to be updated, applications should + %% be unloaded first. + %% If an application is already running, print an error. + lists:foreach(fun({App, _Config}) -> + case lists:member(App, RunningApps) of + true -> + io:format(standard_error, + "~nUnable to update config for app ~p from *.conf file." + " App is already running. Use advanced.config instead.~n", + [App]); + false -> + case lists:member(App, LoadedApps) of + true -> application:unload(App); + false -> ok + end + end + end, + Config), + ok = application_controller:change_application_data([], [ConfigFile]), + %% Make sure to load all the applications we're unloaded + lists:foreach(fun(App) -> application:load(App) end, LoadedApps), + ok. generate_config_file(ConfFiles, ConfDir, ScriptDir) -> - generate_config_file(ConfFiles, ConfDir, ScriptDir, + generate_config_file(ConfFiles, ConfDir, ScriptDir, schema_dir(), get_advanced_config()). @@ -145,8 +169,8 @@ config_files() -> {ok, Files} -> [Abs(File, ".config") || [File] <- Files]; error -> case config_setting() of none -> []; - File -> [Abs(File, ".config") - ++ + File -> [Abs(File, ".config") + ++ " (not found)"] end end; diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index b2548cb61a..868fc1a4aa 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -65,12 +65,17 @@ alarmed, %% is monitoring enabled? false on unsupported %% platforms - enabled + enabled, + %% number of retries to enable monitoring if it fails + %% on start-up + retries, + %% Interval between retries + interval }). 
%%---------------------------------------------------------------------------- --type disk_free_limit() :: (integer() | string() | {'mem_relative', float()}). +-type disk_free_limit() :: (integer() | string() | {'mem_relative', float() | integer()}). -spec start_link(disk_free_limit()) -> rabbit_types:ok_pid_or_error(). -spec get_disk_free_limit() -> integer(). -spec set_disk_free_limit(disk_free_limit()) -> 'ok'. @@ -114,20 +119,17 @@ start_link(Args) -> init([Limit]) -> Dir = dir(), + {ok, Retries} = application:get_env(rabbit, disk_monitor_failure_retries), + {ok, Interval} = application:get_env(rabbit, disk_monitor_failure_retry_interval), State = #state{dir = Dir, min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL, max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL, alarmed = false, - enabled = true}, - case {catch get_disk_free(Dir), - vm_memory_monitor:get_total_memory()} of - {N1, N2} when is_integer(N1), is_integer(N2) -> - {ok, start_timer(set_disk_limits(State, Limit))}; - Err -> - rabbit_log:info("Disabling disk free space monitoring " - "on unsupported platform:~n~p~n", [Err]), - {ok, State#state{enabled = false}} - end. + enabled = true, + limit = Limit, + retries = Retries, + interval = Interval}, + {ok, enable(State)}. handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) -> {reply, Limit, State}; @@ -161,6 +163,8 @@ handle_call(_Request, _From, State) -> handle_cast(_Request, State) -> {noreply, State}. +handle_info(try_enable, #state{retries = Retries} = State) -> + {noreply, enable(State#state{retries = Retries - 1})}; handle_info(update, State) -> {noreply, start_timer(internal_update(State))}; @@ -233,7 +237,7 @@ parse_free_win32(CommandResult) -> list_to_integer(lists:reverse(Free)). 
interpret_limit({mem_relative, Relative}) - when is_float(Relative) -> + when is_number(Relative) -> round(Relative * vm_memory_monitor:get_total_memory()); interpret_limit(Absolute) -> case rabbit_resource_monitor_misc:parse_information_unit(Absolute) of @@ -246,7 +250,7 @@ interpret_limit(Absolute) -> emit_update_info(StateStr, CurrentFree, Limit) -> rabbit_log:info( - "Disk free space ~s. Free bytes:~p Limit:~p~n", + "Free disk space is ~s. Free bytes: ~p. Limit: ~p~n", [StateStr, CurrentFree, Limit]). start_timer(State) -> @@ -261,3 +265,20 @@ interval(#state{limit = Limit, max_interval = MaxInterval}) -> IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE, trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). + +enable(#state{retries = 0} = State) -> + State; +enable(#state{dir = Dir, interval = Interval, limit = Limit, retries = Retries} + = State) -> + case {catch get_disk_free(Dir), + vm_memory_monitor:get_total_memory()} of + {N1, N2} when is_integer(N1), is_integer(N2) -> + rabbit_log:info("Enabling free disk space monitoring~n", []), + start_timer(set_disk_limits(State, Limit)); + Err -> + rabbit_log:info("Free disk space monitor encountered an error " + "(e.g. failed to parse output from OS tools): ~p, retries left: ~s~n", + [Err, Retries]), + timer:send_after(Interval, self(), try_enable), + State#state{enabled = false} + end. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 94710aed43..fefa0de1c9 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -57,13 +57,13 @@ coordinator :: pid(), backing_queue :: atom(), backing_queue_state :: any(), - seen_status :: dict:dict(), + seen_status :: map(), confirmed :: [rabbit_guid:guid()], known_senders :: sets:set() }. -spec promote_backing_queue_state (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], - dict:dict(), [pid()]) -> + map(), [pid()]) -> master_state(). -spec sender_death_fun() -> death_fun(). 
@@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - seen_status = dict:new(), + seen_status = #{}, confirmed = [], known_senders = sets:new(), wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }; @@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), @@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow, lists:foldl(fun ({Msg = #basic_message { id = MsgId }, MsgProps, _IsDelivered}, {Pubs, false, Sizes}) -> {[{Msg, MsgProps, true} | Pubs], %% [0] - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION Sizes + rabbit_basic:msg_size(Msg)} end, {[], false, 0}, Publishes), Publishes2 = lists:reverse(Publishes1), @@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), @@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, {false, MsgSizes} = lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps}, {false, Sizes}) -> - {false = dict:is_key(MsgId, SS), %% ASSERTION + {false = maps:is_key(MsgId, SS), %% ASSERTION Sizes + rabbit_basic:msg_size(Msg)} end, {false, 0}, Publishes), ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes}, @@ -326,7 +326,7 @@ 
discard(MsgId, ChPid, Flow, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, seen_status = SS }) -> - false = dict:is_key(MsgId, SS), %% ASSERTION + false = maps:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), ensure_monitoring(ChPid, State #state { backing_queue_state = @@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue = BQ, lists:foldl( fun (MsgId, {MsgIdsN, SSN}) -> %% We will never see 'discarded' here - case dict:find(MsgId, SSN) of + case maps:find(MsgId, SSN) of error -> {[MsgId | MsgIdsN], SSN}; {ok, published} -> @@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue = BQ, %% consequently we need to filter out the %% confirm here. We will issue the confirm %% when we see the publish from the channel. - {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {MsgIdsN, maps:put(MsgId, confirmed, SSN)}; {ok, confirmed} -> %% Well, confirms are racy by definition. {[MsgId | MsgIdsN], SSN} @@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> info(backing_queue_status, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:info(backing_queue_status, BQS) ++ - [ {mirror_seen, dict:size(State #state.seen_status)}, + [ {mirror_seen, maps:size(State #state.seen_status)}, {mirror_senders, sets:size(State #state.known_senders)} ]; info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:info(Item, BQS). @@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% it. %% We will never see {published, ChPid, MsgSeqNo} here. - case dict:find(MsgId, SS) of + case maps:find(MsgId, SS) of error -> %% We permit the underlying BQ to have a peek at it, but %% only if we ourselves are not filtering out the msg. @@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% immediately after calling is_duplicate). The msg is %% invalid. 
We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {true, State #state { seen_status = dict:erase(MsgId, SS) }}; + {true, State #state { seen_status = maps:remove(MsgId, SS) }}; {ok, Disposition} when Disposition =:= confirmed %% It got published when we were a slave via gm, and @@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Message was discarded while we were a slave. Confirm now. %% As above, amqqueue_process will have the entry for the %% msg_id_to_channel mapping. - {true, State #state { seen_status = dict:erase(MsgId, SS), + {true, State #state { seen_status = maps:remove(MsgId, SS), confirmed = [MsgId | Confirmed] }} end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 5c1af7f4aa..ee697be501 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) -> rate_timer_ref = undefined, sync_timer_ref = undefined, - sender_queues = dict:new(), - msg_id_ack = dict:new(), + sender_queues = #{}, + msg_id_ack = #{}, - msg_id_status = dict:new(), + msg_id_status = #{}, known_senders = pmon:new(delegate), depth_delta = undefined @@ -312,7 +312,7 @@ handle_cast({sync_start, Ref, Syncer}, State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), S = fun({MA, TRefN, BQSN}) -> State1#state{depth_delta = undefined, - msg_id_ack = dict:from_list(MA), + msg_id_ack = maps:from_list(MA), rate_timer_ref = TRefN, backing_queue_state = BQSN} end, @@ -548,7 +548,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid, id = MsgId, is_persistent = true } }, MS, #state { q = #amqqueue { durable = true } }) -> - dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); + maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS); send_or_record_confirm(_Status, #delivery { sender = ChPid, confirm = true, msg_seq_no = MsgSeqNo }, @@ -561,7 +561,7 @@ confirm_messages(MsgIds, 
State = #state { msg_id_status = MS }) -> lists:foldl( fun (MsgId, {CMsN, MSN} = Acc) -> %% We will never see 'discarded' here - case dict:find(MsgId, MSN) of + case maps:find(MsgId, MSN) of error -> %% If it needed confirming, it'll have %% already been done. @@ -569,12 +569,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {ok, published} -> %% Still not seen it from the channel, just %% record that it's been confirmed. - {CMsN, dict:store(MsgId, confirmed, MSN)}; + {CMsN, maps:put(MsgId, confirmed, MSN)}; {ok, {published, ChPid, MsgSeqNo}} -> %% Seen from both GM and Channel. Can now %% confirm. {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN), - dict:erase(MsgId, MSN)}; + maps:remove(MsgId, MSN)}; {ok, confirmed} -> %% It's already been confirmed. This is %% probably it's been both sync'd to disk @@ -674,21 +674,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Master, or MTC in queue_process. St = [published, confirmed, discarded], - SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), - AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], + SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), + AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), - MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> + MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), Deliveries = [promote_delivery(Delivery) || - {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), + {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ), Delivery <- queue:to_list(PubQ)], - AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], + AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)], KS1 = 
lists:foldl(fun (ChPid0, KS0) -> pmon:demonitor(ChPid0, KS0) end, KS, AwaitGmDown), @@ -800,20 +800,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true. maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> - case dict:find(ChPid, SQ) of + case maps:find(ChPid, SQ) of error -> State; {ok, {MQ, PendCh, ChStateRecord}} -> case forget_sender(ChState, ChStateRecord) of true -> credit_flow:peer_down(ChPid), - State #state { sender_queues = dict:erase(ChPid, SQ), + State #state { sender_queues = maps:remove(ChPid, SQ), msg_id_status = lists:foldl( - fun dict:erase/2, + fun maps:remove/2, MS, sets:to_list(PendCh)), known_senders = pmon:demonitor(ChPid, KS) }; false -> - SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ), + SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ), State #state { sender_queues = SQ1 } end end. @@ -825,32 +825,32 @@ maybe_enqueue_message( send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. - case dict:find(MsgId, MS) of + case maps:find(MsgId, MS) of error -> {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), MQ1 = queue:in(Delivery, MQ), - SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ), + SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ), State1 #state { sender_queues = SQ1 }; {ok, Status} -> MS1 = send_or_record_confirm( - Status, Delivery, dict:erase(MsgId, MS), State1), + Status, Delivery, maps:remove(MsgId, MS), State1), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = MS1, sender_queues = SQ1 } end. get_sender_queue(ChPid, SQ) -> - case dict:find(ChPid, SQ) of + case maps:find(ChPid, SQ) of error -> {queue:new(), sets:new(), running}; {ok, Val} -> Val end. 
remove_from_pending_ch(MsgId, ChPid, SQ) -> - case dict:find(ChPid, SQ) of + case maps:find(ChPid, SQ) of error -> SQ; {ok, {MQ, PendingCh, ChState}} -> - dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState}, + maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState}, SQ) end. @@ -867,7 +867,7 @@ publish_or_discard(Status, ChPid, MsgId, case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, Status, MS)}; + maps:put(MsgId, Status, MS)}; {{value, Delivery = #delivery { message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, @@ -882,7 +882,7 @@ publish_or_discard(Status, ChPid, MsgId, %% expecting any confirms from us. {MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ), + SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ), State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. @@ -1004,9 +1004,9 @@ msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = lists:foldl( fun (MsgId, {Acc, MAN}) -> - case dict:find(MsgId, MA) of + case maps:find(MsgId, MA) of error -> {Acc, MAN}; - {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} + {ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. @@ -1014,7 +1014,7 @@ msg_ids_to_acktags(MsgIds, MA) -> maybe_store_ack(false, _MsgId, _AckTag, State) -> State; maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) -> - State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }. + State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }. 
set_delta(0, State = #state { depth_delta = undefined }) -> ok = record_synchronised(State#state.q), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6d3b47a405..e48eb2b91f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -119,6 +119,8 @@ init_from_config() -> (Name, BadNames) when is_atom(Name) -> BadNames; (Name, BadNames) -> [Name | BadNames] end, + %% See rabbitmq/rabbitmq-server#1202 for details. + rabbit_peer_discovery:maybe_inject_randomized_delay(), {DiscoveredNodes, NodeType} = case rabbit_peer_discovery:discover_cluster_nodes() of {ok, {Nodes, Type} = Config} @@ -139,14 +141,17 @@ init_from_config() -> {ok, _} -> e(invalid_cluster_nodes_conf) end, - case DiscoveredNodes of + rabbit_log:info("All discovered existing cluster peers: ~p~n", + [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]), + Peers = nodes_excl_me(DiscoveredNodes), + case Peers of [] -> rabbit_log:info("Discovered no peer nodes to cluster with"), init_db_and_upgrade([node()], disc, false, _Retry = true); _ -> - rabbit_log:info("Discovered peer nodes: ~s~n", - [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]), - join_discovered_peers(DiscoveredNodes, NodeType) + rabbit_log:info("Peer nodes we can cluster with: ~s~n", + [rabbit_peer_discovery:format_discovered_nodes(Peers)]), + join_discovered_peers(Peers, NodeType) end. 
%% Attempts to join discovered, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 891cdf0236..be85091948 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1337,9 +1337,11 @@ update_pending_confirms(Fun, CRef, record_pending_confirm(CRef, MsgId, State) -> update_pending_confirms( fun (_MsgOnDiskFun, CTM) -> - maps:update_with(CRef, - fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end, - gb_sets:singleton(MsgId), CTM) + NewMsgIds = case maps:find(CRef, CTM) of + error -> gb_sets:singleton(MsgId); + {ok, MsgIds} -> gb_sets:add(MsgId, MsgIds) + end, + maps:put(CRef, NewMsgIds, CTM) end, CRef, State). client_confirm(CRef, MsgIds, ActionTaken, State) -> diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 6179ef95bb..728c9652d0 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -70,7 +70,7 @@ set_maximum_since_use(Pid, Age) -> init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #state { pending_no_readers = dict:new(), + {ok, #state { pending_no_readers = #{}, on_action = [], msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
@@ -89,11 +89,11 @@ handle_cast({delete, File}, State) -> handle_cast({no_readers, File}, State = #state { pending_no_readers = Pending }) -> - {noreply, case dict:find(File, Pending) of + {noreply, case maps:find(File, Pending) of error -> State; {ok, {Action, Files}} -> - Pending1 = dict:erase(File, Pending), + Pending1 = maps:remove(File, Pending), attempt_action( Action, Files, State #state { pending_no_readers = Pending1 }) @@ -123,7 +123,7 @@ attempt_action(Action, Files, fun (Thunk) -> not Thunk() end, [do_action(Action, Files, MsgStoreState) | Thunks]) }; - [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), + [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending), State #state { pending_no_readers = Pending1 } end. diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl index 7cdb35f6f4..088f1c88fd 100644 --- a/src/rabbit_peer_discovery.erl +++ b/src/rabbit_peer_discovery.erl @@ -21,7 +21,9 @@ %% -export([discover_cluster_nodes/0, backend/0, node_type/0, - normalize/1, format_discovered_nodes/1, log_configured_backend/0]). + normalize/1, format_discovered_nodes/1, log_configured_backend/0, + register/0, unregister/0, maybe_register/0, maybe_unregister/0, + maybe_inject_randomized_delay/0]). -export([append_node_prefix/1, node_prefix/0]). -define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config). @@ -30,6 +32,9 @@ -define(DEFAULT_NODE_TYPE, disc). %% default node prefix to attach to discovered hostnames -define(DEFAULT_PREFIX, "rabbit"). +%% default randomized delay range, in seconds +-define(DEFAULT_STARTUP_RANDOMIZED_DELAY, {5, 60}). + -define(NODENAME_PART_SEPARATOR, "@"). @@ -72,6 +77,117 @@ discover_cluster_nodes() -> normalize(Backend:list_nodes()). +-spec maybe_register() -> ok. 
+
+maybe_register() ->
+    Backend = backend(),
+    case Backend:supports_registration() of
+        true ->
+            register(),
+            Backend:post_registration();
+        false ->
+            rabbit_log:info("Peer discovery backend ~s does not support registration, skipping registration.", [Backend]),
+            ok
+    end.
+
+
+-spec maybe_unregister() -> ok.
+
+maybe_unregister() ->
+    Backend = backend(),
+    case Backend:supports_registration() of
+        true ->
+            unregister();
+        false ->
+            rabbit_log:info("Peer discovery backend ~s does not support registration, skipping unregistration.", [Backend]),
+            ok
+    end.
+
+
+-spec maybe_inject_randomized_delay() -> ok.
+maybe_inject_randomized_delay() ->
+    Backend = backend(),
+    case Backend:supports_registration() of
+        true ->
+            rabbit_log:info("Peer discovery backend ~s supports registration.", [Backend]),
+            inject_randomized_delay();
+        false ->
+            rabbit_log:info("Peer discovery backend ~s does not support registration, skipping randomized startup delay.", [Backend]),
+            ok
+    end.
+
+-spec inject_randomized_delay() -> ok.
+
+inject_randomized_delay() ->
+    {Min, Max} = case randomized_delay_range_in_ms() of
+                     {A, B} -> {A, B};
+                     [A, B] -> {A, B}
+                 end,
+    case {Min, Max} of
+        %% When the max value is set to 0, consider the delay to be disabled.
+        %% In addition, `rand:uniform/1` will fail with a "no function clause"
+        %% when the argument is 0.
+        {_, 0} ->
+            rabbit_log:info("Randomized delay range's upper bound is set to 0. Considering it disabled."),
+            ok;
+        {_, N} when is_number(N) ->
+            rand:seed(exsplus),
+            RandomVal = rand:uniform(round(N)),
+            rabbit_log:debug("Randomized startup delay: configured range is from ~p to ~p milliseconds, PRNG pick: ~p...",
+                             [Min, Max, RandomVal]),
+            Effective = case RandomVal < Min of
+                            true  -> Min;
+                            false -> RandomVal
+                        end,
+            rabbit_log:info("Will wait for ~p milliseconds before proceeding with registration...", [Effective]),
+            timer:sleep(Effective),
+            ok
+    end.
+
+-spec randomized_delay_range_in_ms() -> {integer(), integer()}.
+ +randomized_delay_range_in_ms() -> + {Min, Max} = case application:get_env(rabbit, autocluster) of + {ok, Proplist} -> + proplists:get_value(randomized_startup_delay_range, Proplist, ?DEFAULT_STARTUP_RANDOMIZED_DELAY); + undefined -> + ?DEFAULT_STARTUP_RANDOMIZED_DELAY + end, + {Min * 1000, Max * 1000}. + + +-spec register() -> ok. + +register() -> + Backend = backend(), + rabbit_log:info("Will register with peer discovery backend ~s", [Backend]), + case Backend:register() of + ok -> ok; + {error, Error} -> + rabbit_log:error("Failed to register with peer discovery backend ~s: ~p", + [Backend, Error]), + ok + end. + + +-spec unregister() -> ok. + +unregister() -> + Backend = backend(), + rabbit_log:info("Will unregister with peer discovery backend ~s", [Backend]), + case Backend:unregister() of + ok -> ok; + {error, Error} -> + rabbit_log:error("Failed to unregister with peer discovery backend ~s: ~p", + [Backend, Error]), + ok + end. + + +%% +%% Implementation +%% + -spec normalize(Nodes :: list() | {Nodes :: list(), NodeType :: rabbit_types:node_type()} | {ok, Nodes :: list()} | @@ -90,7 +206,6 @@ normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType) normalize({error, Reason}) -> {error, Reason}. - -spec format_discovered_nodes(Nodes :: list()) -> string(). format_discovered_nodes(Nodes) -> diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl index 95e5532548..da5fb6c971 100644 --- a/src/rabbit_peer_discovery_classic_config.erl +++ b/src/rabbit_peer_discovery_classic_config.erl @@ -19,7 +19,8 @@ -include("rabbit.hrl"). --export([list_nodes/0, register/0, unregister/0]). +-export([list_nodes/0, supports_registration/0, register/0, unregister/0, + post_registration/0]). %% %% API @@ -34,6 +35,11 @@ list_nodes() -> undefined -> {[], disc} end. +-spec supports_registration() -> boolean(). + +supports_registration() -> + false. + -spec register() -> ok. 
register() -> @@ -43,3 +49,8 @@ register() -> unregister() -> ok. + +-spec post_registration() -> ok. + +post_registration() -> + ok. diff --git a/src/rabbit_peer_discovery_dns.erl b/src/rabbit_peer_discovery_dns.erl index c8f6a7f39a..e4215cd048 100644 --- a/src/rabbit_peer_discovery_dns.erl +++ b/src/rabbit_peer_discovery_dns.erl @@ -19,7 +19,8 @@ -include("rabbit.hrl"). --export([list_nodes/0, register/0, unregister/0]). +-export([list_nodes/0, supports_registration/0, register/0, unregister/0, + post_registration/0]). %% for tests -export([discover_nodes/2, discover_hostnames/2]). @@ -48,6 +49,13 @@ list_nodes() -> end end. + +-spec supports_registration() -> boolean(). + +supports_registration() -> + false. + + -spec register() -> ok. register() -> @@ -58,6 +66,11 @@ register() -> unregister() -> ok. +-spec post_registration() -> ok. + +post_registration() -> + ok. + %% %% Implementation @@ -68,14 +81,25 @@ discover_nodes(SeedHostname, LongNamesUsed) -> H <- discover_hostnames(SeedHostname, LongNamesUsed)]. discover_hostnames(SeedHostname, LongNamesUsed) -> - %% TODO: IPv6 support - IPs = inet_res:lookup(SeedHostname, in, a), - rabbit_log:info("Addresses discovered via A records of ~s: ~s", - [SeedHostname, string:join([inet_parse:ntoa(IP) || IP <- IPs], ", ")]), + lookup(SeedHostname, LongNamesUsed, ipv4) ++ + lookup(SeedHostname, LongNamesUsed, ipv6). + +decode_record(ipv4) -> + a; +decode_record(ipv6) -> + aaaa. + +lookup(SeedHostname, LongNamesUsed, IPv) -> + IPs = inet_res:lookup(SeedHostname, in, decode_record(IPv)), + rabbit_log:info("Addresses discovered via ~s records of ~s: ~s", + [string:to_upper(atom_to_list(decode_record(IPv))), + SeedHostname, + string:join([inet_parse:ntoa(IP) || IP <- IPs], ", ")]), Hosts = [extract_host(inet:gethostbyaddr(A), LongNamesUsed, A) || - A <- IPs], + A <- IPs], lists:filter(fun(E) -> E =/= error end, Hosts). 
+ %% long node names are used extract_host({ok, {hostent, FQDN, _, _, _, _}}, true, _Address) -> FQDN; diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 4e9715fba2..f13a46fcf3 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -64,7 +64,8 @@ -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. -spec inactive(state()) -> boolean(). -spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), - non_neg_integer(), rabbit_framing:amqp_table()}]. + non_neg_integer(), rabbit_framing:amqp_table(), + rabbit_types:username()}]. -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), @@ -280,7 +281,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> orddict:update_counter(CTag, 1, CTagCounts), QTail); {{value, V}, QTail} -> subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail); - {empty, _} -> + {empty, _} -> subtract_acks([], Prefix, CTagCounts, AckQ) end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3c4f23a416..a71eaf1ff4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -83,7 +83,7 @@ %% contains a mapping from segment numbers to state-per-segment (this %% state is held for all segments which have been "seen": thus a %% segment which has been read but has no pending entries in the -%% journal is still held in this mapping. Also note that a dict is +%% journal is still held in this mapping. Also note that a map is %% used for this mapping, not an array because with an array, you will %% always have entries from 0). Actions are stored directly in this %% state. Thus at the point of flushing the journal, firstly no @@ -236,10 +236,10 @@ unacked :: non_neg_integer() }). -type seq_id() :: integer(). --type seg_dict() :: {dict:dict(), [segment()]}. +-type seg_map() :: {map(), [segment()]}. 
-type on_sync_fun() :: fun ((gb_sets:set()) -> ok). -type qistate() :: #qistate { dir :: file:filename(), - segments :: 'undefined' | seg_dict(), + segments :: 'undefined' | seg_map(), journal_handle :: hdl(), dirty_count :: integer(), max_journal_entries :: non_neg_integer(), @@ -1027,7 +1027,7 @@ segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) -> segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) -> {ok, Segment}; %% 2, matches tail segment_find(Seg, {Segments, _}) -> %% no match - dict:find(Seg, Segments). + maps:find(Seg, Segments). segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head) {Segments, [#segment { num = Seg } | Tail]}) -> @@ -1036,28 +1036,28 @@ segment_store(Segment = #segment { num = Seg }, %% 2, matches tail {Segments, [SegmentA, #segment { num = Seg }]}) -> {Segments, [Segment, SegmentA]}; segment_store(Segment = #segment { num = Seg }, {Segments, []}) -> - {dict:erase(Seg, Segments), [Segment]}; + {maps:remove(Seg, Segments), [Segment]}; segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) -> - {dict:erase(Seg, Segments), [Segment, SegmentA]}; + {maps:remove(Seg, Segments), [Segment, SegmentA]}; segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA, SegmentB]}) -> - {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)), + {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)), [Segment, SegmentA]}. segment_fold(Fun, Acc, {Segments, CachedSegments}) -> - dict:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end, + maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end, lists:foldl(Fun, Acc, CachedSegments), Segments). segment_map(Fun, {Segments, CachedSegments}) -> - {dict:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments), + {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments), lists:map(Fun, CachedSegments)}. 
 segment_nums({Segments, CachedSegments}) ->
     lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++
-        dict:fetch_keys(Segments).
+        maps:keys(Segments).
 
 segments_new() ->
-    {dict:new(), []}.
+    {#{}, []}.
 
 entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
     Initial;
diff --git a/test/peer_discovery_dns_SUITE.erl b/test/peer_discovery_dns_SUITE.erl
index 687569890f..7deb38ba97 100644
--- a/test/peer_discovery_dns_SUITE.erl
+++ b/test/peer_discovery_dns_SUITE.erl
@@ -33,7 +33,8 @@ groups() ->
                      hostname_discovery_with_long_node_names,
                      hostname_discovery_with_short_node_names,
                      node_discovery_with_long_node_names,
-                     node_discovery_with_short_node_names
+                     node_discovery_with_short_node_names,
+                     test_aaaa_record_hostname_discovery
                     ]}
     ].
 
@@ -54,7 +55,9 @@ suite() ->
 %%  * One does not resolve to a [typically] non-reachable IP
 %%  * One does not support reverse lookup queries
 
--define(DISCOVERY_ENDPOINT, "peer_discovery.tests.rabbitmq.net").
+-define(DISCOVERY_ENDPOINT_RECORD_A, "peer_discovery.tests.rabbitmq.net").
+
+-define(DISCOVERY_ENDPOINT_RECORD_AAAA, "www.v6.facebook.com").
 
 init_per_suite(Config) ->
     rabbit_ct_helpers:log_environment(),
@@ -63,17 +66,26 @@ init_per_suite(Config) ->
 end_per_suite(Config) ->
     rabbit_ct_helpers:run_teardown_steps(Config).
 
+
+%% NOTE(review): clause head must name the testcase exactly as listed in
+%% groups/0 (test_aaaa_record_hostname_discovery), otherwise this skip-guard
+%% never matches and the generic A-record clause below runs for the AAAA test.
+init_per_testcase(test_aaaa_record_hostname_discovery, Config) ->
+  case inet_res:lookup(?DISCOVERY_ENDPOINT_RECORD_AAAA, in, aaaa) of
+    [] ->
+      {skip, "pre-configured AAAA record does not resolve, skipping"};
+    [_ | _] ->
+      Config
+  end;
+
 init_per_testcase(_Testcase, Config) ->
-    %% TODO: support IPv6-only environments
-    case inet_res:lookup(?DISCOVERY_ENDPOINT, in, a) of
+    case inet_res:lookup(?DISCOVERY_ENDPOINT_RECORD_A, in, a) of
       [] ->
         {skip, "pre-configured *.rabbitmq.net record does not resolve, skipping"};
       [_ | _] ->
         Config
     end.
+ end_per_testcase(_Testcase, Config) -> - case inet_res:lookup(?DISCOVERY_ENDPOINT, in, a) of + case inet_res:lookup(?DISCOVERY_ENDPOINT_RECORD_A, in, a) of [] -> {skip, "pre-configured *.rabbitmq.net record does not resolve, skipping"}; [_ | _] -> @@ -85,18 +97,22 @@ end_per_testcase(_Testcase, Config) -> %% Test cases %% ------------------------------------------------------------------- +test_aaaa_record_hostname_discovery(_) -> + Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT_RECORD_AAAA, true), + ?assert(string:str(lists:flatten(Result), "facebook.com") > 0). + hostname_discovery_with_long_node_names(_) -> - Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT, true), + Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT_RECORD_A, true), ?assert(lists:member("www.rabbitmq.com", Result)). hostname_discovery_with_short_node_names(_) -> - Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT, false), + Result = rabbit_peer_discovery_dns:discover_hostnames(?DISCOVERY_ENDPOINT_RECORD_A, false), ?assert(lists:member("www", Result)). node_discovery_with_long_node_names(_) -> - Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT, true), + Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT_RECORD_A, true), ?assert(lists:member('ct_rabbit@www.rabbitmq.com', Result)). node_discovery_with_short_node_names(_) -> - Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT, false), + Result = rabbit_peer_discovery_dns:discover_nodes(?DISCOVERY_ENDPOINT_RECORD_A, false), ?assert(lists:member(ct_rabbit@www, Result)). 
diff --git a/test/rabbitmqctl_shutdown_SUITE.erl b/test/rabbitmqctl_shutdown_SUITE.erl index fa04dc6504..25cc2fdef8 100644 --- a/test/rabbitmqctl_shutdown_SUITE.erl +++ b/test/rabbitmqctl_shutdown_SUITE.erl @@ -29,8 +29,7 @@ all() -> groups() -> [ {running_node, [], [ - successful_shutdown, - error_during_shutdown + successful_shutdown ]}, {non_running_node, [], [ nothing_to_shutdown @@ -91,16 +90,6 @@ successful_shutdown(Config) -> false = erlang_pid_is_running(Pid), false = node_is_running(Node). -error_during_shutdown(Config) -> - Node = ?config(node, Config), - ok = rabbit_ct_broker_helpers:control_action(stop_app, Node, []), - ok = rpc:call(Node, application, unload, [os_mon]), - - {badrpc, - {'EXIT', { - {error, {badmatch, {error,{edge,{bad_vertex,os_mon},os_mon,rabbit}}}}, - _}}} = shutdown_error(Node). - nothing_to_shutdown(Config) -> Node = ?config(node, Config), @@ -119,14 +108,6 @@ erlang_pid_is_running(Pid) -> node_is_running(Node) -> net_adm:ping(Node) == pong. -shutdown_error(Node) -> - %% Start a command - {stream, Stream} = rabbit_ct_broker_helpers:control_action(shutdown, Node, []), - %% Execute command steps. The last one should be error - Lines = 'Elixir.Enum':to_list(Stream), - {error, Err} = lists:last(Lines), - Err. - shutdown_ok(Node) -> %% Start a command {stream, Stream} = rabbit_ct_broker_helpers:control_action(shutdown, Node, []), diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index 93f107de6b..2af6368f34 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -35,6 +35,7 @@ groups() -> app_management, %% Restart RabbitMQ. channel_statistics, %% Expect specific statistics. disk_monitor, %% Replace rabbit_misc module. + disk_monitor_enable, file_handle_cache, %% Change FHC limit. head_message_timestamp_statistics, %% Expect specific statistics. log_management, %% Check log files. 
@@ -631,6 +632,37 @@ disk_monitor1(_Config) -> meck:unload(rabbit_misc), passed. +disk_monitor_enable(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, disk_monitor_enable1, [Config]). + +disk_monitor_enable1(_Config) -> + case os:type() of + {unix, _} -> + disk_monitor_enable1(); + _ -> + %% skip windows testing + skipped + end. + +disk_monitor_enable1() -> + ok = meck:new(rabbit_misc, [passthrough]), + ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> "\n" end), + application:set_env(rabbit, disk_monitor_failure_retries, 20000), + application:set_env(rabbit, disk_monitor_failure_retry_interval, 100), + ok = rabbit_sup:stop_child(rabbit_disk_monitor_sup), + ok = rabbit_sup:start_delayed_restartable_child(rabbit_disk_monitor, [1000]), + undefined = rabbit_disk_monitor:get_disk_free(), + Cmd = "Filesystem 1024-blocks Used Available Capacity iused ifree %iused Mounted on\n/dev/disk1 975798272 234783364 740758908 25% 58759839 185189727 24% /\n", + ok = meck:expect(rabbit_misc, os_cmd, fun(_) -> Cmd end), + timer:sleep(1000), + Bytes = 740758908 * 1024, + Bytes = rabbit_disk_monitor:get_disk_free(), + meck:unload(rabbit_misc), + application:set_env(rabbit, disk_monitor_failure_retries, 10), + application:set_env(rabbit, disk_monitor_failure_retry_interval, 120000), + passed. + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- |
