summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rwxr-xr-xscripts/rabbitmq-env21
-rw-r--r--scripts/rabbitmq-env.bat17
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--scripts/rabbitmq-server.bat2
-rw-r--r--scripts/rabbitmq-service.bat2
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_feature_flags.erl1797
-rw-r--r--src/rabbit_ff_extra.erl236
-rw-r--r--src/rabbit_ff_registry.erl167
-rw-r--r--src/rabbit_mnesia.erl23
-rw-r--r--src/rabbit_plugins.erl1
-rw-r--r--test/dynamic_qq_SUITE.erl23
-rw-r--r--test/feature_flags_SUITE.erl372
-rw-r--r--test/quorum_queue_SUITE.erl67
-rw-r--r--test/single_active_consumer_SUITE.erl22
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl22
17 files changed, 2742 insertions, 41 deletions
diff --git a/Makefile b/Makefile
index 137fde2d95..49883bfef6 100644
--- a/Makefile
+++ b/Makefile
@@ -136,7 +136,7 @@ define PROJECT_ENV
]
endef
-LOCAL_DEPS = sasl mnesia os_mon inets
+LOCAL_DEPS = sasl mnesia os_mon inets compiler syntax_tools
BUILD_DEPS = rabbitmq_cli syslog
DEPS = ranch lager rabbit_common ra sysmon_handler
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck proper
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index f4b06c171b..2b06dec2cd 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -278,6 +278,10 @@ rmq_normalize_path_var RABBITMQ_PID_FILE
[ "x" = "x$RABBITMQ_BOOT_MODULE" ] && RABBITMQ_BOOT_MODULE=${BOOT_MODULE}
+[ "x" != "x$RABBITMQ_FEATURE_FLAGS_FILE" ] && RABBITMQ_FEATURE_FLAGS_FILE_source=environment
+[ "x" = "x$RABBITMQ_FEATURE_FLAGS_FILE" ] && RABBITMQ_FEATURE_FLAGS_FILE=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-feature_flags
+rmq_normalize_path_var RABBITMQ_FEATURE_FLAGS_FILE
+
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
rmq_normalize_path_var RABBITMQ_PLUGINS_EXPAND_DIR
@@ -310,6 +314,7 @@ rmq_check_if_shared_with_mnesia \
RABBITMQ_CONFIG_FILE \
RABBITMQ_LOG_BASE \
RABBITMQ_PID_FILE \
+ RABBITMQ_FEATURE_FLAGS_FILE \
RABBITMQ_PLUGINS_EXPAND_DIR \
RABBITMQ_ENABLED_PLUGINS_FILE \
RABBITMQ_PLUGINS_DIR \
@@ -320,21 +325,29 @@ rmq_check_if_shared_with_mnesia \
## Development-specific environment.
if [ "${RABBITMQ_DEV_ENV}" ]; then
- if [ "$RABBITMQ_PLUGINS_DIR_source" != 'environment' -o \
+ if [ "$RABBITMQ_FEATURE_FLAGS_FILE_source" != 'environment' -o \
+ "$RABBITMQ_PLUGINS_DIR_source" != 'environment' -o \
"$RABBITMQ_ENABLED_PLUGINS_FILE_source" != 'environment' ]; then
# We need to query the running node for the plugins directory
# and the "enabled plugins" file.
eval $( (${RABBITMQ_SCRIPTS_DIR}/rabbitmqctl eval \
- '{ok, P} = application:get_env(rabbit, plugins_dir),
+ '{ok, F} = application:get_env(rabbit, feature_flags_file),
+ {ok, P} = application:get_env(rabbit, plugins_dir),
{ok, E} = application:get_env(rabbit, enabled_plugins_file),
B = os:getenv("RABBITMQ_MNESIA_BASE"),
M = os:getenv("RABBITMQ_MNESIA_DIR"),
io:format(
+ "feature_flags_file=\"~s\"~n"
"plugins_dir=\"~s\"~n"
"enabled_plugins_file=\"~s\"~n"
"mnesia_base=\"~s\"~n"
- "mnesia_dir=\"~s\"~n", [P, E, B, M]).' \
- 2>/dev/null | grep -E '^(plugins_dir|enabled_plugins_file|mnesia_base|mnesia_dir)=') || :)
+ "mnesia_dir=\"~s\"~n", [F, P, E, B, M]).' \
+ 2>/dev/null | grep -E '^(feature_flags_file|plugins_dir|enabled_plugins_file|mnesia_base|mnesia_dir)=') || :)
+
+ if [ "${feature_flags_file}" -a \
+ "$RABBITMQ_FEATURE_FLAGS_FILE_source" != 'environment' ]; then
+ RABBITMQ_FEATURE_FLAGS_FILE="${feature_flags_file}"
+ fi
if [ "${plugins_dir}" -a \
"$RABBITMQ_PLUGINS_DIR_source" != 'environment' ]; then
RABBITMQ_PLUGINS_DIR="${plugins_dir}"
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index 912668c8ac..783fc55287 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -284,6 +284,15 @@ if "!RABBITMQ_BOOT_MODULE!"=="" (
)
)
+REM [ "x" = "x$RABBITMQ_FEATURE_FLAGS_FILE" ] && RABBITMQ_FEATURE_FLAGS_FILE=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-feature_flags
+if "!RABBITMQ_FEATURE_FLAGS_FILE!"=="" (
+ if "!FEATURE_FLAGS_FILE!"=="" (
+ set RABBITMQ_FEATURE_FLAGS_FILE=!RABBITMQ_MNESIA_BASE!\!RABBITMQ_NODENAME!-feature_flags
+ ) else (
+ set RABBITMQ_FEATURE_FLAGS_FILE=!FEATURE_FLAGS_FILE!
+ )
+)
+
REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
@@ -397,6 +406,13 @@ if defined RABBITMQ_DEV_ENV (
if "!SCRIPT_NAME!" == "rabbitmq-plugins" (
REM We may need to query the running node for the plugins directory
REM and the "enabled plugins" file.
+ if not "%RABBITMQ_FEATURE_FLAGS_FILE_source%" == "environment" (
+ for /f "delims=" %%F in ('!SCRIPT_DIR!\rabbitmqctl eval "{ok, P} = application:get_env(rabbit, feature_flags_file), io:format(""~s~n"", [P])."') do @set feature_flags_file=%%F
+ if exist "!feature_flags_file!" (
+ set RABBITMQ_FEATURE_FLAGS_FILE=!feature_flags_file!
+ )
+ REM set feature_flags_file=
+ )
if not "%RABBITMQ_PLUGINS_DIR_source%" == "environment" (
for /f "delims=" %%F in ('!SCRIPT_DIR!\rabbitmqctl eval "{ok, P} = application:get_env(rabbit, plugins_dir), io:format(""~s~n"", [P])."') do @set plugins_dir=%%F
if exist "!plugins_dir!" (
@@ -476,6 +492,7 @@ exit /b
REM Environment cleanup
set BOOT_MODULE=
set CONFIG_FILE=
+set FEATURE_FLAGS_FILE=
set ENABLED_PLUGINS_FILE=
set LOG_BASE=
set MNESIA_BASE=
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index efff38c3af..aeac38e46f 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -195,6 +195,7 @@ RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \
-s rabbit_prelaunch \
${RABBITMQ_NAME_TYPE} ${RABBITMQ_PRELAUNCH_NODENAME} \
-conf_advanced "${RABBITMQ_ADVANCED_CONFIG_FILE}" \
+ -rabbit feature_flags_file "\"$RABBITMQ_FEATURE_FLAGS_FILE\"" \
-rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
-rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
-extra "${RABBITMQ_NODENAME}"
@@ -302,6 +303,7 @@ start_rabbitmq_server() {
-rabbit lager_log_root "\"$RABBITMQ_LOG_BASE\"" \
-rabbit lager_default_file "$RABBIT_LAGER_HANDLER" \
-rabbit lager_upgrade_file "$RABBITMQ_LAGER_HANDLER_UPGRADE" \
+ -rabbit feature_flags_file "\"$RABBITMQ_FEATURE_FLAGS_FILE\"" \
-rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
-rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
-rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 9039243c62..d417091732 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -146,6 +146,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.conf" == "!RABBITMQ_CONFIG_FILE!" (
-s rabbit_prelaunch ^
!RABBITMQ_NAME_TYPE! rabbitmqprelaunch!RANDOM!!TIME:~9!@localhost ^
-conf_advanced "!RABBITMQ_ADVANCED_CONFIG_FILE!" ^
+ -rabbit feature_flags_file "!RABBITMQ_FEATURE_FLAGS_FILE!" ^
-rabbit enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
-rabbit plugins_dir "!RABBITMQ_PLUGINS_DIR!" ^
-extra "!RABBITMQ_NODENAME!"
@@ -247,6 +248,7 @@ if "!ENV_OK!"=="false" (
-rabbit lager_log_root \""!RABBITMQ_LOG_BASE:\=/!"\" ^
-rabbit lager_default_file !RABBIT_LAGER_HANDLER! ^
-rabbit lager_upgrade_file !RABBITMQ_LAGER_HANDLER_UPGRADE! ^
+-rabbit feature_flags_file \""!RABBITMQ_FEATURE_FLAGS_FILE:\=/!"\" ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 08cc29f2c8..7bb1f124b5 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -229,6 +229,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.conf" == "!RABBITMQ_CONFIG_FILE!" (
-s rabbit_prelaunch ^
!RABBITMQ_NAME_TYPE! rabbitmqprelaunch!RANDOM!!TIME:~9!@localhost ^
-conf_advanced "!RABBITMQ_ADVANCED_CONFIG_FILE!" ^
+ -rabbit feature_flags_file "!RABBITMQ_FEATURE_FLAGS_FILE!" ^
-rabbit enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
-rabbit plugins_dir "!RABBITMQ_PLUGINS_DIR!" ^
-extra "!RABBITMQ_NODENAME!"
@@ -320,6 +321,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-rabbit lager_log_root \""!RABBITMQ_LOG_BASE:\=/!"\" ^
-rabbit lager_default_file !RABBIT_LAGER_HANDLER! ^
-rabbit lager_upgrade_file !RABBITMQ_LAGER_HANDLER_UPGRADE! ^
+-rabbit feature_flags_file \""!RABBITMQ_FEATURE_FLAGS_FILE:\=/!"\" ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
diff --git a/src/rabbit.erl b/src/rabbit.erl
index d40aa5a279..c0ecee6423 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -63,6 +63,12 @@
{requires, pre_boot},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({feature_flags,
+ [{description, "feature flags registry and initial state"},
+ {mfa, {rabbit_feature_flags, init, []}},
+ {requires, pre_boot},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
{requires, file_handle_cache},
@@ -530,6 +536,7 @@ start_apps(Apps) ->
start_apps(Apps, RestartTypes) ->
app_utils:load_applications(Apps),
+ rabbit_feature_flags:initialize_registry(),
ensure_sysmon_handler_app_config(),
%% make Ra use a custom logger that dispatches to lager instead of the
%% default OTP logger
diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl
new file mode 100644
index 0000000000..e5d59b1a47
--- /dev/null
+++ b/src/rabbit_feature_flags.erl
@@ -0,0 +1,1797 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% @author The RabbitMQ team
+%% @copyright 2018-2019 Pivotal Software, Inc.
+%%
+%% @doc
+%% This module offers a framework to declare capabilities a RabbitMQ node
+%% supports and therefore a way to determine if multiple RabbitMQ nodes in
+%% a cluster are compatible and can work together.
+%%
+%% == What a feature flag is ==
+%%
+%% A <strong>feature flag</strong> is a name and several properties given
+%% to a change in RabbitMQ which impacts its communication with other
+%% RabbitMQ nodes. This kind of change can be:
+%% <ul>
+%% <li>an update to an Erlang record</li>
+%% <li>a modification to a replicated Mnesia table schema</li>
+%% <li>a modification to Erlang messages exchanged between Erlang processes
+%% which might run on remote nodes</li>
+%% </ul>
+%%
+%% A feature flag is qualified by:
+%% <ul>
+%% <li>a <strong>name</strong></li>
+%% <li>a <strong>description</strong> (optional)</li>
+%% <li>a list of other <strong>feature flags this feature flag depends on
+%% </strong> (optional). This can be useful when the change builds up on
+%% top of a previous change. For instance, it expands a record which was
+%% already modified by a previous feature flag.</li>
+%% <li>a <strong>migration function</strong> (optional). If provided, this
+%% function is called when the feature flag is enabled. It is responsible
+%% for doing all the data conversion, if any, and confirming the feature
+%% flag can be enabled.</li>
+%% <li>a level of stability (stable or experimental). For now, this is only
+%% informational. But it might be used for specific purposes in the
+%% future.</li>
+%% </ul>
+%%
+%% == How to declare a feature flag ==
+%%
+%% To define a new feature flag, you need to use the
+%% `rabbitmq_feature_flag()' module attribute:
+%%
+%% ```
+%% -rabitmq_feature_flag(FeatureFlag).
+%% '''
+%%
+%% `FeatureFlag' is a {@type feature_flag_modattr()}.
+%%
+%% == How to enable a feature flag ==
+%%
+%% To enable a supported feature flag, you have the following solutions:
+%%
+%% <ul>
+%% <li>Using this module API:
+%% ```
+%% rabbit_feature_flags:enable(FeatureFlagName).
+%% '''
+%% </li>
+%% <li>Using the `rabbitmqctl' CLI:
+%% ```
+%% rabbitmqctl enable_feature_flag "$feature_flag_name"
+%% '''
+%% </li>
+%% </ul>
+%%
+%% == How to disable a feature flag ==
+%%
+%% Once enabled, there is <strong>currently no way to disable</strong> a
+%% feature flag.
+
+-module(rabbit_feature_flags).
+
+-export([list/0,
+ list/1,
+ list/2,
+ enable/1,
+ enable_all/0,
+ disable/1,
+ disable_all/0,
+ is_supported/1,
+ is_supported/2,
+ is_supported_locally/1,
+ is_supported_remotely/1,
+ is_supported_remotely/2,
+ is_supported_remotely/3,
+ is_enabled/1,
+ is_enabled/2,
+ is_disabled/1,
+ is_disabled/2,
+ info/0,
+ info/1,
+ init/0,
+ get_state/1,
+ get_stability/1,
+ check_node_compatibility/1,
+ check_node_compatibility/2,
+ is_node_compatible/1,
+ is_node_compatible/2,
+ sync_feature_flags_with_cluster/1,
+ sync_feature_flags_with_cluster/2,
+ enabled_feature_flags_list_file/0
+ ]).
+
+%% RabbitMQ internal use only.
+-export([initialize_registry/0,
+ mark_as_enabled_locally/2,
+ remote_nodes/0,
+ running_remote_nodes/0,
+ does_node_support/3]).
+
+-ifdef(TEST).
+-export([mark_as_enabled_remotely/2,
+ mark_as_enabled_remotely/4]).
+-endif.
+
+%% Default timeout for operations on remote nodes.
+-define(TIMEOUT, 60000).
+
+-define(FF_REGISTRY_LOADING_LOCK, {feature_flags_registry_loading, self()}).
+-define(FF_STATE_CHANGE_LOCK, {feature_flags_state_change, self()}).
+
+-type feature_flag_modattr() :: {feature_name(),
+ feature_props()}.
+%% The value of a `-rabbitmq_feature_flag()' module attribute used to
+%% declare a new feature flag.
+
+-type feature_name() :: atom().
+%% The feature flag's name. It is used in many places to identify a
+%% specific feature flag. In particular, this is how an end-user (or
+%% the CLI) can enable a feature flag. This is also the only bit which
+%% is persisted so a node remember which feature flags are enabled.
+
+-type feature_props() :: #{desc => string(),
+ doc_url => string(),
+ stability => stability(),
+ depends_on => [feature_name()],
+ migration_fun => migration_fun_name()}.
+%% The feature flag properties.
+%%
+%% All properties are optional.
+%%
+%% The properties are:
+%% <ul>
+%% <li>`desc': a description of the feature flag</li>
+%% <li>`doc_url': a URL pointing to more documentation about the feature
+%% flag</li>
+%% <li>`stability': the level of stability</li>
+%% <li>`depends_on': a list of feature flags name which must be enabled
+%% before this one</li>
+%% <li>`migration_fun': a migration function specified by its module and
+%% function names</li>
+%% </ul>
+%%
+%% Note that the `migration_fun' is a {@type migration_fun_name()},
+%% not a {@type migration_fun()}. However, the function signature
+%% must conform to the {@type migration_fun()} signature. The reason
+%% is that we must be able to represent it as an Erlang term when
+%% we regenerate the registry module source code (using {@link
+%% erl_syntax:abstract/1}).
+
+-type feature_flags() :: #{feature_name() => feature_props_extended()}.
+%% The feature flags map as returned or accepted by several functions in
+%% this module. In particular, this what the {@link list/0} function
+%% returns.
+
+-type feature_props_extended() :: #{desc => string(),
+ doc_url => string(),
+ stability => stability(),
+ migration_fun => migration_fun_name(),
+ depends_on => [feature_name()],
+ provided_by => atom()}.
+%% The feature flag properties, once expanded by this module when feature
+%% flags are discovered.
+%%
+%% The new properties compared to {@type feature_props()} are:
+%% <ul>
+%% <li>`provided_by': the name of the application providing the feature flag</li>
+%% </ul>
+
+-type stability() :: stable | experimental.
+%% The level of stability of a feature flag. Currently, only informational.
+
+-type migration_fun_name() :: {Module :: atom(), Function :: atom()}.
+%% The name of the module and function to call when changing the state of
+%% the feature flag.
+
+-type migration_fun() :: fun((feature_name(),
+ feature_props_extended(),
+ migration_fun_context())
+ -> ok | {error, any()} | % context = enable
+ boolean() | undefined). % context = is_enabled
+%% The migration function signature.
+%%
+%% It is called with context `enable' when a feature flag is being enabled.
+%% The function is responsible for this feature-flag-specific verification
+%% and data conversion. It returns `ok' if RabbitMQ can mark the feature
+%% flag as enabled an continue with the next one, if any. Otherwise, it
+%% returns `{error, any()}' if there is an error and the feature flag should
+%% remain disabled. The function must be idempotent: if the feature flag is
+%% already enabled on another node and the local node is running this function
+%% again because it is syncing its feature flags state, it should succeed.
+%%
+%% It is called with the context `is_enabled' to check if a feature flag
+%% is actually enabled. It is useful on RabbitMQ startup, just in case
+%% the previous instance failed to write the feature flags list file.
+
+-type migration_fun_context() :: enable | is_enabled.
+
+-export_type([feature_flag_modattr/0,
+ feature_props/0,
+ feature_name/0,
+ feature_flags/0,
+ feature_props_extended/0,
+ stability/0,
+ migration_fun_name/0,
+ migration_fun/0,
+ migration_fun_context/0]).
+
+-spec list() -> feature_flags().
+%% @doc
+%% Lists all supported feature flags.
+%%
+%% @returns A map of all supported feature flags.
+
+list() -> list(all).
+
+-spec list(Which :: all | enabled | disabled) -> feature_flags().
+%% @doc
+%% Lists all, enabled or disabled feature flags, depending on the argument.
+%%
+%% @param Which The group of feature flags to return: `all', `enabled' or
+%% `disabled'.
+%% @returns A map of selected feature flags.
+
+list(all) -> rabbit_ff_registry:list(all);
+list(enabled) -> rabbit_ff_registry:list(enabled);
+list(disabled) -> maps:filter(
+ fun(FeatureName, _) -> is_disabled(FeatureName) end,
+ list(all)).
+
+-spec list(all | enabled | disabled, stability()) -> feature_flags().
+%% @doc
+%% Lists all, enabled or disabled feature flags, depending on the first
+%% argument, only keeping those having the specified stability.
+%%
+%% @param Which The group of feature flags to return: `all', `enabled' or
+%% `disabled'.
+%% @param Stability The level of stability used to filter the map of feature
+%% flags.
+%% @returns A map of selected feature flags.
+
+list(Which, Stability)
+ when Stability =:= stable orelse Stability =:= experimental ->
+ maps:filter(fun(_, FeatureProps) ->
+ Stability =:= get_stability(FeatureProps)
+ end, list(Which)).
+
+-spec enable(feature_name() | [feature_name()]) -> ok |
+ {error, Reason :: any()}.
+%% @doc
+%% Enables the specified feature flag or set of feature flags.
+%%
+%% @param FeatureName The name or the list of names of feature flags to
+%% enable.
+%% @returns `ok' if the feature flags (and all the feature flags they
+%% depend on) were successfully enabled, or `{error, Reason}' if one
+%% feature flag could not be enabled (subsequent feature flags in the
+%% dependency tree are left unchanged).
+
+enable(FeatureName) when is_atom(FeatureName) ->
+ rabbit_log:debug("Feature flag `~s`: REQUEST TO ENABLE",
+ [FeatureName]),
+ case is_enabled(FeatureName) of
+ true ->
+ rabbit_log:debug("Feature flag `~s`: already enabled",
+ [FeatureName]),
+ ok;
+ false ->
+ rabbit_log:debug("Feature flag `~s`: not enabled, "
+ "check if supported by cluster",
+ [FeatureName]),
+ %% The feature flag must be supported locally and remotely
+ %% (i.e. by all members of the cluster).
+ case is_supported(FeatureName) of
+ true ->
+ rabbit_log:info("Feature flag `~s`: supported, "
+ "attempt to enable...",
+ [FeatureName]),
+ do_enable(FeatureName);
+ false ->
+ rabbit_log:error("Feature flag `~s`: not supported",
+ [FeatureName]),
+ {error, unsupported}
+ end
+ end;
+enable(FeatureNames) when is_list(FeatureNames) ->
+ with_feature_flags(FeatureNames, fun enable/1).
+
+-spec enable_all() -> ok | {error, any()}.
+%% @doc
+%% Enables all supported feature flags.
+%%
+%% @returns `ok' if the feature flags were successfully enabled,
+%% or `{error, Reason}' if one feature flag could not be enabled
+%% (subsequent feature flags in the dependency tree are left
+%% unchanged).
+
+enable_all() ->
+ with_feature_flags(maps:keys(list(all)), fun enable/1).
+
+-spec disable(feature_name() | [feature_name()]) -> ok | {error, any()}.
+%% @doc
+%% Disables the specified feature flag or set of feature flags.
+%%
+%% @param FeatureName The name or the list of names of feature flags to
+%% disable.
+%% @returns `ok' if the feature flags (and all the feature flags they
+%% depend on) were successfully disabled, or `{error, Reason}' if one
+%% feature flag could not be disabled (subsequent feature flags in the
+%% dependency tree are left unchanged).
+
+disable(FeatureName) when is_atom(FeatureName) ->
+ {error, unsupported};
+disable(FeatureNames) when is_list(FeatureNames) ->
+ with_feature_flags(FeatureNames, fun disable/1).
+
+-spec disable_all() -> ok | {error, any()}.
+%% @doc
+%% Disables all supported feature flags.
+%%
+%% @returns `ok' if the feature flags were successfully disabled,
+%% or `{error, Reason}' if one feature flag could not be disabled
+%% (subsequent feature flags in the dependency tree are left
+%% unchanged).
+
+disable_all() ->
+ with_feature_flags(maps:keys(list(all)), fun disable/1).
+
+-spec with_feature_flags([feature_name()],
+ fun((feature_name()) -> ok | {error, any()})) ->
+ ok | {error, any()}.
+%% @private
+
+with_feature_flags([FeatureName | Rest], Fun) ->
+ case Fun(FeatureName) of
+ ok -> with_feature_flags(Rest, Fun);
+ Error -> Error
+ end;
+with_feature_flags([], _) ->
+ ok.
+
+-spec is_supported(feature_name() | [feature_name()]) -> boolean().
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% supported by the entire cluster.
+%%
+%% This is the same as calling both {@link is_supported_locally/1} and
+%% {@link is_supported_remotely/1} with a logical AND.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @returns `true' if the set of feature flags is entirely supported, or
+%% `false' if one of them is not or the RPC timed out.
+
+is_supported(FeatureNames) ->
+ is_supported_locally(FeatureNames) andalso
+ is_supported_remotely(FeatureNames).
+
+-spec is_supported(feature_name() | [feature_name()], timeout()) ->
+ boolean().
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% supported by the entire cluster.
+%%
+%% This is the same as calling both {@link is_supported_locally/1} and
+%% {@link is_supported_remotely/2} with a logical AND.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @param Timeout Time in milliseconds after which the RPC gives up.
+%% @returns `true' if the set of feature flags is entirely supported, or
+%% `false' if one of them is not or the RPC timed out.
+
+is_supported(FeatureNames, Timeout) ->
+ is_supported_locally(FeatureNames) andalso
+ is_supported_remotely(FeatureNames, Timeout).
+
+-spec is_supported_locally(feature_name() | [feature_name()]) -> boolean().
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% supported by the local node.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @returns `true' if the set of feature flags is entirely supported, or
+%% `false' if one of them is not.
+
+is_supported_locally(FeatureName) when is_atom(FeatureName) ->
+ rabbit_ff_registry:is_supported(FeatureName);
+is_supported_locally(FeatureNames) when is_list(FeatureNames) ->
+ lists:all(fun(F) -> rabbit_ff_registry:is_supported(F) end, FeatureNames).
+
+-spec is_supported_remotely(feature_name() | [feature_name()]) -> boolean().
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% supported by all remote nodes.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @returns `true' if the set of feature flags is entirely supported, or
+%% `false' if one of them is not or the RPC timed out.
+
+is_supported_remotely(FeatureNames) ->
+ is_supported_remotely(FeatureNames, ?TIMEOUT).
+
+-spec is_supported_remotely(feature_name() | [feature_name()], timeout()) -> boolean().
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% supported by all remote nodes.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @param Timeout Time in milliseconds after which the RPC gives up.
+%% @returns `true' if the set of feature flags is entirely supported, or
+%% `false' if one of them is not or the RPC timed out.
+
+is_supported_remotely(FeatureName, Timeout) when is_atom(FeatureName) ->
+ is_supported_remotely([FeatureName], Timeout);
+is_supported_remotely([], _) ->
+ rabbit_log:debug("Feature flags: skipping query for feature flags "
+ "support as the given list is empty",
+ []),
+ true;
+is_supported_remotely(FeatureNames, Timeout) when is_list(FeatureNames) ->
+ case running_remote_nodes() of
+ [] ->
+ rabbit_log:debug("Feature flags: isolated node; "
+ "skipping remote node query "
+ "=> consider `~p` supported",
+ [FeatureNames]),
+ true;
+ RemoteNodes ->
+ rabbit_log:debug("Feature flags: about to query these remote nodes "
+ "about support for `~p`: ~p",
+ [FeatureNames, RemoteNodes]),
+ is_supported_remotely(RemoteNodes, FeatureNames, Timeout)
+ end.
+
+-spec is_supported_remotely([node()],
+ feature_name() | [feature_name()],
+ timeout()) -> boolean().
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% supported by specified remote nodes.
+%%
+%% @param RemoteNodes The list of remote nodes to query.
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @param Timeout Time in milliseconds after which the RPC gives up.
+%% @returns `true' if the set of feature flags is entirely supported by
+%% all nodes, or `false' if one of them is not or the RPC timed out.
+
+is_supported_remotely(_, [], _) ->
+ rabbit_log:debug("Feature flags: skipping query for feature flags "
+ "support as the given list is empty",
+ []),
+ true;
+is_supported_remotely([Node | Rest], FeatureNames, Timeout) ->
+ case does_node_support(Node, FeatureNames, Timeout) of
+ true ->
+ is_supported_remotely(Rest, FeatureNames, Timeout);
+ false ->
+ rabbit_log:debug("Feature flags: stopping query "
+ "for support for `~p` here",
+ [FeatureNames]),
+ false
+ end;
+is_supported_remotely([], FeatureNames, _) ->
+ rabbit_log:info("Feature flags: all running remote nodes support `~p`",
+ [FeatureNames]),
+ true.
+
+-spec is_enabled(feature_name() | [feature_name()]) -> boolean().
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% enabled.
+%%
+%% This is the same as calling {@link is_enabled/2} as a `blocking'
+%% call.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @returns `true' if the set of feature flags is enabled, or
+%% `false' if one of them is not.
+
+is_enabled(FeatureNames) ->
+ is_enabled(FeatureNames, blocking).
+
+-spec is_enabled
+(feature_name() | [feature_name()], blocking) ->
+ boolean();
+(feature_name() | [feature_name()], non_blocking) ->
+ boolean() | state_changing.
+%% @doc
+%% Returns if a single feature flag or a set of feature flags is
+%% enabled.
+%%
+%% When `blocking' is passed, the function waits (blocks) for the
+%% state of a feature flag being disabled or enabled stabilizes before
+%% returning its final state.
+%%
+%% When `non_blocking' is passed, the function returns immediately with
+%% the state of the feature flag (`true' if enabled, `false' otherwise)
+%% or `state_changing' is the state is being changed at the time of the
+%% call.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @returns `true' if the set of feature flags is enabled,
+%% `false' if one of them is not, or `state_changing' if one of them
+%% is being worked on. Note that `state_changing' has precedence over
+%% `false', so if one is `false' and another one is `state_changing',
+%% `state_changing' is returned.
+
+is_enabled(FeatureNames, non_blocking) ->
+ is_enabled_nb(FeatureNames);
+is_enabled(FeatureNames, blocking) ->
+ case is_enabled_nb(FeatureNames) of
+ state_changing ->
+ global:set_lock(?FF_STATE_CHANGE_LOCK),
+ global:del_lock(?FF_STATE_CHANGE_LOCK),
+ is_enabled(FeatureNames, blocking);
+ IsEnabled ->
+ IsEnabled
+ end.
+
+is_enabled_nb(FeatureName) when is_atom(FeatureName) ->
+ rabbit_ff_registry:is_enabled(FeatureName);
+is_enabled_nb(FeatureNames) when is_list(FeatureNames) ->
+ lists:foldl(
+ fun
+ (_F, state_changing = Acc) ->
+ Acc;
+ (F, false = Acc) ->
+ case rabbit_ff_registry:is_enabled(F) of
+ state_changing -> state_changing;
+ _ -> Acc
+ end;
+ (F, _) ->
+ rabbit_ff_registry:is_enabled(F)
+ end,
+ true, FeatureNames).
+
+-spec is_disabled(feature_name() | [feature_name()]) -> boolean().
+%% @doc
+%% Returns if a single feature flag or one feature flag in a set of
+%% feature flags is disabled.
+%%
+%% This is the same as negating the result of {@link is_enabled/1}.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @returns `true' if one of the feature flags is disabled, or
+%% `false' if they are all enabled.
+
+is_disabled(FeatureNames) ->
+ is_disabled(FeatureNames, blocking).
+
+-spec is_disabled
+(feature_name() | [feature_name()], blocking) ->
+ boolean();
+(feature_name() | [feature_name()], non_blocking) ->
+ boolean() | state_changing.
+%% @doc
+%% Returns if a single feature flag or one feature flag in a set of
+%% feature flags is disabled.
+%%
+%% This is the same as negating the result of {@link is_enabled/2},
+%% except that `state_changing' is returned as is.
+%%
+%% See {@link is_enabled/2} for a description of the `blocking' and
+%% `non_blocking' modes.
+%%
+%% @param FeatureNames The name or a list of names of the feature flag(s)
+%% to be checked.
+%% @returns `true' if one feature flag in the set of feature flags is
+%% disabled, `false' if they are all enabled, or `state_changing' if
+%% one of them is being worked on. Note that `state_changing' has
+%% precedence over `true', so if one is `true' (i.e. disabled) and
+%% another one is `state_changing', `state_changing' is returned.
+%%
+%% @see is_enabled/2
+
+is_disabled(FeatureName, Blocking) ->
+ case is_enabled(FeatureName, Blocking) of
+ state_changing -> state_changing;
+ IsEnabled -> not IsEnabled
+ end.
+
+-spec info() -> ok.
+%% @doc
+%% Displays a table on stdout summing up the supported feature flags,
+%% their state and various informations about them.
+
+info() ->
+ info(#{}).
+
+-spec info(#{color => boolean(),
+ lines => boolean(),
+ verbose => non_neg_integer()}) -> ok.
+%% @doc
+%% Displays a table on stdout summing up the supported feature flags,
+%% their state and various informations about them.
+%%
+%% Supported options are:
+%% <ul>
+%% <li>`color': a boolean to indicate if colors should be used to
+%% highlight some elements.</li>
+%% <li>`lines': a boolean to indicate if table borders should be drawn
+%% using ASCII lines instead of regular characters.</li>
+%% <li>`verbose': a non-negative integer to specify the level of
+%% verbosity.</li>
+%% </ul>
+%%
+%% @param Options A map of various options to tune the displayed table.
+
+info(Options) when is_map(Options) ->
+ rabbit_ff_extra:info(Options).
+
+-spec get_state(feature_name()) -> enabled | disabled | unavailable.
+%% @doc
+%% Returns the state of a feature flag.
+%%
+%% The possible states are:
+%% <ul>
+%% <li>`enabled': the feature flag is enabled.</li>
+%% <li>`disabled': the feature flag is supported by all nodes in the
+%% cluster but currently disabled.</li>
+%% <li>`unavailable': the feature flag is unsupported by at least one
+%% node in the cluster and can not be enabled for now.</li>
+%% </ul>
+%%
+%% @param FeatureName The name of the feature flag to check.
+%% @returns `enabled', `disabled' or `unavailable'.
+
+get_state(FeatureName) when is_atom(FeatureName) ->
+ IsEnabled = rabbit_feature_flags:is_enabled(FeatureName),
+ IsSupported = rabbit_feature_flags:is_supported(FeatureName),
+ case IsEnabled of
+ true -> enabled;
+ false -> case IsSupported of
+ true -> disabled;
+ false -> unavailable
+ end
+ end.
+
+-spec get_stability(feature_name() | feature_props_extended()) -> stability().
+%% @doc
+%% Returns the stability of a feature flag.
+%%
+%% The possible stability levels are:
+%% <ul>
+%% <li>`stable': the feature flag is stable and will not change in future
+%% releases: it can be enabled in production.</li>
+%% <li>`experimental': the feature flag is experimental and may change in
+%% the future (without a guaranteed upgrade path): enabling it in
+%% production is not recommended.</li>
+%% <li>`unavailable': the feature flag is unsupported by at least one
+%% node in the cluster and can not be enabled for now.</li>
+%% </ul>
+%%
+%% @param FeatureName The name of the feature flag to check.
+%% @returns `stable' or `experimental'.
+
+get_stability(FeatureName) when is_atom(FeatureName) ->
+ case rabbit_ff_registry:get(FeatureName) of
+ undefined -> undefined;
+ FeatureProps -> get_stability(FeatureProps)
+ end;
+get_stability(FeatureProps) when is_map(FeatureProps) ->
+ maps:get(stability, FeatureProps, stable).
+
+%% -------------------------------------------------------------------
+%% Feature flags registry.
+%% -------------------------------------------------------------------
+
+-spec init() -> ok | no_return().
+%% @private
+
+init() ->
+ %% We want to make sure the `feature_flags` file exists once
+ %% RabbitMQ was started at least once. This is not required by
+ %% this module (it works fine if the file is missing) but it helps
+ %% external tools.
+ _ = ensure_enabled_feature_flags_list_file_exists(),
+
+ %% We also "list" supported feature flags. We are not interested in
+ %% that list, however, it triggers the first initialization of the
+ %% registry.
+ _ = list(all),
+ ok.
+
+-spec initialize_registry() -> ok | {error, any()} | no_return().
+%% @private
+%% @doc
+%% Initializes or reinitializes the registry.
+%%
+%% The registry is an Erlang module recompiled at runtime to hold the
+%% state of all supported feature flags.
+%%
+%% That Erlang module is called {@link rabbit_ff_registry}. The initial
+%% source code of this module simply calls this function so it is
+%% replaced by a proper registry.
+%%
+%% Once replaced, the registry contains the map of all supported feature
+%% flags and their state. This is makes it very efficient to query a
+%% feature flag state or property.
+%%
+%% The registry is local to all RabbitMQ nodes.
+
+initialize_registry() ->
+ %% The first step is to get the list of enabled feature flags: if
+ %% this is the first time we initialize it, we read the list from
+ %% disk (the `feature_flags` file). Otherwise we query the existing
+ %% registry before it is replaced.
+ RegistryInitialized = rabbit_ff_registry:is_registry_initialized(),
+ EnabledFeatureNames = case RegistryInitialized of
+ true ->
+ maps:keys(rabbit_ff_registry:list(enabled));
+ false ->
+ read_enabled_feature_flags_list()
+ end,
+
+ %% We also record if the feature flags state was correctly written
+ %% to disk. Currently we don't use this information, but in the
+ %% future, we might want to retry the write if it failed so far.
+ %%
+ %% TODO: Retry to write the feature flags state if the first try
+ %% failed.
+ WrittenToDisk = case RegistryInitialized of
+ true ->
+ rabbit_ff_registry:is_registry_written_to_disk();
+ false ->
+ true
+ end,
+ initialize_registry(EnabledFeatureNames, [], WrittenToDisk).
+
+-spec initialize_registry([feature_name()], [feature_name()], boolean()) ->
+ ok | {error, any()} | no_return().
+%% @private
+%% @doc
+%% Initializes or reinitializes the registry.
+%%
+%% See {@link initialize_registry/0} for a description of the registry.
+%%
+%% This function takes two list of feature flag names:
+%% <ul>
+%% <li>the complete list of feature flags to mark as enabled</li>
+%% <li>the list of feature flags being enabled or disabled</li>
+%% </ul>
+%%
+%% The latter is used to block callers asking if a feature flag is
+%% enabled or disabled while its state is changing.
+
+initialize_registry(EnabledFeatureNames,
+ ChangingFeatureNames,
+ WrittenToDisk) ->
+ %% Query the list (it's a map to be exact) of supported feature
+ %% flags. That list comes from the `-rabbitmq_feature_flag().`
+ %% module attributes exposed by all currently loaded Erlang modules.
+ rabbit_log:debug("Feature flags: (re)initialize registry", []),
+ AllFeatureFlags = query_supported_feature_flags(),
+
+ %% We log the state of those feature flags.
+ rabbit_log:info("Feature flags: list of feature flags found:", []),
+ lists:foreach(
+ fun(FeatureName) ->
+ rabbit_log:info(
+ "Feature flags: [~s] ~s",
+ [case lists:member(FeatureName, EnabledFeatureNames) of
+ true -> "x";
+ false -> " "
+ end,
+ FeatureName])
+ end, lists:sort(maps:keys(AllFeatureFlags))),
+
+ %% We request the registry to be regenerated and reloaded with the
+ %% new state.
+ regen_registry_mod(AllFeatureFlags,
+ EnabledFeatureNames,
+ ChangingFeatureNames,
+ WrittenToDisk).
+
+-spec query_supported_feature_flags() -> feature_flags().
+%% @private
+
+query_supported_feature_flags() ->
+ rabbit_log:debug(
+ "Feature flags: query feature flags in loaded applications"),
+ AttributesPerApp = rabbit_misc:all_module_attributes(rabbit_feature_flag),
+ query_supported_feature_flags(AttributesPerApp, #{}).
+
+query_supported_feature_flags([{App, _Module, Attributes} | Rest],
+ AllFeatureFlags) ->
+ rabbit_log:debug("Feature flags: application `~s` "
+ "has ~b feature flags",
+ [App, length(Attributes)]),
+ AllFeatureFlags1 = lists:foldl(
+ fun({FeatureName, FeatureProps}, AllFF) ->
+ merge_new_feature_flags(AllFF,
+ App,
+ FeatureName,
+ FeatureProps)
+ end, AllFeatureFlags, Attributes),
+ query_supported_feature_flags(Rest, AllFeatureFlags1);
+query_supported_feature_flags([], AllFeatureFlags) ->
+ AllFeatureFlags.
+
+-spec merge_new_feature_flags(feature_flags(),
+ atom(),
+ feature_name(),
+ feature_props()) -> feature_flags().
+%% @private
+
+merge_new_feature_flags(AllFeatureFlags, App, FeatureName, FeatureProps)
+ when is_atom(FeatureName) andalso is_map(FeatureProps) ->
+ %% We expand the feature flag properties map with:
+ %% - the name of the application providing it: only informational
+ %% for now, but can be handy to understand that a feature flag
+ %% comes from a plugin.
+ FeatureProps1 = maps:put(provided_by, App, FeatureProps),
+ maps:merge(AllFeatureFlags,
+ #{FeatureName => FeatureProps1}).
+
+-spec regen_registry_mod(feature_flags(),
+ [feature_name()],
+ [feature_name()],
+ boolean()) -> ok | {error, any()} | no_return().
+%% @private
+
+regen_registry_mod(AllFeatureFlags,
+ EnabledFeatureNames,
+ ChangingFeatureNames,
+ WrittenToDisk) ->
+ %% Here, we recreate the source code of the `rabbit_ff_registry`
+ %% module from scratch.
+ %%
+ %% IMPORTANT: We want both modules to have the exact same public
+ %% API in order to simplify the life of developers and their tools
+ %% (Dialyzer, completion, and so on).
+
+ %% -module(rabbit_ff_registry).
+ ModuleAttr = erl_syntax:attribute(
+ erl_syntax:atom(module),
+ [erl_syntax:atom(rabbit_ff_registry)]),
+ ModuleForm = erl_syntax:revert(ModuleAttr),
+ %% -export([...]).
+ ExportAttr = erl_syntax:attribute(
+ erl_syntax:atom(export),
+ [erl_syntax:list(
+ [erl_syntax:arity_qualifier(
+ erl_syntax:atom(F),
+ erl_syntax:integer(A))
+ || {F, A} <- [{get, 1},
+ {list, 1},
+ {is_supported, 1},
+ {is_enabled, 1},
+ {is_registry_initialized, 0},
+ {is_registry_written_to_disk, 0}]]
+ )
+ ]
+ ),
+ ExportForm = erl_syntax:revert(ExportAttr),
+ %% get(_) -> ...
+ GetClauses = [erl_syntax:clause(
+ [erl_syntax:atom(FeatureName)],
+ [],
+ [erl_syntax:abstract(maps:get(FeatureName,
+ AllFeatureFlags))])
+ || FeatureName <- maps:keys(AllFeatureFlags)
+ ],
+ GetUnknownClause = erl_syntax:clause(
+ [erl_syntax:variable("_")],
+ [],
+ [erl_syntax:atom(undefined)]),
+ GetFun = erl_syntax:function(
+ erl_syntax:atom(get),
+ GetClauses ++ [GetUnknownClause]),
+ GetFunForm = erl_syntax:revert(GetFun),
+ %% list(_) -> ...
+ ListAllBody = erl_syntax:abstract(AllFeatureFlags),
+ ListAllClause = erl_syntax:clause([erl_syntax:atom(all)],
+ [],
+ [ListAllBody]),
+ EnabledFeatureFlags = maps:filter(
+ fun(FeatureName, _) ->
+ lists:member(FeatureName,
+ EnabledFeatureNames)
+ end, AllFeatureFlags),
+ ListEnabledBody = erl_syntax:abstract(EnabledFeatureFlags),
+ ListEnabledClause = erl_syntax:clause([erl_syntax:atom(enabled)],
+ [],
+ [ListEnabledBody]),
+ ListFun = erl_syntax:function(
+ erl_syntax:atom(list),
+ [ListAllClause, ListEnabledClause]),
+ ListFunForm = erl_syntax:revert(ListFun),
+ %% is_supported(_) -> ...
+ IsSupportedClauses = [erl_syntax:clause(
+ [erl_syntax:atom(FeatureName)],
+ [],
+ [erl_syntax:atom(true)])
+ || FeatureName <- maps:keys(AllFeatureFlags)
+ ],
+ NotSupportedClause = erl_syntax:clause(
+ [erl_syntax:variable("_")],
+ [],
+ [erl_syntax:atom(false)]),
+ IsSupportedFun = erl_syntax:function(
+ erl_syntax:atom(is_supported),
+ IsSupportedClauses ++ [NotSupportedClause]),
+ IsSupportedFunForm = erl_syntax:revert(IsSupportedFun),
+ %% is_enabled(_) -> ...
+ IsEnabledClauses = [erl_syntax:clause(
+ [erl_syntax:atom(FeatureName)],
+ [],
+ [case lists:member(FeatureName,
+ ChangingFeatureNames) of
+ true ->
+ erl_syntax:atom(state_changing);
+ false ->
+ erl_syntax:atom(
+ lists:member(FeatureName,
+ EnabledFeatureNames))
+ end])
+ || FeatureName <- maps:keys(AllFeatureFlags)
+ ],
+ NotEnabledClause = erl_syntax:clause(
+ [erl_syntax:variable("_")],
+ [],
+ [erl_syntax:atom(false)]),
+ IsEnabledFun = erl_syntax:function(
+ erl_syntax:atom(is_enabled),
+ IsEnabledClauses ++ [NotEnabledClause]),
+ IsEnabledFunForm = erl_syntax:revert(IsEnabledFun),
+ %% is_registry_initialized() -> ...
+ IsInitializedClauses = [erl_syntax:clause(
+ [],
+ [],
+ [erl_syntax:atom(true)])
+ ],
+ IsInitializedFun = erl_syntax:function(
+ erl_syntax:atom(is_registry_initialized),
+ IsInitializedClauses),
+ IsInitializedFunForm = erl_syntax:revert(IsInitializedFun),
+ %% is_registry_written_to_disk() -> ...
+ IsWrittenToDiskClauses = [erl_syntax:clause(
+ [],
+ [],
+ [erl_syntax:atom(WrittenToDisk)])
+ ],
+ IsWrittenToDiskFun = erl_syntax:function(
+ erl_syntax:atom(is_registry_written_to_disk),
+ IsWrittenToDiskClauses),
+ IsWrittenToDiskFunForm = erl_syntax:revert(IsWrittenToDiskFun),
+ %% Compilation!
+ Forms = [ModuleForm,
+ ExportForm,
+ GetFunForm,
+ ListFunForm,
+ IsSupportedFunForm,
+ IsEnabledFunForm,
+ IsInitializedFunForm,
+ IsWrittenToDiskFunForm],
+ CompileOpts = [return_errors,
+ return_warnings],
+ case compile:forms(Forms, CompileOpts) of
+ {ok, Mod, Bin, _} ->
+ load_registry_mod(Mod, Bin);
+ {error, Errors, Warnings} ->
+ rabbit_log:error("Feature flags: registry compilation:~n"
+ "Errors: ~p~n"
+ "Warnings: ~p",
+ [Errors, Warnings]),
+ {error, {compilation_failure, Errors, Warnings}}
+ end.
+
+-spec load_registry_mod(atom(), binary()) ->
+ ok | {error, any()} | no_return().
+%% @private
+
+load_registry_mod(Mod, Bin) ->
+ rabbit_log:debug("Feature flags: registry module ready, loading it..."),
+ FakeFilename = "Compiled and loaded by " ++ ?MODULE_STRING,
+ %% Time to load the new registry, replacing the old one. We use a
+ %% lock here to synchronize concurrent reloads.
+ global:set_lock(?FF_REGISTRY_LOADING_LOCK, [node()]),
+ _ = code:soft_purge(Mod),
+ _ = code:delete(Mod),
+ Ret = code:load_binary(Mod, FakeFilename, Bin),
+ global:del_lock(?FF_REGISTRY_LOADING_LOCK, [node()]),
+ case Ret of
+ {module, _} ->
+ rabbit_log:debug("Feature flags: registry module loaded"),
+ ok;
+ {error, Reason} ->
+ rabbit_log:error("Feature flags: failed to load registry "
+ "module: ~p", [Reason]),
+ throw({feature_flag_registry_reload_failure, Reason})
+ end.
+
+%% -------------------------------------------------------------------
+%% Feature flags state storage.
+%% -------------------------------------------------------------------
+
+-spec ensure_enabled_feature_flags_list_file_exists() -> ok | {error, any()}.
+%% @private
+
+ensure_enabled_feature_flags_list_file_exists() ->
+ File = enabled_feature_flags_list_file(),
+ case filelib:is_regular(File) of
+ true -> ok;
+ false -> write_enabled_feature_flags_list([])
+ end.
+
+-spec read_enabled_feature_flags_list() ->
+ [feature_name()] | no_return().
+%% @private
+
+read_enabled_feature_flags_list() ->
+ case try_to_read_enabled_feature_flags_list() of
+ {error, Reason} ->
+ File = enabled_feature_flags_list_file(),
+ throw({feature_flags_file_read_error, File, Reason});
+ Ret ->
+ Ret
+ end.
+
+-spec try_to_read_enabled_feature_flags_list() ->
+ [feature_name()] | {error, any()}.
+%% @private
+
+try_to_read_enabled_feature_flags_list() ->
+ File = enabled_feature_flags_list_file(),
+ case file:consult(File) of
+ {ok, [List]} ->
+ List;
+ {error, enoent} ->
+ %% If the file is missing, we consider the list of enabled
+ %% feature flags to be empty.
+ [];
+ {error, Reason} = Error ->
+ rabbit_log:error(
+ "Feature flags: failed to read the `feature_flags` "
+ "file at `~s`: ~s",
+ [File, file:format_error(Reason)]),
+ Error
+ end.
+
+-spec write_enabled_feature_flags_list([feature_name()]) ->
+ ok | no_return().
+%% @private
+
+write_enabled_feature_flags_list(FeatureNames) ->
+ case try_to_write_enabled_feature_flags_list(FeatureNames) of
+ {error, Reason} ->
+ File = enabled_feature_flags_list_file(),
+ throw({feature_flags_file_write_error, File, Reason});
+ Ret ->
+ Ret
+ end.
+
+-spec try_to_write_enabled_feature_flags_list([feature_name()]) ->
+ ok | {error, any()}.
+%% @private
+
+try_to_write_enabled_feature_flags_list(FeatureNames) ->
+ %% Before writing the new file, we read the existing one. If there
+ %% are unknown feature flags in that file, we want to keep their
+ %% state, even though they are unsupported at this time. It could be
+ %% that a plugin was disabled in the meantime.
+ PreviouslyEnabled = case try_to_read_enabled_feature_flags_list() of
+ {error, _} -> [];
+ List -> List
+ end,
+ FeatureNames1 = lists:foldl(
+ fun(Name, Acc) ->
+ case is_supported_locally(Name) of
+ true -> Acc;
+ false -> [Name | Acc]
+ end
+ end, FeatureNames, PreviouslyEnabled),
+ FeatureNames2 = lists:sort(FeatureNames1),
+
+ File = enabled_feature_flags_list_file(),
+ Content = io_lib:format("~p.~n", [FeatureNames2]),
+ %% TODO: If we fail to write the the file, we should spawn a process
+ %% to retry the operation.
+ case file:write_file(File, Content) of
+ ok ->
+ ok;
+ {error, Reason} = Error ->
+ rabbit_log:error(
+ "Feature flags: failed to write the `feature_flags` "
+ "file at `~s`: ~s",
+ [File, file:format_error(Reason)]),
+ Error
+ end.
+
+-spec enabled_feature_flags_list_file() -> file:filename().
+%% @doc
+%% Returns the path to the file where the state of feature flags is stored.
+%%
+%% @returns the path to the file.
+
+enabled_feature_flags_list_file() ->
+ case application:get_env(rabbit, feature_flags_file) of
+ {ok, Val} -> Val;
+ _ -> filename:join([rabbit_mnesia:dir(), "feature_flags"])
+ end.
+
+%% -------------------------------------------------------------------
+%% Feature flags management: enabling.
+%% -------------------------------------------------------------------
+
+-spec do_enable(feature_name()) -> ok | {error, any()} | no_return().
+%% @private
+
+do_enable(FeatureName) ->
+ %% We mark this feature flag as "state changing" before doing the
+ %% actual state change. We also take a global lock: this permits
+ %% to block callers asking about a feature flag changing state.
+ global:set_lock(?FF_STATE_CHANGE_LOCK),
+ Ret = case mark_as_enabled(FeatureName, state_changing) of
+ ok ->
+ case enable_dependencies(FeatureName, true) of
+ ok ->
+ case run_migration_fun(FeatureName, enable) of
+ ok ->
+ mark_as_enabled(FeatureName, true);
+ {error, no_migration_fun} ->
+ mark_as_enabled(FeatureName, true);
+ Error ->
+ Error
+ end;
+ Error ->
+ Error
+ end;
+ Error ->
+ Error
+ end,
+ case Ret of
+ ok -> ok;
+ _ -> mark_as_enabled(FeatureName, false)
+ end,
+ global:del_lock(?FF_STATE_CHANGE_LOCK),
+ Ret.
+
+-spec enable_locally(feature_name()) -> ok | {error, any()} | no_return().
+%% @private
+
+enable_locally(FeatureName) when is_atom(FeatureName) ->
+ case is_enabled(FeatureName) of
+ true ->
+ ok;
+ false ->
+ rabbit_log:debug(
+ "Feature flag `~s`: enable locally (i.e. was enabled on the cluster "
+ "when this node was not part of it)",
+ [FeatureName]),
+ do_enable_locally(FeatureName)
+ end.
+
+-spec do_enable_locally(feature_name()) -> ok | {error, any()} | no_return().
+%% @private
+
+do_enable_locally(FeatureName) ->
+ case enable_dependencies(FeatureName, false) of
+ ok ->
+ case run_migration_fun(FeatureName, enable) of
+ ok ->
+ mark_as_enabled_locally(FeatureName, true);
+ {error, no_migration_fun} ->
+ mark_as_enabled_locally(FeatureName, true);
+ Error ->
+ Error
+ end;
+ Error ->
+ Error
+ end.
+
+-spec enable_dependencies(feature_name(), boolean()) ->
+ ok | {error, any()} | no_return().
+%% @private
+
+enable_dependencies(FeatureName, Everywhere) ->
+ FeatureProps = rabbit_ff_registry:get(FeatureName),
+ DependsOn = maps:get(depends_on, FeatureProps, []),
+ rabbit_log:debug("Feature flag `~s`: enable dependencies: ~p",
+ [FeatureName, DependsOn]),
+ enable_dependencies(FeatureName, DependsOn, Everywhere).
+
+-spec enable_dependencies(feature_name(), [feature_name()], boolean()) ->
+ ok | {error, any()} | no_return().
+%% @private
+
+enable_dependencies(TopLevelFeatureName, [FeatureName | Rest], Everywhere) ->
+ Ret = case Everywhere of
+ true -> enable(FeatureName);
+ false -> enable_locally(FeatureName)
+ end,
+ case Ret of
+ ok -> enable_dependencies(TopLevelFeatureName, Rest, Everywhere);
+ Error -> Error
+ end;
+enable_dependencies(_, [], _) ->
+ ok.
+
+-spec run_migration_fun(feature_name(), any()) ->
+ any() | {error, any()}.
+%% @private
+
+run_migration_fun(FeatureName, Arg) ->
+ FeatureProps = rabbit_ff_registry:get(FeatureName),
+ run_migration_fun(FeatureName, FeatureProps, Arg).
+
+run_migration_fun(FeatureName, FeatureProps, Arg) ->
+ case maps:get(migration_fun, FeatureProps, none) of
+ {MigrationMod, MigrationFun}
+ when is_atom(MigrationMod) andalso is_atom(MigrationFun) ->
+ rabbit_log:debug("Feature flag `~s`: run migration function ~p "
+ "with arg: ~p",
+ [FeatureName, MigrationFun, Arg]),
+ try
+ erlang:apply(MigrationMod,
+ MigrationFun,
+ [FeatureName, FeatureProps, Arg])
+ catch
+ _:Reason:Stacktrace ->
+ rabbit_log:error("Feature flag `~s`: migration function "
+ "crashed: ~p~n~p",
+ [FeatureName, Reason, Stacktrace]),
+ {error, {migration_fun_crash, Reason, Stacktrace}}
+ end;
+ none ->
+ {error, no_migration_fun};
+ Invalid ->
+ rabbit_log:error("Feature flag `~s`: invalid migration "
+ "function: ~p",
+ [FeatureName, Invalid]),
+ {error, {invalid_migration_fun, Invalid}}
+ end.
+
+-spec mark_as_enabled(feature_name(), boolean() | state_changing) ->
+ any() | {error, any()} | no_return().
+%% @private
+
+mark_as_enabled(FeatureName, IsEnabled) ->
+ case mark_as_enabled_locally(FeatureName, IsEnabled) of
+ ok ->
+ mark_as_enabled_remotely(FeatureName, IsEnabled);
+ Error ->
+ Error
+ end.
+
+-spec mark_as_enabled_locally(feature_name(), boolean() | state_changing) ->
+ any() | {error, any()} | no_return().
+%% @private
+
+mark_as_enabled_locally(FeatureName, IsEnabled) ->
+ rabbit_log:info("Feature flag `~s`: mark as enabled=~p",
+ [FeatureName, IsEnabled]),
+ EnabledFeatureNames = maps:keys(list(enabled)),
+ NewEnabledFeatureNames = case IsEnabled of
+ true ->
+ [FeatureName | EnabledFeatureNames];
+ false ->
+ EnabledFeatureNames -- [FeatureName];
+ state_changing ->
+ EnabledFeatureNames
+ end,
+ WrittenToDisk = case NewEnabledFeatureNames of
+ EnabledFeatureNames ->
+ rabbit_ff_registry:is_registry_written_to_disk();
+ _ ->
+ ok =:= try_to_write_enabled_feature_flags_list(
+ NewEnabledFeatureNames)
+ end,
+ case IsEnabled of
+ state_changing ->
+ initialize_registry(EnabledFeatureNames,
+ [FeatureName],
+ WrittenToDisk);
+ _ ->
+ initialize_registry(NewEnabledFeatureNames,
+ [],
+ WrittenToDisk)
+ end.
+
+-spec mark_as_enabled_remotely(feature_name(), boolean() | state_changing) ->
+ any() | {error, any()} | no_return().
+%% @private
+
+mark_as_enabled_remotely(FeatureName, IsEnabled) ->
+ Nodes = running_remote_nodes(),
+ mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, ?TIMEOUT).
+
+-spec mark_as_enabled_remotely([node()], feature_name(), boolean() | state_changing, timeout()) ->
+ any() | {error, any()} | no_return().
+%% @private
+
+mark_as_enabled_remotely([], _FeatureName, _IsEnabled, _Timeout) ->
+ ok;
+mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, Timeout) ->
+ T0 = erlang:timestamp(),
+ Rets = [{Node, rpc:call(Node,
+ ?MODULE,
+ mark_as_enabled_locally,
+ [FeatureName, IsEnabled],
+ Timeout)}
+ || Node <- Nodes],
+ FailedNodes = [Node || {Node, Ret} <- Rets, Ret =/= ok],
+ case FailedNodes of
+ [] ->
+ rabbit_log:debug(
+ "Feature flags: `~s` successfully marked as enabled=~p on all "
+ "nodes", [FeatureName, IsEnabled]),
+ ok;
+ _ ->
+ T1 = erlang:timestamp(),
+ rabbit_log:error(
+ "Feature flags: failed to mark feature flag `~s` as enabled=~p "
+ "on the following nodes:", [FeatureName, IsEnabled]),
+ [rabbit_log:error(
+ "Feature flags: - ~s: ~p",
+ [Node, Ret])
+ || {Node, Ret} <- Rets,
+ Ret =/= ok],
+ NewTimeout = Timeout - (timer:now_diff(T1, T0) div 1000),
+ if
+ NewTimeout > 0 ->
+ rabbit_log:debug(
+ "Feature flags: retrying with a timeout of ~b "
+ "milliseconds", [NewTimeout]),
+ mark_as_enabled_remotely(FailedNodes,
+ FeatureName,
+ IsEnabled,
+ NewTimeout);
+ true ->
+ rabbit_log:debug(
+ "Feature flags: not retrying; RPC went over the "
+ "~b milliseconds timeout", [Timeout]),
+ %% FIXME: Is crashing the process the best solution here?
+ throw(
+ {failed_to_mark_feature_flag_as_enabled_on_remote_nodes,
+ FeatureName, IsEnabled, FailedNodes})
+ end
+ end.
+
+%% -------------------------------------------------------------------
+%% Coordination with remote nodes.
+%% -------------------------------------------------------------------
+
+-spec remote_nodes() -> [node()].
+%% @private
+
+remote_nodes() ->
+ mnesia:system_info(db_nodes) -- [node()].
+
+-spec running_remote_nodes() -> [node()].
+%% @private
+
+running_remote_nodes() ->
+ mnesia:system_info(running_db_nodes) -- [node()].
+
+-spec does_node_support(node(), [feature_name()], timeout()) -> boolean().
+%% @private
+
+does_node_support(Node, FeatureNames, Timeout) ->
+ rabbit_log:debug("Feature flags: querying `~p` support on node ~s...",
+ [FeatureNames, Node]),
+ Ret = case node() of
+ Node ->
+ is_supported_locally(FeatureNames);
+ _ ->
+ rpc:call(Node,
+ ?MODULE, is_supported_locally, [FeatureNames],
+ Timeout)
+ end,
+ case Ret of
+ {badrpc, {'EXIT',
+ {undef,
+ [{?MODULE, is_supported_locally, [FeatureNames], []}
+ | _]}}} ->
+ %% If rabbit_feature_flags:is_supported_locally/1 is undefined
+ %% on the remote node, we consider it to be a 3.7.x node.
+ %%
+ %% Theoritically, it could be an older version (3.6.x and
+ %% older). But the RabbitMQ version consistency check
+ %% (rabbit_misc:version_minor_equivalent/2) called from
+ %% rabbit_mnesia:check_rabbit_consistency/2 already blocked
+ %% this situation from happening before we reach this point.
+ rabbit_log:debug(
+ "Feature flags: ?MODULE:is_supported_locally(~p) unavailable "
+ "on node `~s`: assuming it is a RabbitMQ 3.7.x node "
+ "=> consider the feature flags unsupported",
+ [FeatureNames, Node]),
+ false;
+ {badrpc, Reason} ->
+ rabbit_log:error("Feature flags: error while querying `~p` "
+ "support on node ~s: ~p",
+ [FeatureNames, Node, Reason]),
+ false;
+ true ->
+ rabbit_log:debug("Feature flags: node `~s` supports `~p`",
+ [Node, FeatureNames]),
+ true;
+ false ->
+ rabbit_log:debug("Feature flags: node `~s` does not support `~p`; "
+ "stopping query here",
+ [Node, FeatureNames]),
+ false
+ end.
+
+-spec check_node_compatibility(node()) -> ok | {error, any()}.
+%% @doc
+%% Checks if a node is compatible with the local node.
+%%
+%% To be compatible, the following two conditions must be met:
+%% <ol>
+%% <li>feature flags enabled on the local node must be supported by the
+%% remote node</li>
+%% <li>feature flags enabled on the remote node must be supported by the
+%% local node</li>
+%% </ol>
+%%
+%% @param Node the name of the remote node to test.
+%% @returns `ok' if they are compatible, `{error, Reason}' if they are not.
+
+check_node_compatibility(Node) ->
+ check_node_compatibility(Node, ?TIMEOUT).
+
+-spec check_node_compatibility(node(), timeout()) -> ok | {error, any()}.
+%% @doc
+%% Checks if a node is compatible with the local node.
+%%
+%% See {@link check_node_compatibility/1} for the conditions required to
+%% consider two nodes compatible.
+%%
+%% @param Node the name of the remote node to test.
+%% @param Timeout Time in milliseconds after which the RPC gives up.
+%% @returns `ok' if they are compatible, `{error, Reason}' if they are not.
+%%
+%% @see check_node_compatibility/1
+
+check_node_compatibility(Node, Timeout) ->
+ rabbit_log:debug("Feature flags: node `~s` compatibility check, part 1/2",
+ [Node]),
+ Part1 = local_enabled_feature_flags_is_supported_remotely(Node, Timeout),
+ rabbit_log:debug("Feature flags: node `~s` compatibility check, part 2/2",
+ [Node]),
+ Part2 = remote_enabled_feature_flags_is_supported_locally(Node, Timeout),
+ case {Part1, Part2} of
+ {true, true} ->
+ rabbit_log:debug("Feature flags: node `~s` is compatible", [Node]),
+ ok;
+ {false, _} ->
+ rabbit_log:error("Feature flags: node `~s` is INCOMPATIBLE: "
+ "feature flags enabled locally are not "
+ "supported remotely",
+ [Node]),
+ {error, incompatible_feature_flags};
+ {_, false} ->
+ rabbit_log:error("Feature flags: node `~s` is INCOMPATIBLE: "
+ "feature flags enabled remotely are not "
+ "supported locally",
+ [Node]),
+ {error, incompatible_feature_flags}
+ end.
+
+-spec is_node_compatible(node()) -> boolean().
+%% @doc
+%% Returns if a node is compatible with the local node.
+%%
+%% This function calls {@link check_node_compatibility/2} and returns
+%% `true' the latter returns `ok'. Therefore this is the same code,
+%% except that this function returns a boolean, but not the reason of
+%% the incompatibility if any.
+%%
+%% @param Node the name of the remote node to test.
+%% @returns `true' if they are compatible, `false' otherwise.
+
+is_node_compatible(Node) ->
+ is_node_compatible(Node, ?TIMEOUT).
+
+-spec is_node_compatible(node(), timeout()) -> boolean().
+%% @doc
+%% Returns if a node is compatible with the local node.
+%%
+%% This function calls {@link check_node_compatibility/2} and returns
+%% `true' the latter returns `ok'. Therefore this is the same code,
+%% except that this function returns a boolean, but not the reason
+%% of the incompatibility if any. If the RPC times out, nodes are
+%% considered incompatible.
+%%
+%% @param Node the name of the remote node to test.
+%% @param Timeout Time in milliseconds after which the RPC gives up.
+%% @returns `true' if they are compatible, `false' otherwise.
+
+is_node_compatible(Node, Timeout) ->
+ check_node_compatibility(Node, Timeout) =:= ok.
+
+-spec local_enabled_feature_flags_is_supported_remotely(node(),
+ timeout()) ->
+ boolean().
+%% @private
+
+local_enabled_feature_flags_is_supported_remotely(Node, Timeout) ->
+ LocalEnabledFeatureNames = maps:keys(list(enabled)),
+ is_supported_remotely([Node], LocalEnabledFeatureNames, Timeout).
+
+-spec remote_enabled_feature_flags_is_supported_locally(node(),
+ timeout()) ->
+ boolean().
+%% @private
+
+remote_enabled_feature_flags_is_supported_locally(Node, Timeout) ->
+ case query_remote_feature_flags(Node, enabled, Timeout) of
+ {error, _} ->
+ false;
+ RemoteEnabledFeatureFlags when is_map(RemoteEnabledFeatureFlags) ->
+ RemoteEnabledFeatureNames = maps:keys(RemoteEnabledFeatureFlags),
+ is_supported_locally(RemoteEnabledFeatureNames)
+ end.
+
+-spec query_remote_feature_flags(node(),
+ Which :: all | enabled | disabled,
+ timeout()) ->
+ feature_flags() | {error, any()}.
+%% @private
+
+query_remote_feature_flags(Node, Which, Timeout) ->
+ rabbit_log:debug("Feature flags: querying ~s feature flags "
+ "on node `~s`...",
+ [Which, Node]),
+ case rpc:call(Node, ?MODULE, list, [Which], Timeout) of
+ {badrpc, {'EXIT',
+ {undef,
+ [{?MODULE, list, [Which], []}
+ | _]}}} ->
+ %% See does_node_support/3 for an explanation why we
+ %% consider this node a 3.7.x node.
+ rabbit_log:debug(
+ "Feature flags: ?MODULE:list(~s) unavailable on node `~s`: "
+ "assuming it is a RabbitMQ 3.7.x node "
+ "=> consider the list empty",
+ [Which, Node]),
+ #{};
+ {badrpc, Reason} = Error ->
+ rabbit_log:error(
+ "Feature flags: error while querying ~s feature flags "
+ "on node `~s`: ~p",
+ [Which, Node, Reason]),
+ {error, Error};
+ RemoteFeatureFlags when is_map(RemoteFeatureFlags) ->
+ RemoteFeatureNames = maps:keys(RemoteFeatureFlags),
+ rabbit_log:debug("Feature flags: querying ~s feature flags "
+ "on node `~s` done; ~s features: ~p",
+ [Which, Node, Which, RemoteFeatureNames]),
+ RemoteFeatureFlags
+ end.
+
+-spec sync_feature_flags_with_cluster([node()]) ->
+ ok | {error, any()} | no_return().
+%% @private
+
+sync_feature_flags_with_cluster(Nodes) ->
+ sync_feature_flags_with_cluster(Nodes, ?TIMEOUT).
+
+-spec sync_feature_flags_with_cluster([node()], timeout()) ->
+ ok | {error, any()} | no_return().
+%% @private
+
+sync_feature_flags_with_cluster([], _) ->
+ verify_which_feature_flags_are_actually_enabled(),
+ FeatureNames = get_forced_feature_flag_names(),
+ case remote_nodes() of
+ [] when FeatureNames =:= undefined ->
+ rabbit_log:debug(
+ "Feature flags: starting an unclustered node: "
+ "all feature flags will be enabled by default"),
+ enable_all();
+ [] ->
+ case FeatureNames of
+ [] ->
+ rabbit_log:debug(
+ "Feature flags: starting an unclustered node: "
+ "all feature flags are forcibly left disabled "
+ "from the RABBITMQ_FEATURE_FLAGS environment "
+ "variable");
+ _ ->
+ rabbit_log:debug(
+ "Feature flags: starting an unclustered node: "
+ "only the following feature flags specified in "
+ "the RABBITMQ_FEATURE_FLAGS environment variable "
+ "will be enabled: ~p",
+ [FeatureNames])
+ end,
+ enable(FeatureNames);
+ _ ->
+ ok
+ end;
+sync_feature_flags_with_cluster(Nodes, Timeout) ->
+ verify_which_feature_flags_are_actually_enabled(),
+ RemoteNodes = Nodes -- [node()],
+ sync_feature_flags_with_cluster1(RemoteNodes, Timeout).
+
+sync_feature_flags_with_cluster1([], _) ->
+ ok;
+sync_feature_flags_with_cluster1(RemoteNodes, Timeout) ->
+ RandomRemoteNode = pick_one_node(RemoteNodes),
+ rabbit_log:debug("Feature flags: SYNCING FEATURE FLAGS with node `~s`...",
+ [RandomRemoteNode]),
+ case query_remote_feature_flags(RandomRemoteNode, enabled, Timeout) of
+ {error, _} = Error ->
+ Error;
+ RemoteFeatureFlags ->
+ RemoteFeatureNames = maps:keys(RemoteFeatureFlags),
+ do_sync_feature_flags_with_node1(RemoteFeatureNames)
+ end.
+
+pick_one_node(Nodes) ->
+ RandomIndex = rand:uniform(length(Nodes)),
+ lists:nth(RandomIndex, Nodes).
+
+do_sync_feature_flags_with_node1([FeatureFlag | Rest]) ->
+ case enable_locally(FeatureFlag) of
+ ok -> do_sync_feature_flags_with_node1(Rest);
+ Error -> Error
+ end;
+do_sync_feature_flags_with_node1([]) ->
+ ok.
+
+-spec get_forced_feature_flag_names() -> [feature_name()] | undefined.
+%% @private
+%% @doc
+%% Returns the (possibly empty) list of feature flags the user want
+%% to enable out-of-the-box when starting a node for the first time.
+%%
+%% Without this, the default is to enable all the supported feature
+%% flags.
+%%
+%% There are two ways to specify that list:
+%% <ol>
+%% <li>Using the `$RABBITMQ_FEATURE_FLAGS' environment variable; for
+%% instance `RABBITMQ_FEATURE_FLAGS=quorum_queue,mnevis'.</li>
+%% <li>Using the `forced_feature_flags_on_init' configuration parameter;
+%% for instance
+%% `{rabbit, [{forced_feature_flags_on_init, [quorum_queue, mnevis]}]}'.</li>
+%% </ol>
+%%
+%% The environment variable has precedence over the configuration
+%% parameter.
+
+get_forced_feature_flag_names() ->
+ Ret = case get_forced_feature_flag_names_from_env() of
+ undefined -> get_forced_feature_flag_names_from_config();
+ List -> List
+ end,
+ case Ret of
+ undefined -> ok;
+ [] -> rabbit_log:info("Feature flags: automatic enablement "
+ "of feature flags disabled (i.e. none "
+ "will be enabled automatically)", []);
+ _ -> rabbit_log:info("Feature flags: automatic enablement "
+ "of feature flags limited to the "
+ "following list: ~p", [Ret])
+ end,
+ Ret.
+
+-spec get_forced_feature_flag_names_from_env() -> [feature_name()] | undefined.
+%% @private
+
+get_forced_feature_flag_names_from_env() ->
+ case os:getenv("RABBITMQ_FEATURE_FLAGS") of
+ false -> undefined;
+ Value -> [list_to_atom(V) ||V <- string:lexemes(Value, ",")]
+ end.
+
+-spec get_forced_feature_flag_names_from_config() -> [feature_name()] | undefined.
+%% @private
+
+get_forced_feature_flag_names_from_config() ->
+ Value = application:get_env(rabbit,
+ forced_feature_flags_on_init,
+ undefined),
+ case Value of
+ undefined ->
+ Value;
+ _ when is_list(Value) ->
+ case lists:all(fun is_atom/1, Value) of
+ true -> Value;
+ false -> undefined
+ end;
+ _ ->
+ undefined
+ end.
+
+-spec verify_which_feature_flags_are_actually_enabled() ->
+ ok | {error, any()} | no_return().
+%% @private
+
+verify_which_feature_flags_are_actually_enabled() ->
+ AllFeatureFlags = list(all),
+ EnabledFeatureNames = read_enabled_feature_flags_list(),
+ rabbit_log:debug("Feature flags: double-checking feature flag states..."),
+ %% In case the previous instance of the node failed to write the
+ %% feature flags list file, we want to double-check the list of
+ %% enabled feature flags read from disk. For each feature flag,
+ %% we call the migration function to query if the feature flag is
+ %% actually enabled.
+ %%
+ %% If a feature flag doesn't provide a migration function (or if the
+ %% function fails), we keep the current state of the feature flag.
+ List1 = maps:fold(
+ fun(Name, Props, Acc) ->
+ Ret = run_migration_fun(Name, Props, is_enabled),
+ case Ret of
+ true ->
+ [Name | Acc];
+ false ->
+ Acc;
+ _ ->
+ MarkedAsEnabled = is_enabled(Name),
+ case MarkedAsEnabled of
+ true -> [Name | Acc];
+ false -> Acc
+ end
+ end
+ end,
+ [], AllFeatureFlags),
+ RepairedEnabledFeatureNames = lists:sort(List1),
+ %% We log the list of feature flags for which the state changes
+ %% after the check above.
+ WereEnabled = RepairedEnabledFeatureNames -- EnabledFeatureNames,
+ WereDisabled = EnabledFeatureNames -- RepairedEnabledFeatureNames,
+ case {WereEnabled, WereDisabled} of
+ {[], []} -> ok;
+ _ -> rabbit_log:warning(
+ "Feature flags: the previous instance of this node "
+ "must have failed to write the `feature_flags` "
+ "file at `~s`:",
+ [enabled_feature_flags_list_file()])
+ end,
+ case WereEnabled of
+ [] -> ok;
+ _ -> rabbit_log:warning(
+ "Feature flags: - list of previously enabled "
+ "feature flags now marked as such: ~p", [WereEnabled])
+ end,
+ case WereDisabled of
+ [] -> ok;
+ _ -> rabbit_log:warning(
+ "Feature flags: - list of previously disabled "
+ "feature flags now marked as such: ~p", [WereDisabled])
+ end,
+ %% Finally, if the new list of enabled feature flags is different
+ %% than the one on disk, we write the new list and re-initialize the
+ %% registry.
+ case RepairedEnabledFeatureNames of
+ EnabledFeatureNames ->
+ ok;
+ _ ->
+ rabbit_log:debug(
+ "Feature flags: write the repaired list of enabled feature "
+ "flags"),
+ WrittenToDisk = ok =:= try_to_write_enabled_feature_flags_list(
+ RepairedEnabledFeatureNames),
+ initialize_registry(
+ RepairedEnabledFeatureNames, [], WrittenToDisk)
+ end.
diff --git a/src/rabbit_ff_extra.erl b/src/rabbit_ff_extra.erl
new file mode 100644
index 0000000000..35cd98d740
--- /dev/null
+++ b/src/rabbit_ff_extra.erl
@@ -0,0 +1,236 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% @author The RabbitMQ team
+%% @copyright 2018-2019 Pivotal Software, Inc.
+%%
+%% @doc
+%% This module provides extra functions unused by the feature flags
+%% subsystem core functionality.
+
+-module(rabbit_ff_extra).
+
+-export([cli_info/0,
+ info/1,
+ info/2,
+ format_error/1]).
+
+-type cli_info() :: [cli_info_entry()].
+%% A list of feature flags properties, formatted for the RabbitMQ CLI.
+
+-type cli_info_entry() :: [{name, rabbit_feature_flags:feature_name()} |
+ {state, enabled | disabled | unavailable} |
+ {stability, rabbit_feature_flags:stability()} |
+ {provided_by, atom()} |
+ {desc, string()} |
+ {doc_url, string()}].
+%% A list of properties for a single feature flag, formatted for the
+%% RabbitMQ CLI.
+
+-type info_options() :: #{color => boolean(),
+ lines => boolean(),
+ verbose => non_neg_integer()}.
+%% Options accepted by {@link info/1} and {@link info/2}.
+
+-export_type([info_options/0]).
+
+-spec cli_info() -> cli_info().
+%% @doc
+%% Returns a list of all feature flags properties.
+%%
+%% @returns the list of all feature flags properties.
+
+cli_info() ->
+ cli_info(rabbit_feature_flags:list(all)).
+
+-spec cli_info(rabbit_feature_flags:feature_flags()) -> cli_info().
+%% @doc
+%% Formats a map of feature flags and their properties into a list of
+%% feature flags properties as expected by the RabbitMQ CLI.
+%%
+%% @param FeatureFlags A map of feature flags.
+%% @returns the list of feature flags properties, created from the map
+%% specified in arguments.
+
+cli_info(FeatureFlags) ->
+ lists:foldr(
+ fun(FeatureName, Acc) ->
+ FeatureProps = maps:get(FeatureName, FeatureFlags),
+ State = rabbit_feature_flags:get_state(FeatureName),
+ Stability = rabbit_feature_flags:get_stability(FeatureProps),
+ App = maps:get(provided_by, FeatureProps),
+ Desc = maps:get(desc, FeatureProps, ""),
+ DocUrl = maps:get(doc_url, FeatureProps, ""),
+ FFInfo = [{name, FeatureName},
+ {desc, unicode:characters_to_binary(Desc)},
+ {doc_url, unicode:characters_to_binary(DocUrl)},
+ {state, State},
+ {stability, Stability},
+ {provided_by, App}],
+ [FFInfo | Acc]
+ end, [], lists:sort(maps:keys(FeatureFlags))).
+
+-spec info(info_options()) -> ok.
+%% @doc
+%% Displays an array of all supported feature flags and their properties
+%% on `stdout'.
+%%
+%% @param Options Options to tune what is displayed and how.
+
+info(Options) ->
+ %% Two tables: one for stable feature flags, one for experimental ones.
+ StableFF = rabbit_feature_flags:list(all, stable),
+ case maps:size(StableFF) of
+ 0 ->
+ ok;
+ _ ->
+ io:format("~n~s## Stable feature flags:~s~n",
+ [rabbit_pretty_stdout:ascii_color(bright_white),
+ rabbit_pretty_stdout:ascii_color(default)]),
+ info(StableFF, Options)
+ end,
+ ExpFF = rabbit_feature_flags:list(all, experimental),
+ case maps:size(ExpFF) of
+ 0 ->
+ ok;
+ _ ->
+ io:format("~n~s## Experimental feature flags:~s~n",
+ [rabbit_pretty_stdout:ascii_color(bright_white),
+ rabbit_pretty_stdout:ascii_color(default)]),
+ info(ExpFF, Options)
+ end,
+ case maps:size(StableFF) + maps:size(ExpFF) of
+ 0 -> ok;
+ _ -> state_legend()
+ end.
+
+-spec info(rabbit_feature_flags:feature_flags(), info_options()) -> ok.
+%% @doc
+%% Displays an array of feature flags and their properties on `stdout',
+%% based on the specified feature flags map.
+%%
+%% @param FeatureFlags Map of the feature flags to display.
+%% @param Options Options to tune what is displayed and how.
+
+info(FeatureFlags, Options) ->
+ Verbose = maps:get(verbose, Options, 0),
+ IsaTty= case os:getenv("TERM") of
+ %% We don't have access to isatty(3), so let's
+ %% assume that is $TERM is defined, we can use
+ %% colors and drawing characters.
+ false -> false;
+ _ -> true
+ end,
+ UseColors = maps:get(color, Options, IsaTty),
+ UseLines = maps:get(lines, Options, IsaTty),
+ %% Table columns:
+ %% | Name | State | Provided by | Description
+ %%
+ %% where:
+ %% State = Enabled | Disabled | Unavailable (if a node doesn't
+ %% support it).
+ TableHeader = [
+ [{"Name", bright_white},
+ {"State", bright_white},
+ {"Provided by", bright_white},
+ {"Description", bright_white}]
+ ],
+ Nodes = lists:sort([node() | rabbit_feature_flags:remote_nodes()]),
+ Rows = lists:foldr(
+ fun(FeatureName, Acc) ->
+ FeatureProps = maps:get(FeatureName, FeatureFlags),
+ State0 = rabbit_feature_flags:get_state(FeatureName),
+ {State, StateColor} = case State0 of
+ enabled ->
+ {"Enabled", green};
+ disabled ->
+ {"Disabled", yellow};
+ unavailable ->
+ {"Unavailable", red_bg}
+ end,
+ App = maps:get(provided_by, FeatureProps),
+ Desc = maps:get(desc, FeatureProps, ""),
+ MainLine = [{atom_to_list(FeatureName), bright_white},
+ {State, StateColor},
+ {atom_to_list(App), default},
+ {Desc, default}],
+ VFun = fun(Node) ->
+ Supported =
+ rabbit_feature_flags:does_node_support(
+ Node, [FeatureName], 60000),
+ {Label, LabelColor} =
+ case Supported of
+ true -> {"supported", default};
+ false -> {"unsupported", red_bg}
+ end,
+ Uncolored = rabbit_misc:format(
+ " ~s: ~s", [Node, Label]),
+ Colored = rabbit_misc:format(
+ " ~s: ~s~s~s",
+ [Node,
+ rabbit_pretty_stdout:
+ ascii_color(LabelColor),
+ Label,
+ rabbit_pretty_stdout:
+ ascii_color(default)]),
+ [empty,
+ empty,
+ empty,
+ {Uncolored, Colored}]
+ end,
+ if
+ Verbose > 0 ->
+ [[MainLine,
+ empty,
+ [empty,
+ empty,
+ empty,
+ {"Per-node support level:", default}]
+ | lists:map(VFun, Nodes)] | Acc];
+ true ->
+ [[MainLine] | Acc]
+ end
+ end, [], lists:sort(maps:keys(FeatureFlags))),
+ io:format("~n", []),
+ rabbit_pretty_stdout:display_table([TableHeader | Rows],
+ UseColors,
+ UseLines).
+
+state_legend() ->
+ io:format(
+ "~n"
+ "Possible states:~n"
+ " ~sEnabled~s: The feature flag is enabled on all nodes~n"
+ " ~sDisabled~s: The feature flag is disabled on all nodes~n"
+ " ~sUnavailable~s: The feature flag cannot be enabled because one or "
+ "more nodes do not support it~n"
+ "~n",
+ [rabbit_pretty_stdout:ascii_color(green),
+ rabbit_pretty_stdout:ascii_color(default),
+ rabbit_pretty_stdout:ascii_color(yellow),
+ rabbit_pretty_stdout:ascii_color(default),
+ rabbit_pretty_stdout:ascii_color(red_bg),
+ rabbit_pretty_stdout:ascii_color(default)]).
+
+-spec format_error(any()) -> string().
+%% @doc
+%% Formats the error reason term so it can be presented to human beings.
+%%
+%% @param Reason The term in the `{error, Reason}' tuple.
+%% @returns the formatted error reason.
+
+format_error(Reason) ->
+ rabbit_misc:format("~p", [Reason]).
diff --git a/src/rabbit_ff_registry.erl b/src/rabbit_ff_registry.erl
new file mode 100644
index 0000000000..e18a4b3456
--- /dev/null
+++ b/src/rabbit_ff_registry.erl
@@ -0,0 +1,167 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% @author The RabbitMQ team
+%% @copyright 2018-2019 Pivotal Software, Inc.
+%%
+%% @doc
+%% This module exposes the API of the {@link rabbit_feature_flags}
+%% registry. The feature flags registry is an Erlang module, compiled at
+%% runtime, storing all the informations about feature flags: which are
+%% supported, which are enabled, etc.
+%%
+%% Because it is compiled at runtime, the initial source code is mostly
+%% an API reference. What the initial module does is merely ask {@link
+%% rabbit_feature_flags} to generate the real registry.
+
+-module(rabbit_ff_registry).
+
+-export([get/1,
+ list/1,
+ is_supported/1,
+ is_enabled/1,
+ is_registry_initialized/0,
+ is_registry_written_to_disk/0]).
+
+-spec get(rabbit_feature_flags:feature_name()) ->
+ rabbit_feature_flags:feature_props() | undefined.
+%% @doc
+%% Returns the properties of a feature flag.
+%%
+%% Only the informations stored in the local registry is used to answer
+%% this call.
+%%
+%% @param FeatureName The name of the feature flag.
+%% @returns the properties of the specified feature flag.
+
+get(FeatureName) ->
+ rabbit_feature_flags:initialize_registry(),
+ %% Initially, is_registry_initialized/0 always returns `false`
+ %% and this ?MODULE:get(FeatureName) is always called. The case
+ %% statement is here to please Dialyzer.
+ case is_registry_initialized() of
+ false -> ?MODULE:get(FeatureName);
+ true -> undefined
+ end.
+
+-spec list(all | enabled | disabled) -> rabbit_feature_flags:feature_flags().
+%% @doc
+%% Lists all, enabled or disabled feature flags, depending on the argument.
+%%
+%% Only the informations stored in the local registry is used to answer
+%% this call.
+%%
+%% @param Which The group of feature flags to return: `all', `enabled' or
+%% `disabled'.
+%% @returns A map of selected feature flags.
+
+list(Which) ->
+ rabbit_feature_flags:initialize_registry(),
+ %% See get/1 for an explanation of the case statement below.
+ case is_registry_initialized() of
+ false -> ?MODULE:list(Which);
+ true -> #{}
+ end.
+
+-spec is_supported(rabbit_feature_flags:feature_name()) -> boolean().
+%% @doc
+%% Returns if a feature flag is supported.
+%%
+%% Only the informations stored in the local registry is used to answer
+%% this call.
+%%
+%% @param FeatureName The name of the feature flag to be checked.
+%% @returns `true' if the feature flag is supported, or `false'
+%% otherwise.
+
+is_supported(FeatureName) ->
+ rabbit_feature_flags:initialize_registry(),
+ %% See get/1 for an explanation of the case statement below.
+ case is_registry_initialized() of
+ false -> ?MODULE:is_supported(FeatureName);
+ true -> false
+ end.
+
+-spec is_enabled(rabbit_feature_flags:feature_name()) -> boolean() | state_changing.
+%% @doc
+%% Returns if a feature flag is supported or if its state is changing.
+%%
+%% Only the informations stored in the local registry is used to answer
+%% this call.
+%%
+%% @param FeatureName The name of the feature flag to be checked.
+%% @returns `true' if the feature flag is supported, `state_changing' if
+%% its state is transient, or `false' otherwise.
+
+is_enabled(FeatureName) ->
+ rabbit_feature_flags:initialize_registry(),
+ %% See get/1 for an explanation of the case statement below.
+ case is_registry_initialized() of
+ false -> ?MODULE:is_enabled(FeatureName);
+ true -> false
+ end.
+
+-spec is_registry_initialized() -> boolean().
+%% @doc
+%% Indicates if the registry is initialized.
+%%
+%% The registry is considered initialized once the initial Erlang module
+%% was replaced by the copy compiled at runtime.
+%%
+%% @returns `true' when the module is the one compiled at runtime,
+%% `false' when the module is the initial one compiled from RabbitMQ
+%% source code.
+
+is_registry_initialized() ->
+ always_return_false().
+
+-spec is_registry_written_to_disk() -> boolean().
+%% @doc
+%% Indicates if the feature flags state was successfully persisted to disk.
+%%
+%% Note that on startup, {@link rabbit_feature_flags} tries to determine
+%% the state of each supported feature flag, regardless of the
+%% information on disk, to ensure maximum consistency. However, this can
+%% be done for feature flags supporting it only.
+%%
+%% @returns `true' if the state was successfully written to disk and
+%% the registry can be initialized from that during the next RabbitMQ
+%% startup, `false' if the write failed and the node might loose feature
+%% flags state on restart.
+
+is_registry_written_to_disk() ->
+ always_return_true().
+
+always_return_true() ->
+ %% This function is here to trick Dialyzer. We want some functions
+ %% in this initial on-disk registry to always return `true` or
+ %% `false`. However the generated regsitry will return actual
+ %% booleans. The `-spec()` correctly advertises a return type of
+ %% `boolean()`. But in the meantime, Dialyzer only knows about this
+ %% copy which, without the trick below, would always return either
+ %% `true` (e.g. in is_registry_written_to_disk/0) or `false` (e.g.
+ %% is_registry_initialized/0). This obviously causes some warnings
+ %% where the registry functions are used: Dialyzer believes that
+ %% e.g. matching the return value of is_registry_initialized/0
+ %% against `true` will never succeed.
+ %%
+ %% That's why this function makes a call which we know the result,
+ %% but not Dialyzer, to "create" that hard-coded `true` return
+ %% value.
+ rand:uniform(1) > 0.
+
+always_return_false() ->
+ not always_return_true().
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4760a9432e..3d561aea5e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -547,6 +547,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
ok = rabbit_table:wait_for_replicated(_Retry = true),
ok = rabbit_table:create_local_copy(NodeType)
end,
+ ensure_feature_flags_are_in_sync(Nodes),
ensure_schema_integrity(),
rabbit_node_monitor:update_cluster_status(),
ok.
@@ -612,6 +613,14 @@ ensure_mnesia_not_running() ->
throw({error, mnesia_unexpectedly_running})
end.
+ensure_feature_flags_are_in_sync(Nodes) ->
+ case rabbit_feature_flags:sync_feature_flags_with_cluster(Nodes) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {incompatible_feature_flags, Reason}})
+ end.
+
ensure_schema_integrity() ->
case rabbit_table:check_schema_integrity(_Retry = true) of
ok ->
@@ -857,12 +866,12 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
check_consistency(Node, OTP, Rabbit, ProtocolVersion) ->
rabbit_misc:sequence_error(
[check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP),
- check_rabbit_consistency(Rabbit)]).
+ check_rabbit_consistency(Node, Rabbit)]).
check_consistency(Node, OTP, Rabbit, ProtocolVersion, Status) ->
rabbit_misc:sequence_error(
[check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP),
- check_rabbit_consistency(Rabbit),
+ check_rabbit_consistency(Node, Rabbit),
check_nodes_consistency(Node, Status)]).
check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
@@ -923,10 +932,12 @@ with_running_or_clean_mnesia(Fun) ->
Result
end.
-check_rabbit_consistency(Remote) ->
- rabbit_version:check_version_consistency(
- rabbit_misc:version(), Remote, "Rabbit",
- fun rabbit_misc:version_minor_equivalent/2).
+check_rabbit_consistency(RemoteNode, RemoteVersion) ->
+ rabbit_misc:sequence_error(
+ [rabbit_version:check_version_consistency(
+ rabbit_misc:version(), RemoteVersion, "Rabbit",
+ fun rabbit_misc:version_minor_equivalent/2),
+ rabbit_feature_flags:check_node_compatibility(RemoteNode)]).
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 80f70a1b7f..2d8ca07b28 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -453,6 +453,7 @@ clean_plugins(Plugins) ->
clean_plugin(Plugin, ExpandDir) ->
{ok, Mods} = application:get_key(Plugin, modules),
application:unload(Plugin),
+ rabbit_feature_flags:initialize_registry(),
[begin
code:soft_purge(Mod),
code:delete(Mod),
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index fbc1e81827..89344af30c 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -83,9 +83,26 @@ init_per_testcase(Testcase, Config) ->
{queue_name, Q},
{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}
]),
- rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
+ Config2 = rabbit_ct_helpers:run_steps(
+ Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config2, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config2, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ Config2;
+ false ->
+ end_per_testcase(Testcase, Config2),
+ {skip, "Quorum queues are unsupported"}
+ end.
end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
diff --git a/test/feature_flags_SUITE.erl b/test/feature_flags_SUITE.erl
new file mode 100644
index 0000000000..db87442105
--- /dev/null
+++ b/test/feature_flags_SUITE.erl
@@ -0,0 +1,372 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(feature_flags_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-export([suite/0,
+ all/0,
+ groups/0,
+ init_per_suite/1,
+ end_per_suite/1,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ enable_quorum_queue_in_a_healthy_situation/1,
+ enable_unsupported_feature_flag_in_a_healthy_situation/1,
+ enable_quorum_queue_when_ff_file_is_unwritable/1,
+ enable_quorum_queue_with_a_network_partition/1,
+ mark_quorum_queue_as_enabled_with_a_network_partition/1
+ ]).
+
+suite() ->
+ [{timetrap, 5 * 60000}].
+
+all() ->
+ [
+ {group, unclustered},
+ {group, clustered}
+ ].
+
+groups() ->
+ [
+ {unclustered, [],
+ [
+ enable_quorum_queue_in_a_healthy_situation,
+ enable_unsupported_feature_flag_in_a_healthy_situation,
+ enable_quorum_queue_when_ff_file_is_unwritable
+ ]},
+ {clustered, [],
+ [
+ enable_quorum_queue_in_a_healthy_situation,
+ enable_unsupported_feature_flag_in_a_healthy_situation,
+ enable_quorum_queue_when_ff_file_is_unwritable,
+ enable_quorum_queue_with_a_network_partition,
+ mark_quorum_queue_as_enabled_with_a_network_partition
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(clustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]);
+init_per_group(unclustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 1}]);
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_clustered, false},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
+ {net_ticktime, 5}
+ ]),
+ Config2 = rabbit_ct_helpers:merge_app_env(
+ Config1,
+ {rabbit,
+ [{forced_feature_flags_on_init, []},
+ {log, [{file, [{level, debug}]}]}]}),
+ Config3 = rabbit_ct_helpers:run_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config3, 0, rabbit_feature_flags, is_supported, [quorum_queue]),
+ case Ret of
+ true ->
+ Config3;
+ false ->
+ end_per_testcase(Testcase, Config3),
+ {skip, "Quorum queues are unsupported"}
+ end.
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+enable_quorum_queue_in_a_healthy_situation(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Re-enabling the feature flag also works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+enable_unsupported_feature_flag_in_a_healthy_situation(Config) ->
+ FeatureName = unsupported_feature_flag,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is unsupported and thus disabled.
+ ?assertEqual(
+ False,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ {error, unsupported},
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+enable_quorum_queue_when_ff_file_is_unwritable(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+ Files = feature_flags_files(Config),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Restrict permissions on the `feature_flags` files.
+ [?assertEqual(ok, file:change_mode(File, 8#0444)) || File <- Files],
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% The `feature_flags` file were not updated.
+ ?assertEqual(
+ lists:duplicate(ClusterSize, {ok, [[]]}),
+ [file:consult(File) || File <- feature_flags_files(Config)]),
+
+ %% Stop all nodes and restore permissions on the `feature_flags` files.
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N))
+ || N <- Nodes],
+ [?assertEqual(ok, file:change_mode(File, 8#0644)) || File <- Files],
+
+ %% Restart all nodes and assert the feature flag is still enabled and
+ %% the `feature_flags` files were correctly repaired.
+ [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N))
+ || N <- lists:reverse(Nodes)],
+
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+ ?assertEqual(
+ lists:duplicate(ClusterSize, {ok, [[FeatureName]]}),
+ [file:consult(File) || File <- feature_flags_files(Config)]).
+
+enable_quorum_queue_with_a_network_partition(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Isolate nodes B and E from the rest of the cluster.
+ NodePairs = [{B, A},
+ {B, C},
+ {B, D},
+ {E, A},
+ {E, C},
+ {E, D}],
+ block(NodePairs),
+ timer:sleep(1000),
+
+ %% Enabling the feature flag should fail in the specific case of
+ %% `quorum_queue`, if the network is broken.
+ ?assertEqual(
+ {error, unsupported},
+ enable_feature_flag_on(Config, B, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Repair the network and try again to enable the feature flag.
+ unblock(NodePairs),
+ timer:sleep(1000),
+ [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N))
+ || N <- [A, C, D]],
+ [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N))
+ || N <- [A, C, D]],
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, B, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+mark_quorum_queue_as_enabled_with_a_network_partition(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Isolate node B from the rest of the cluster.
+ NodePairs = [{B, A},
+ {B, C},
+ {B, D},
+ {B, E}],
+ block(NodePairs),
+ timer:sleep(1000),
+
+ %% Mark the feature flag as enabled on all nodes from node B. This
+ %% is expected to timeout.
+ RemoteNodes = [A, C, D, E],
+ ?assertEqual(
+ {failed_to_mark_feature_flag_as_enabled_on_remote_nodes,
+ FeatureName,
+ true,
+ RemoteNodes},
+ rabbit_ct_broker_helpers:rpc(
+ Config, B,
+ rabbit_feature_flags, mark_as_enabled_remotely,
+ [RemoteNodes, FeatureName, true, 20000])),
+
+ RepairFun = fun() ->
+ %% Wait a few seconds before we repair the network.
+ timer:sleep(5000),
+
+ %% Repair the network and try again to enable
+ %% the feature flag.
+ unblock(NodePairs),
+ timer:sleep(1000)
+ end,
+ spawn(RepairFun),
+
+ %% Mark the feature flag as enabled on all nodes from node B. This
+ %% is expected to work this time.
+ ct:pal(?LOW_IMPORTANCE,
+ "Marking the feature flag as enabled on remote nodes...", []),
+ ?assertEqual(
+ ok,
+ rabbit_ct_broker_helpers:rpc(
+ Config, B,
+ rabbit_feature_flags, mark_as_enabled_remotely,
+ [RemoteNodes, FeatureName, true, 120000])).
+
+%% FIXME: Finish the testcase above ^
+
+%% -------------------------------------------------------------------
+%% Internal helpers.
+%% -------------------------------------------------------------------
+
+enable_feature_flag_on(Config, Node, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc(
+ Config, Node, rabbit_feature_flags, enable, [FeatureName]).
+
+is_feature_flag_supported(Config, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, is_supported, [FeatureName]).
+
+is_feature_flag_enabled(Config, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, is_enabled, [FeatureName]).
+
+feature_flags_files(Config) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, enabled_feature_flags_list_file, []).
+
+block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs].
+unblock(Pairs) -> [allow(X, Y) || {X, Y} <- Pairs].
+
+block(X, Y) ->
+ rabbit_ct_broker_helpers:block_traffic_between(X, Y).
+
+allow(X, Y) ->
+ rabbit_ct_broker_helpers:allow_traffic_between(X, Y).
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 48dac3ca57..9f40bc7b0c 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -28,6 +28,9 @@
-compile(export_all).
+suite() ->
+ [{timetrap, 5 * 60000}].
+
all() ->
[
{group, single_node},
@@ -172,17 +175,32 @@ init_per_group(Group, Config) ->
Config2 = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
- ok = rabbit_ct_broker_helpers:rpc(
- Config2, 0, application, set_env,
- [rabbit, channel_queue_cleanup_interval, 100]),
- %% HACK: the larger cluster sizes benefit for a bit more time
- %% after clustering before running the tests.
- case Group of
- cluster_size_5 ->
- timer:sleep(5000),
- Config2;
- _ ->
- Config2
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config2, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config2, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_queue_cleanup_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit more time
+ %% after clustering before running the tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end;
+ false ->
+ end_per_group(Group, Config2),
+ {skip, "Quorum queues are unsupported"}
end.
end_per_group(clustered, Config) ->
@@ -206,11 +224,28 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
{tcp_ports_base},
{queue_name, Q}
]),
- rabbit_ct_helpers:run_steps(Config2,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps() ++
- [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
- fun rabbit_ct_broker_helpers:cluster_nodes/1]);
+ Config3 = rabbit_ct_helpers:run_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]),
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config3, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config3, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config3, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ Config3;
+ false ->
+ end_per_testcase(Testcase, Config3),
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 25261042b2..6071aeb5a5 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -66,13 +66,21 @@ init_per_group(classic_queue, Config) ->
auto_delete = true}
} | Config];
init_per_group(quorum_queue, Config) ->
- [{single_active_consumer_queue_declare,
- #'queue.declare'{arguments = [
- {<<"x-single-active-consumer">>, bool, true},
- {<<"x-queue-type">>, longstr, <<"quorum">>}
- ],
- durable = true, exclusive = false, auto_delete = false}
- } | Config].
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case Ret of
+ ok ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{
+ arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ],
+ durable = true, exclusive = false, auto_delete = false}
+ } | Config];
+ Error ->
+ {skip, {"Quorum queues are unsupported", Error}}
+ end.
end_per_group(_, Config) ->
Config.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 581440d179..d028ed3ccf 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -99,10 +99,24 @@ init_per_group(max_length_classic, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, false}]);
init_per_group(max_length_quorum, Config) ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
- {queue_durable, true}]);
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ false ->
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),