diff options
author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2021-12-14 18:50:59 +0100 |
---|---|---|
committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2021-12-14 18:50:59 +0100 |
commit | 70aa58514e969d955ca46f3e7b6b92b79a5cd19c (patch) | |
tree | 6ad0bbb7c2ec24f3f6b8435bd37ec30654c2eca7 | |
parent | cf76b479300b767b8ea450293d096cbf729ed734 (diff) | |
download | rabbitmq-server-git-add-feature-flags-controler.tar.gz |
-rw-r--r-- | deps/rabbit/src/rabbit.erl | 7 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_feature_flags.erl | 300 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_ff_controler.erl | 864 |
3 files changed, 886 insertions, 285 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 32e07d095b..5db546450f 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -114,6 +114,13 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_ff_controler, + [{description, "feature flags controler"}, + {mfa, {rabbit_sup, start_child, + [rabbit_ff_controler]}}, + {requires, pre_boot}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({rabbit_core_metrics, [{description, "core metrics storage"}, {mfa, {rabbit_sup, start_child, diff --git a/deps/rabbit/src/rabbit_feature_flags.erl b/deps/rabbit/src/rabbit_feature_flags.erl index 66e9dd49eb..e8bd4663f9 100644 --- a/deps/rabbit/src/rabbit_feature_flags.erl +++ b/deps/rabbit/src/rabbit_feature_flags.erl @@ -86,10 +86,6 @@ disable_all/0, is_supported/1, is_supported/2, - is_supported_locally/1, - is_supported_remotely/1, - is_supported_remotely/2, - is_supported_remotely/3, is_enabled/1, is_enabled/2, is_disabled/1, @@ -104,7 +100,6 @@ is_node_compatible/1, is_node_compatible/2, sync_feature_flags_with_cluster/2, - sync_feature_flags_with_cluster/3, refresh_feature_flags_after_app_load/1, enabled_feature_flags_list_file/0 ]). @@ -112,16 +107,15 @@ %% RabbitMQ internal use only. -export([initialize_registry/0, initialize_registry/1, + initialize_registry/3, mark_as_enabled_locally/2, remote_nodes/0, running_remote_nodes/0, does_node_support/3, - merge_feature_flags_from_unknown_apps/1, - do_sync_feature_flags_with_node/1]). + merge_feature_flags_from_unknown_apps/1]). -ifdef(TEST). -export([inject_test_feature_flags/1, - initialize_registry/3, query_supported_feature_flags/0, mark_as_enabled_remotely/2, mark_as_enabled_remotely/4, @@ -294,37 +288,8 @@ list(Which, Stability) %% feature flag could not be enabled (subsequent feature flags in the %% dependency tree are left unchanged). -enable(FeatureName) when is_atom(FeatureName) -> - rabbit_log_feature_flags:debug( - "Feature flag `~s`: REQUEST TO ENABLE", - [FeatureName]), - case is_enabled(FeatureName) of - true -> - rabbit_log_feature_flags:debug( - "Feature flag `~s`: already enabled", - [FeatureName]), - ok; - false -> - rabbit_log_feature_flags:debug( - "Feature flag `~s`: not enabled, check if supported by cluster", - [FeatureName]), - %% The feature flag must be supported locally and remotely - %% (i.e. by all members of the cluster). - case is_supported(FeatureName) of - true -> - rabbit_log_feature_flags:info( - "Feature flag `~s`: supported, attempt to enable...", - [FeatureName]), - do_enable(FeatureName); - false -> - rabbit_log_feature_flags:error( - "Feature flag `~s`: not supported", - [FeatureName]), - {error, unsupported} - end - end; -enable(FeatureNames) when is_list(FeatureNames) -> - with_feature_flags(FeatureNames, fun enable/1). +enable(FeatureNames) -> + rabbit_ff_controler:enable(FeatureNames). -spec enable_all() -> ok | {error, any()}. %% @doc @@ -336,7 +301,8 @@ enable(FeatureNames) when is_list(FeatureNames) -> %% unchanged). enable_all() -> - with_feature_flags(maps:keys(list(all)), fun enable/1). + AllFeatureNames = maps:keys(list(all)), + enable(AllFeatureNames). -spec disable(feature_name() | [feature_name()]) -> ok | {error, any()}. %% @doc @@ -349,10 +315,8 @@ enable_all() -> %% feature flag could not be disabled (subsequent feature flags in the %% dependency tree are left unchanged). -disable(FeatureName) when is_atom(FeatureName) -> - {error, unsupported}; -disable(FeatureNames) when is_list(FeatureNames) -> - with_feature_flags(FeatureNames, fun disable/1). +disable(FeatureNames) -> + rabbit_ff_controler:disable(FeatureNames). -spec disable_all() -> ok | {error, any()}. %% @doc @@ -364,20 +328,8 @@ disable(FeatureNames) when is_list(FeatureNames) -> %% unchanged). disable_all() -> - with_feature_flags(maps:keys(list(all)), fun disable/1). - --spec with_feature_flags([feature_name()], - fun((feature_name()) -> ok | {error, any()})) -> - ok | {error, any()}. -%% @private - -with_feature_flags([FeatureName | Rest], Fun) -> - case Fun(FeatureName) of - ok -> with_feature_flags(Rest, Fun); - Error -> Error - end; -with_feature_flags([], _) -> - ok. + AllFeatureNames = maps:keys(list(all)), + disable(AllFeatureNames). -spec is_supported(feature_name() | [feature_name()]) -> boolean(). %% @doc @@ -393,8 +345,7 @@ with_feature_flags([], _) -> %% `false' if one of them is not or the RPC timed out. is_supported(FeatureNames) -> - is_supported_locally(FeatureNames) andalso - is_supported_remotely(FeatureNames). + rabbit_ff_controler:are_supported(FeatureNames). -spec is_supported(feature_name() | [feature_name()], timeout()) -> boolean(). @@ -412,8 +363,7 @@ is_supported(FeatureNames) -> %% `false' if one of them is not or the RPC timed out. is_supported(FeatureNames, Timeout) -> - is_supported_locally(FeatureNames) andalso - is_supported_remotely(FeatureNames, Timeout). + rabbit_ff_controler:are_supported(FeatureNames, Timeout). -spec is_supported_locally(feature_name() | [feature_name()]) -> boolean(). %% @doc @@ -430,53 +380,6 @@ is_supported_locally(FeatureName) when is_atom(FeatureName) -> is_supported_locally(FeatureNames) when is_list(FeatureNames) -> lists:all(fun(F) -> rabbit_ff_registry:is_supported(F) end, FeatureNames). --spec is_supported_remotely(feature_name() | [feature_name()]) -> boolean(). -%% @doc -%% Returns if a single feature flag or a set of feature flags is -%% supported by all remote nodes. -%% -%% @param FeatureNames The name or a list of names of the feature flag(s) -%% to be checked. -%% @returns `true' if the set of feature flags is entirely supported, or -%% `false' if one of them is not or the RPC timed out. - -is_supported_remotely(FeatureNames) -> - is_supported_remotely(FeatureNames, ?TIMEOUT). - --spec is_supported_remotely(feature_name() | [feature_name()], timeout()) -> boolean(). -%% @doc -%% Returns if a single feature flag or a set of feature flags is -%% supported by all remote nodes. -%% -%% @param FeatureNames The name or a list of names of the feature flag(s) -%% to be checked. -%% @param Timeout Time in milliseconds after which the RPC gives up. -%% @returns `true' if the set of feature flags is entirely supported, or -%% `false' if one of them is not or the RPC timed out. - -is_supported_remotely(FeatureName, Timeout) when is_atom(FeatureName) -> - is_supported_remotely([FeatureName], Timeout); -is_supported_remotely([], _) -> - rabbit_log_feature_flags:debug( - "Feature flags: skipping query for feature flags support as the " - "given list is empty"), - true; -is_supported_remotely(FeatureNames, Timeout) when is_list(FeatureNames) -> - case running_remote_nodes() of - [] -> - rabbit_log_feature_flags:debug( - "Feature flags: isolated node; skipping remote node query " - "=> consider `~p` supported", - [FeatureNames]), - true; - RemoteNodes -> - rabbit_log_feature_flags:debug( - "Feature flags: about to query these remote nodes about " - "support for `~p`: ~p", - [FeatureNames, RemoteNodes]), - is_supported_remotely(RemoteNodes, FeatureNames, Timeout) - end. - -spec is_supported_remotely([node()], feature_name() | [feature_name()], timeout()) -> boolean(). @@ -1489,39 +1392,6 @@ enabled_feature_flags_list_file() -> %% Feature flags management: enabling. %% ------------------------------------------------------------------- --spec do_enable(feature_name()) -> ok | {error, any()} | no_return(). -%% @private - -do_enable(FeatureName) -> - %% We mark this feature flag as "state changing" before doing the - %% actual state change. We also take a global lock: this permits - %% to block callers asking about a feature flag changing state. - global:set_lock(?FF_STATE_CHANGE_LOCK), - Ret = case mark_as_enabled(FeatureName, state_changing) of - ok -> - case enable_dependencies(FeatureName, true) of - ok -> - case run_migration_fun(FeatureName, enable) of - ok -> - mark_as_enabled(FeatureName, true); - {error, no_migration_fun} -> - mark_as_enabled(FeatureName, true); - Error -> - Error - end; - Error -> - Error - end; - Error -> - Error - end, - case Ret of - ok -> ok; - _ -> mark_as_enabled(FeatureName, false) - end, - global:del_lock(?FF_STATE_CHANGE_LOCK), - Ret. - -spec enable_locally(feature_name()) -> ok | {error, any()} | no_return(). %% @private @@ -1618,18 +1488,6 @@ run_migration_fun(FeatureName, FeatureProps, Arg) -> {error, {invalid_migration_fun, Invalid}} end. --spec mark_as_enabled(feature_name(), feature_state()) -> - any() | {error, any()} | no_return(). -%% @private - -mark_as_enabled(FeatureName, IsEnabled) -> - case mark_as_enabled_locally(FeatureName, IsEnabled) of - ok -> - mark_as_enabled_remotely(FeatureName, IsEnabled); - Error -> - Error - end. - -spec mark_as_enabled_locally(feature_name(), feature_state()) -> any() | {error, any()} | no_return(). %% @private @@ -1658,73 +1516,6 @@ mark_as_enabled_locally(FeatureName, IsEnabled) -> #{FeatureName => IsEnabled}, WrittenToDisk). --spec mark_as_enabled_remotely(feature_name(), feature_state()) -> - any() | {error, any()} | no_return(). -%% @private - -mark_as_enabled_remotely(FeatureName, IsEnabled) -> - Nodes = running_remote_nodes(), - mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, ?TIMEOUT). - --spec mark_as_enabled_remotely([node()], - feature_name(), - feature_state(), - timeout()) -> - any() | {error, any()} | no_return(). -%% @private - -mark_as_enabled_remotely([], _FeatureName, _IsEnabled, _Timeout) -> - ok; -mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, Timeout) -> - T0 = erlang:timestamp(), - Rets = [{Node, rpc:call(Node, - ?MODULE, - mark_as_enabled_locally, - [FeatureName, IsEnabled], - Timeout)} - || Node <- Nodes], - FailedNodes = [Node || {Node, Ret} <- Rets, Ret =/= ok], - case FailedNodes of - [] -> - rabbit_log_feature_flags:debug( - "Feature flags: `~s` successfully marked as enabled=~p on all " - "nodes", [FeatureName, IsEnabled]), - ok; - _ -> - rabbit_log_feature_flags:error( - "Feature flags: failed to mark feature flag `~s` as enabled=~p " - "on the following nodes:", [FeatureName, IsEnabled]), - [rabbit_log_feature_flags:error( - "Feature flags: - ~s: ~p", - [Node, Ret]) - || {Node, Ret} <- Rets, - Ret =/= ok], - Sleep = 1000, - T1 = erlang:timestamp(), - Duration = timer:now_diff(T1, T0), - NewTimeout = (Timeout * 1000 - Duration) div 1000 - Sleep, - if - NewTimeout > 0 -> - rabbit_log_feature_flags:debug( - "Feature flags: retrying with a timeout of ~b " - "ms after sleeping for ~b ms", - [NewTimeout, Sleep]), - timer:sleep(Sleep), - mark_as_enabled_remotely(FailedNodes, - FeatureName, - IsEnabled, - NewTimeout); - true -> - rabbit_log_feature_flags:debug( - "Feature flags: not retrying; RPC went over the " - "~b milliseconds timeout", [Timeout]), - %% FIXME: Is crashing the process the best solution here? - throw( - {failed_to_mark_feature_flag_as_enabled_on_remote_nodes, - FeatureName, IsEnabled, FailedNodes}) - end - end. - %% ------------------------------------------------------------------- %% Coordination with remote nodes. %% ------------------------------------------------------------------- @@ -2071,14 +1862,7 @@ push_local_feature_flags_from_apps_unknown_remotely(_, _, _) -> ok | {error, any()} | no_return(). %% @private -sync_feature_flags_with_cluster(Nodes, NodeIsVirgin) -> - sync_feature_flags_with_cluster(Nodes, NodeIsVirgin, ?TIMEOUT). - --spec sync_feature_flags_with_cluster([node()], boolean(), timeout()) -> - ok | {error, any()} | no_return(). -%% @private - -sync_feature_flags_with_cluster([], NodeIsVirgin, _) -> +sync_feature_flags_with_cluster([], NodeIsVirgin) -> verify_which_feature_flags_are_actually_enabled(), case NodeIsVirgin of true -> @@ -2120,63 +1904,9 @@ sync_feature_flags_with_cluster([], NodeIsVirgin, _) -> "current state"), ok end; -sync_feature_flags_with_cluster(Nodes, _, Timeout) -> +sync_feature_flags_with_cluster(_Nodes, _NodeIsVirgin) -> verify_which_feature_flags_are_actually_enabled(), - RemoteNodes = Nodes -- [node()], - sync_feature_flags_with_cluster1(RemoteNodes, Timeout). - -sync_feature_flags_with_cluster1([], _) -> - ok; -sync_feature_flags_with_cluster1(RemoteNodes, Timeout) -> - RandomRemoteNode = pick_one_node(RemoteNodes), - rabbit_log_feature_flags:debug( - "Feature flags: SYNCING FEATURE FLAGS with node `~s`...", - [RandomRemoteNode]), - case query_remote_feature_flags(RandomRemoteNode, enabled, Timeout) of - {error, _} = Error -> - Error; - RemoteFeatureFlags -> - RemoteFeatureNames = maps:keys(RemoteFeatureFlags), - rabbit_log_feature_flags:debug( - "Feature flags: enabling locally feature flags already " - "enabled on node `~s`...", - [RandomRemoteNode]), - case do_sync_feature_flags_with_node(RemoteFeatureNames) of - ok -> - sync_feature_flags_with_cluster2( - RandomRemoteNode, Timeout); - Error -> - Error - end - end. - -sync_feature_flags_with_cluster2(RandomRemoteNode, Timeout) -> - LocalFeatureNames = maps:keys(list(enabled)), - rabbit_log_feature_flags:debug( - "Feature flags: enabling on node `~s` feature flags already " - "enabled locally...", - [RandomRemoteNode]), - Ret = run_feature_flags_mod_on_remote_node( - RandomRemoteNode, - do_sync_feature_flags_with_node, - [LocalFeatureNames], - Timeout), - case Ret of - {error, pre_feature_flags_rabbitmq} -> ok; - _ -> Ret - end. - -pick_one_node(Nodes) -> - RandomIndex = rand:uniform(length(Nodes)), - lists:nth(RandomIndex, Nodes). - -do_sync_feature_flags_with_node([FeatureFlag | Rest]) -> - case enable_locally(FeatureFlag) of - ok -> do_sync_feature_flags_with_node(Rest); - Error -> Error - end; -do_sync_feature_flags_with_node([]) -> - ok. + rabbit_ff_controler:sync_cluster(). -spec get_forced_feature_flag_names() -> [feature_name()] | undefined. %% @private diff --git a/deps/rabbit/src/rabbit_ff_controler.erl b/deps/rabbit/src/rabbit_ff_controler.erl new file mode 100644 index 0000000000..a6aada41de --- /dev/null +++ b/deps/rabbit/src/rabbit_ff_controler.erl @@ -0,0 +1,864 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_ff_controler). +-behaviour(gen_statem). + +-include_lib("kernel/include/logger.hrl"). + +-include_lib("rabbit_common/include/logging.hrl"). + +-export([enable/1, + disable/1, + sync_cluster/0, + are_supported/1, + are_supported/2]). + +%% Internal use only. +-export([start_link/0, + are_supported_locally/1, + mark_as_enabled_locally/2, + run_migration_fun_locally/2]). + +%% gen_statem callbacks. +-export([callback_mode/0, + init/1, + terminate/3, + code_change/4, + + standing_by/3, + waiting_for_end_of_controler_task/3, + updating_feature_flag_states/3]). + +-record(?MODULE, {from, + notify = #{}}). + +-define(LOCAL_NAME, ?MODULE). +-define(GLOBAL_NAME, {?MODULE, global}). + +-define(FF_STATE_CHANGE_LOCK, {feature_flags_state_change, self()}). + +%% Default timeout for operations on remote nodes. +-define(TIMEOUT, 60000). + +start_link() -> + gen_statem:start_link({local, ?LOCAL_NAME}, ?MODULE, none, []). + +enable(FeatureName) when is_atom(FeatureName) -> + enable([FeatureName]); +enable(FeatureNames) when is_list(FeatureNames) -> + ?LOG_DEBUG( + "Feature flags: REQUEST TO ENABLE: ~p", + [FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + gen_statem:call(?LOCAL_NAME, {enable, FeatureNames}). + +disable(FeatureName) when is_atom(FeatureName) -> + disable([FeatureName]); +disable(FeatureNames) when is_list(FeatureNames) -> + {error, unsupported}. + +sync_cluster() -> + ?LOG_DEBUG( + "Feature flags: SYNCING FEATURE FLAGS in cluster...", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + gen_statem:call(?LOCAL_NAME, sync_cluster). + +%% -------------------------------------------------------------------- +%% gen_statem callbacks. +%% -------------------------------------------------------------------- + +callback_mode() -> + state_functions. + +init(_Args) -> + {ok, standing_by, none}. + +standing_by( + {call, From} = EventType, EventContent, none) + when EventContent =/= notify_when_done -> + case EventContent of + {enable, FeatureNames} -> + ?LOG_NOTICE( + "Feature flags: attempt to enable the following feature " + "flags: ~p", + [FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}); + sync_cluster -> + ?LOG_NOTICE( + "Feature flags: attempt to synchronize feature flag states " + "among running cluster members", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}) + end, + + %% The first step is to register this process globally (it is already + %% registered locally). The purpose is to make sure this one takes full + %% control on feature flag changes among other controlers. + %% + %% This is useful for situations where a new node joins the cluster while + %% a feature flag is being enabled. In this case, when that new node joins + %% and its controler wants to synchronize feature flags, it will block and + %% wait for this one to finish. + case register_globally() of + yes -> + %% We would register the process globally. Therefore we can + %% proceed with enabling/syncing feature flags. + ?LOG_DEBUG( + "Feature flags: controler globally registered; can proceed " + "with task", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + + Data = #?MODULE{from = From}, + {next_state, updating_feature_flag_states, Data, + [{next_event, internal, EventContent}]}; + + no -> + %% Another controler is globally registered. We ask that global + %% controler to notify us when it is done, and we wait for its + %% response. + ?LOG_DEBUG( + "Feature flags: controler NOT globally registered; need to " + "wait for the current global controler's task to finish", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + + RequestId = notify_me_when_done(), + {next_state, waiting_for_end_of_controler_task, RequestId, + [{next_event, EventType, EventContent}]} + end; +standing_by( + {call, From}, notify_when_done, none) -> + %% This state is entered when a globally-registered controler finished its + %% task but had unhandled `notify_when_done` requests in its inbox. We + %% just need to notify the caller that it can proceed. + notify_waiting_controler(From), + {keep_state_and_data, []}. + +waiting_for_end_of_controler_task( + {call, _From}, _EventContent, _RequestId) -> + {keep_state_and_data, [postpone]}; +waiting_for_end_of_controler_task( + info, Msg, RequestId) -> + case gen_statem:check_response(Msg, RequestId) of + {reply, done} -> + ?LOG_DEBUG( + "Feature flags: current global controler's task finished; " + "trying to take next turn", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {next_state, standing_by, none, []}; + {error, Reason} -> + ?LOG_DEBUG( + "Feature flags: error while waiting for current global " + "controler's task: ~0p; trying to take next turn", + [Reason], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {next_state, standing_by, none, []}; + no_reply -> + ?LOG_DEBUG( + "Feature flags: unknown message while waiting for current " + "global controler's task: ~0p; still waiting", + [Msg], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + keep_state_and_data + end. + +updating_feature_flag_states( + internal, Task, #?MODULE{from = From} = Data) -> + Reply = proceed_with_task(Task), + unregister_globally(), + notify_waiting_controlers(Data), + {next_state, standing_by, none, [{reply, From, Reply}]}; +updating_feature_flag_states( + {call, From}, notify_when_done, #?MODULE{notify = Notify} = Data) -> + Notify1 = Notify#{From => true}, + Data1 = Data#?MODULE{notify = Notify1}, + {keep_state, Data1}. + +terminate(_Reason, _State, _Data) -> + ok. + +code_change(_OldVsn, OldState, OldData, _Extra) -> + {ok, OldState, OldData}. + +%% -------------------------------------------------------------------- +%% Code to enable and sync feature flags. +%% -------------------------------------------------------------------- + +proceed_with_task({enable, FeatureNames}) -> + ?LOG_DEBUG( + "Feature flags: filter out already enabled feature flags", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + DisabledFeatureNames = [FeatureName + || FeatureName <- FeatureNames, + %% This check can't block because only one + %% controler can modify feature flag states at + %% a time. + not rabbit_feature_flags:is_enabled(FeatureName) + ], + case DisabledFeatureNames of + _ when DisabledFeatureNames =/= [] -> + ?LOG_DEBUG( + "Feature flags: feature flags left to enable: ~p", + [DisabledFeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + ?LOG_DEBUG( + "Feature flags: checking running nodes before we enable " + "feature flags (this requires all of them to run)", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case want_all_nodes() of + {ok, Nodes} -> + ?LOG_DEBUG( + "Feature flags: nodes where the feature flags will " + "be enabled: ~p~n" + "Feature flags: new nodes joining the cluster in " + "between will be taken care of later", + [Nodes], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case enable_if_supported(Nodes, DisabledFeatureNames) of + ok -> + ?LOG_NOTICE( + "Feature flags: following feature flags " + "enabled: ~p", + [FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + ok; + Error -> + ?LOG_ERROR( + "Feature flags: error(s) while enabling " + "feature flags:~n~p", + [FeatureNames, Error], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Error + end; + Error -> + Error + end; + [] -> + %% All specified feature flags are already enabled, there is + %% nothing to do. + ?LOG_DEBUG( + "Feature flags: all requested feature flags already enabled", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + ok + end; +proceed_with_task(sync_cluster) -> + %% We assume that a feature flag can only be enabled, not disabled. + %% Therefore this synchronization searches for feature flags enabled on + %% some nodes, but not all and make sure they are enabled everywhere. + %% + %% This happens when a node joins a cluster and that node has a different + %% set of enabled feature flags. + Nodes = running_nodes(), + ?LOG_DEBUG( + "Feature flags: synchronizing feature flags among running nodes: ~p", + [Nodes], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case get_enabled_feature_flags_inventory(Nodes) of + {ok, Inventory} -> + DisabledPerNode = organize_inventory_per_node(Nodes, Inventory), + case maps:size(DisabledPerNode) of + 0 -> + ?LOG_NOTICE( + "Feature flags: all running nodes are in sync " + "feature-flags-wise", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + ok; + _ -> + ?LOG_DEBUG( + "Feature flags: list of feature flags to enable per " + "node: ~p", + [DisabledPerNode], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case sync_on_nodes(DisabledPerNode) of + ok -> + ?LOG_NOTICE( + "Feature flags: following nodes " + "synchronized: ~p", + [Nodes], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + ok; + Error -> + ?LOG_ERROR( + "Feature flags: error(s) while synchronizing " + "nodes ~0p:~n~p", + [Nodes, Error], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Error + end + end; + Error -> + ?LOG_DEBUG( + "Feature flags: failed to query enabled feature flags on " + "running nodes: ~p", + [Error], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Error + end. + +enable_if_supported(Nodes, FeatureNames) -> + ?LOG_DEBUG( + "Feature flags: checking all requested feature flags are supported", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case are_supported_on_nodes(Nodes, FeatureNames) of + true -> enable_on_nodes(Nodes, FeatureNames); + false -> {error, unsupported} + end. + +enable_on_nodes(Nodes, FeatureNames) -> + ?LOG_DEBUG( + "Feature flags: acquiring registry state change lock", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + global:set_lock(?FF_STATE_CHANGE_LOCK), + Ret = enable_on_nodes_locked(Nodes, FeatureNames), + ?LOG_DEBUG( + "Feature flags: releasing registry state change lock", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + global:del_lock(?FF_STATE_CHANGE_LOCK), + Ret. + +enable_on_nodes_locked(Nodes, [FeatureName | Rest]) -> + Ret = case mark_as_enabled_on_nodes(Nodes, FeatureName, state_changing) of + ok -> + case do_enable_on_nodes_locked(Nodes, FeatureName) of + ok -> + mark_as_enabled_on_nodes(Nodes, FeatureName, true); + Error -> + Error + end; + Error -> + Error + end, + case Ret of + ok -> + enable_on_nodes_locked(Nodes, Rest); + _ -> + _ = mark_as_enabled_on_nodes(Nodes, FeatureName, false), + Ret + end; +enable_on_nodes_locked(_Node, []) -> + ok. + +do_enable_on_nodes_locked(Nodes, FeatureName) -> + case enable_dependencies_on_nodes(Nodes, FeatureName) of + ok -> + Ret = run_migration_fun_on_nodes( + Nodes, FeatureName, enable, infinity), + case Ret of + ok -> ok; + {error, no_migration_fun} -> ok; + Error -> Error + end; + Error -> + Error + end. + +%% -------------------------------------------------------------------- +%% Global name registration. +%% -------------------------------------------------------------------- + +register_globally() -> + global:register_name(?GLOBAL_NAME, self()). + +unregister_globally() -> + _ = global:unregister_name(?GLOBAL_NAME), + ok. + +notify_me_when_done() -> + gen_statem:send_request({global, ?GLOBAL_NAME}, notify_when_done). + +notify_waiting_controlers(#?MODULE{notify = Notify}) -> + maps:fold( + fun(From, true, Acc) -> + notify_waiting_controler(From), + Acc + end, ok, Notify). + +notify_waiting_controler({ControlerPid, _} = From) -> + ControlerNode = node(ControlerPid), + ?LOG_DEBUG( + "Feature flags: controler's task finished; notify waiting controler " + "on node ~p", + [ControlerNode], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + gen_statem:reply(From, done). + +%% -------------------------------------------------------------------- +%% Cluster relationship. +%% -------------------------------------------------------------------- + +want_all_nodes() -> + AllNodes = all_nodes(), + RunningNodes = running_nodes(), + case AllNodes -- RunningNodes of + [] -> + ?LOG_DEBUG( + "Feature flags: all nodes in the cluster are running, can " + "proceed with the task", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {ok, AllNodes}; + StoppedNodes -> + ?LOG_ERROR( + "Feature flags: some nodes in the cluster are stopped: ~0p; " + "abort task", + [StoppedNodes], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {error, {missing_nodes, StoppedNodes}} + end. + +all_nodes() -> + lists:sort(mnesia:system_info(db_nodes)). + +running_nodes() -> + lists:sort(mnesia:system_info(running_db_nodes)). + +run_feature_flags_mod_on_remote_node(Node, Module, Function, Args, Timeout) -> + %% FIXME: Feature flags protocol versioning? + case rpc:call(Node, Module, Function, Args, Timeout) of + {badrpc, {'EXIT', + {undef, + [{rabbit_feature_flags, Function, Args, []} + | _]}}} -> + %% If rabbit_feature_flags:Function() is undefined + %% on the remote node, we consider it to be a 3.7.x + %% pre-feature-flags node. + %% + %% Theoretically, it could be an older version (3.6.x and + %% older). But the RabbitMQ version consistency check + %% (rabbit_misc:version_minor_equivalent/2) called from + %% rabbit_mnesia:check_rabbit_consistency/2 already blocked + %% this situation from happening before we reach this point. + ?LOG_DEBUG( + "Feature flags: ~s:~s~p unavailable on node `~s`: " + "assuming it is a RabbitMQ 3.7.x pre-feature-flags node", + [?MODULE, Function, Args, Node], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {error, pre_feature_flags_rabbitmq}; + {badrpc, Reason} = Error -> + ?LOG_ERROR( + "Feature flags: error while running ~s:~s~p " + "on node `~s`: ~p", + [?MODULE, Function, Args, Node, Reason], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {error, Error}; + Ret -> + Ret + end. + +%% -------------------------------------------------------------------- +%% Feature flag support queries. +%% -------------------------------------------------------------------- + +-spec are_supported(FeatureNames) -> AreSupported when + FeatureNames :: [rabbit_feature_flags:feature_name()], + AreSupported :: boolean(). + +are_supported(FeatureNames) -> + are_supported(FeatureNames, ?TIMEOUT). + +-spec are_supported(FeatureNames, Timeout) -> AreSupported when + FeatureNames :: [rabbit_feature_flags:feature_name()], + Timeout :: timeout(), + AreSupported :: boolean(). + +are_supported(FeatureNames, Timeout) -> + Nodes = running_nodes(), + are_supported_on_nodes(Nodes, FeatureNames, Timeout). + +-spec are_supported_on_nodes(Nodes, FeatureNames) -> AreSupported when + Nodes :: [node()], + FeatureNames :: [rabbit_feature_flags:feature_name()], + AreSupported :: boolean(). + +are_supported_on_nodes(Nodes, FeatureNames) -> + are_supported_on_nodes(Nodes, FeatureNames, ?TIMEOUT). + +-spec are_supported_on_nodes(Nodes, FeatureNames, Timeout) -> AreSupported when + Nodes :: [node()], + FeatureNames :: [rabbit_feature_flags:feature_name()], + Timeout :: timeout(), + AreSupported :: boolean(). +%% @doc +%% Returns if a set of feature flags is supported by specified remote nodes. +%% +%% @param RemoteNodes The list of remote nodes to query. +%% @param FeatureNames A list of names of the feature flag(s) to be checked. +%% @param Timeout Time in milliseconds after which the RPC gives up. +%% @returns `true' if the set of feature flags is entirely supported by +%% all nodes, or `false' if one of them is not or the RPC timed out. + +are_supported_on_nodes(_, [], _) -> + ?LOG_DEBUG( + "Feature flags: skipping query for feature flags support as the " + "given list is empty", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + true; +are_supported_on_nodes([Node | Rest], FeatureNames, Timeout) -> + case are_supported_on_node(Node, FeatureNames, Timeout) of + true -> + are_supported_on_nodes(Rest, FeatureNames, Timeout); + false -> + ?LOG_DEBUG( + "Feature flags: stopping query for support for ~p here", + [FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + false + end; +are_supported_on_nodes([], FeatureNames, _Timeout) -> + ?LOG_DEBUG( + "Feature flags: all nodes support ~p", + [FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + true. + +are_supported_on_node(Node, FeatureNames, Timeout) -> + ?LOG_DEBUG( + "Feature flags: querying `~p` support on node ~p...", + [FeatureNames, Node], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Ret = case node() of + Node -> + are_supported_locally(FeatureNames); + _ -> + run_feature_flags_mod_on_remote_node( + Node, ?MODULE, are_supported_locally, [FeatureNames], + Timeout) + end, + case Ret of + true -> + ?LOG_DEBUG( + "Feature flags: node ~p supports ~p", + [Node, FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + true; + false -> + ?LOG_DEBUG( + "Feature flags: node ~p does not support ~p", + [Node, FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + false; + {error, pre_feature_flags_rabbitmq} -> + %% See run_feature_flags_mod_on_remote_node/5 for + %% an explanation why we consider this node a 3.7.x + %% pre-feature-flags node. + ?LOG_DEBUG( + "Feature flags: no feature flags support on node ~p, " + "consider the feature flags unsupported: ~p", + [Node, FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + false; + {error, Reason} -> + ?LOG_DEBUG( + "Feature flags: error while querying ~p support on " + "node ~p: ~p", + [FeatureNames, Node, Reason], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + false + end. + +-spec are_supported_locally(FeatureNames) -> AreSupported when + FeatureNames :: FeatureName | [FeatureNames], + FeatureName :: rabbit_feature_flags:feature_name(), + AreSupported :: boolean(). +%% @doc +%% Returns if a single feature flag or a set of feature flags is +%% supported by the local node. +%% +%% @param FeatureNames The name or a list of names of the feature flag(s) +%% to be checked. +%% @returns `true' if the set of feature flags is entirely supported, or +%% `false' if one of them is not. + +are_supported_locally(FeatureName) when is_atom(FeatureName) -> + rabbit_ff_registry:is_supported(FeatureName); +are_supported_locally(FeatureNames) when is_list(FeatureNames) -> + lists:all(fun(F) -> rabbit_ff_registry:is_supported(F) end, FeatureNames). + +%% -------------------------------------------------------------------- +%% Feature flag state changes. +%% -------------------------------------------------------------------- + +mark_as_enabled_on_nodes(Nodes, FeatureName, IsEnabled) -> + mark_as_enabled_on_nodes(Nodes, FeatureName, IsEnabled, ?TIMEOUT). + +mark_as_enabled_on_nodes([Node | Rest], FeatureName, IsEnabled, Timeout) -> + ?LOG_DEBUG( + "Feature flags: ~s: mark as enabled=~p on node ~p", + [FeatureName, IsEnabled, Node], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case mark_as_enabled_on_node(Node, FeatureName, IsEnabled, Timeout) of + ok -> + mark_as_enabled_on_nodes(Rest, FeatureName, IsEnabled, Timeout); + Error -> + Error + end; +mark_as_enabled_on_nodes([], _FeatureName, _IsEnabled, _Timeout) -> + ok. + +mark_as_enabled_on_node(Node, FeatureName, IsEnabled, _Timeout) + when Node =:= node() -> + mark_as_enabled_locally(FeatureName, IsEnabled); +mark_as_enabled_on_node(Node, FeatureName, IsEnabled, Timeout) -> + run_feature_flags_mod_on_remote_node( + Node, ?MODULE, mark_as_enabled_locally, [FeatureName, IsEnabled], + Timeout). + +mark_as_enabled_locally(FeatureName, IsEnabled) -> + ?LOG_DEBUG( + "Feature flags: ~s: mark as enabled=~p", + [FeatureName, IsEnabled], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + EnabledFeatureNames = maps:keys(rabbit_feature_flags:list(enabled)), + NewEnabledFeatureNames = case IsEnabled of + true -> + [FeatureName | EnabledFeatureNames]; + false -> + EnabledFeatureNames -- [FeatureName]; + state_changing -> + EnabledFeatureNames + end, + WrittenToDisk = case NewEnabledFeatureNames of + EnabledFeatureNames -> + rabbit_ff_registry:is_registry_written_to_disk(); + _ -> + ok =:= try_to_write_enabled_feature_flags_list( + NewEnabledFeatureNames) + end, + rabbit_feature_flags:initialize_registry(#{}, + #{FeatureName => IsEnabled}, + WrittenToDisk). + +try_to_read_enabled_feature_flags_list() -> + File = rabbit_feature_flags:enabled_feature_flags_list_file(), + case file:consult(File) of + {ok, [List]} -> + List; + {error, enoent} -> + %% If the file is missing, we consider the list of enabled + %% feature flags to be empty. + []; + {error, Reason} = Error -> + ?LOG_ERROR( + "Feature flags: failed to read the `feature_flags` " + "file at `~ts`: ~s", + [File, file:format_error(Reason)], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Error + end. + +try_to_write_enabled_feature_flags_list(FeatureNames) -> + %% Before writing the new file, we read the existing one. If there + %% are unknown feature flags in that file, we want to keep their + %% state, even though they are unsupported at this time. It could be + %% that a plugin was disabled in the meantime. + %% + %% FIXME: Lock this code to fix concurrent read/modify/write. + PreviouslyEnabled = case try_to_read_enabled_feature_flags_list() of + {error, _} -> []; + List -> List + end, + FeatureNames1 = [Name + || Name <- PreviouslyEnabled, + not are_supported_locally(Name)] ++ FeatureNames, + FeatureNames2 = lists:sort(FeatureNames1), + + File = rabbit_feature_flags:enabled_feature_flags_list_file(), + Content = io_lib:format("~p.~n", [FeatureNames2]), + %% TODO: If we fail to write the the file, we should spawn a process + %% to retry the operation. + case file:write_file(File, Content) of + ok -> + ok; + {error, Reason} = Error -> + ?LOG_ERROR( + "Feature flags: failed to write the `feature_flags` " + "file at `~s`: ~s", + [File, file:format_error(Reason)], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Error + end. + +%% -------------------------------------------------------------------- +%% Feature flag dependencies handling. +%% -------------------------------------------------------------------- + +-spec enable_dependencies_on_nodes(Nodes, FeatureName) -> Ret when + Nodes :: [node()], + FeatureName :: rabbit_feature_flags:feature_name(), + Ret :: ok | {error, any()} | no_return(). +%% @private + +enable_dependencies_on_nodes(Nodes, FeatureName) -> + FeatureProps = rabbit_ff_registry:get(FeatureName), + DependsOn = maps:get(depends_on, FeatureProps, []), + case DependsOn of + [] -> + ok; + _ -> + ?LOG_DEBUG( + "Feature flags: `~s`: enable dependencies: ~p", + [FeatureName, DependsOn], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + enable_on_nodes_locked(Nodes, DependsOn) + end. + +%% -------------------------------------------------------------------- +%% Migration function. +%% -------------------------------------------------------------------- + +run_migration_fun_on_nodes([Node | Rest], FeatureName, Arg, Timeout) -> + ?LOG_DEBUG( + "Feature flags: `~s`: run migration function with arg=~p on node ~p", + [FeatureName, Arg, Node], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case run_migration_fun_on_node(Node, FeatureName, Arg, Timeout) of + ok -> run_migration_fun_on_nodes(Rest, FeatureName, Arg, Timeout); + Error -> Error + end; +run_migration_fun_on_nodes([], _FeatureName, _Arg, _Timeout) -> + ok. + +run_migration_fun_on_node(Node, FeatureName, Arg, _Timeout) + when Node =:= node() -> + run_migration_fun_locally(FeatureName, Arg); +run_migration_fun_on_node(Node, FeatureName, Arg, Timeout) -> + run_feature_flags_mod_on_remote_node( + Node, ?MODULE, run_migration_fun_locally, [FeatureName, Arg], + Timeout). + +run_migration_fun_locally(FeatureName, Arg) -> + FeatureProps = rabbit_ff_registry:get(FeatureName), + run_migration_fun_locally(FeatureName, FeatureProps, Arg). + +run_migration_fun_locally(FeatureName, FeatureProps, Arg) -> + case maps:get(migration_fun, FeatureProps, none) of + {MigrationMod, MigrationFun} + when is_atom(MigrationMod) andalso is_atom(MigrationFun) -> + ?LOG_DEBUG( + "Feature flags: `~s`: run migration function ~p with arg: ~p", + [FeatureName, MigrationFun, Arg], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + try + erlang:apply(MigrationMod, + MigrationFun, + [FeatureName, FeatureProps, Arg]) + catch + _:Reason:Stacktrace -> + ?LOG_ERROR( + "Feature flags: `~s`: migration function crashed: " + "~p~n~p", + [FeatureName, Reason, Stacktrace], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {error, {migration_fun_crash, Reason, Stacktrace}} + end; + none -> + {error, no_migration_fun}; + Invalid -> + ?LOG_ERROR( + "Feature flags: `~s`: invalid migration function: ~p", + [FeatureName, Invalid], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {error, {invalid_migration_fun, Invalid}} + end. + +%% -------------------------------------------------------------------- +%% Synchronization. +%% -------------------------------------------------------------------- + +get_enabled_feature_flags_inventory(Nodes) -> + get_enabled_feature_flags_inventory(Nodes, #{}, ?TIMEOUT). + +get_enabled_feature_flags_inventory([Node | Rest], Inventory, Timeout) + when Node =:= node() -> + EnabledFeatureFlags = rabbit_feature_flags:list(enabled), + EnabledFeatureNames = maps:keys(EnabledFeatureFlags), + Inventory1 = update_inventory(Inventory, Node, EnabledFeatureNames), + get_enabled_feature_flags_inventory(Rest, Inventory1, Timeout); +get_enabled_feature_flags_inventory([Node | Rest], Inventory, Timeout) -> + Ret = run_feature_flags_mod_on_remote_node( + Node, rabbit_feature_flags, list, [enabled], Timeout), + case Ret of + EnabledFeatureFlags when is_map(EnabledFeatureFlags) -> + EnabledFeatureNames = maps:keys(EnabledFeatureFlags), + Inventory1 = update_inventory( + Inventory, Node, EnabledFeatureNames), + get_enabled_feature_flags_inventory(Rest, Inventory1, Timeout); + Error -> + Error + end; +get_enabled_feature_flags_inventory([], Inventory, _Timeout) -> + {ok, Inventory}. + +update_inventory(Inventory, Node, [FeatureName | Rest]) -> + Inventory1 = case Inventory of + #{FeatureName := Nodes} -> + Inventory#{FeatureName => [Node | Nodes]}; + _ -> + Inventory#{FeatureName => [Node]} + end, + update_inventory(Inventory1, Node, Rest); +update_inventory(Inventory, _Node, []) -> + Inventory. + +organize_inventory_per_node(Nodes, Inventory) -> + EnabledFeatureNames = lists:sort(maps:keys(Inventory)), + organize_inventory_per_node1(Nodes, EnabledFeatureNames, Inventory, #{}). + +organize_inventory_per_node1( + [Node | Rest], EnabledFeatureNames, Inventory, DisabledPerNode) -> + DisabledFeatureNames = [FeatureName + || FeatureName <- EnabledFeatureNames, + not lists:member( + Node, + maps:get(FeatureName, Inventory))], + case DisabledFeatureNames of + [] -> + organize_inventory_per_node1( + Rest, EnabledFeatureNames, Inventory, DisabledPerNode); + _ -> + DisabledPerNode1 = DisabledPerNode#{Node => DisabledFeatureNames}, + organize_inventory_per_node1( + Rest, EnabledFeatureNames, Inventory, DisabledPerNode1) + end; +organize_inventory_per_node1( + [], _EnabledFeatureNames, _Inventory, DisabledPerNode) -> + DisabledPerNode. + +sync_on_nodes(DisabledPerNode) -> + Nodes = lists:sort(maps:keys(DisabledPerNode)), + sync_on_nodes(Nodes, DisabledPerNode). + +sync_on_nodes([Node | Rest], DisabledPerNode) -> + #{Node := FeatureNames} = DisabledPerNode, + ?LOG_DEBUG( + "Feature flags: feature flags to enable to synchronize node ~p: ~p~n", + [Node, FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case enable_if_supported([Node], FeatureNames) of + ok -> + sync_on_nodes(Rest, DisabledPerNode); + Error -> + ?LOG_ERROR( + "Feature flags: failed to synchronize node ~p: ~p~n", + [Node, Error], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Error + end; +sync_on_nodes([], _DisabledPerNode) -> + ok. |