diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_feature_flags.erl | 741 | ||||
| -rw-r--r-- | src/rabbit_ff_registry.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_plugins.erl | 5 |
5 files changed, 634 insertions, 147 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 222b6cf3c0..20f9b17abf 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -520,7 +520,7 @@ start_apps(Apps) -> start_apps(Apps, RestartTypes) -> app_utils:load_applications(Apps), - rabbit_feature_flags:initialize_registry(), + ok = rabbit_feature_flags:refresh_feature_flags_after_app_load(Apps), start_loaded_apps(Apps, RestartTypes). start_loaded_apps(Apps) -> diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl index 74f5a34db0..78f1c9e03c 100644 --- a/src/rabbit_feature_flags.erl +++ b/src/rabbit_feature_flags.erl @@ -112,8 +112,9 @@ check_node_compatibility/2, is_node_compatible/1, is_node_compatible/2, - sync_feature_flags_with_cluster/1, sync_feature_flags_with_cluster/2, + sync_feature_flags_with_cluster/3, + refresh_feature_flags_after_app_load/1, enabled_feature_flags_list_file/0 ]). @@ -122,10 +123,15 @@ mark_as_enabled_locally/2, remote_nodes/0, running_remote_nodes/0, - does_node_support/3]). + does_node_support/3, + merge_feature_flags_from_unknown_apps/1, + do_sync_feature_flags_with_node/1]). -ifdef(TEST). --export([mark_as_enabled_remotely/2, +-export([initialize_registry/1, + initialize_registry/3, + query_supported_feature_flags/0, + mark_as_enabled_remotely/2, mark_as_enabled_remotely/4]). -endif. @@ -193,6 +199,12 @@ %% <li>`provided_by': the name of the application providing the feature flag</li> %% </ul> +-type feature_state() :: boolean() | state_changing. +%% The state of the feature flag: enabled if `true', disabled if `false' +%% or `state_changing'. + +-type feature_states() :: #{feature_name() => feature_state()}. + -type stability() :: stable | experimental. %% The level of stability of a feature flag. Currently, only informational. 
@@ -227,6 +239,8 @@ feature_name/0, feature_flags/0, feature_props_extended/0, + feature_state/0, + feature_states/0, stability/0, migration_fun_name/0, migration_fun/0, @@ -516,7 +530,7 @@ is_enabled(FeatureNames) -> (feature_name() | [feature_name()], blocking) -> boolean(); (feature_name() | [feature_name()], non_blocking) -> - boolean() | state_changing. + feature_state(). %% @doc %% Returns if a single feature flag or a set of feature flags is %% enabled. @@ -586,7 +600,7 @@ is_disabled(FeatureNames) -> (feature_name() | [feature_name()], blocking) -> boolean(); (feature_name() | [feature_name()], non_blocking) -> - boolean() | state_changing. + feature_state(). %% @doc %% Returns if a single feature flag or one feature flag in a set of %% feature flags is disabled. @@ -735,17 +749,35 @@ init() -> %% The registry is local to all RabbitMQ nodes. initialize_registry() -> - %% The first step is to get the list of enabled feature flags: if - %% this is the first time we initialize it, we read the list from - %% disk (the `feature_flags` file). Otherwise we query the existing - %% registry before it is replaced. + initialize_registry(#{}). + +-spec initialize_registry(feature_flags()) -> + ok | {error, any()} | no_return(). +%% @private +%% @doc +%% Initializes or reinitializes the registry. +%% +%% See {@link initialize_registry/0} for a description of the registry. +%% +%% This function takes a map of new supported feature flags (so their +%% name and extended properties) to add to the existing known feature +%% flags. + +initialize_registry(NewSupportedFeatureFlags) -> + %% The first step is to get the feature flag states: if this is the + %% first time we initialize it, we read the list from disk (the + %% `feature_flags` file). Otherwise we query the existing registry + %% before it is replaced. 
RegistryInitialized = rabbit_ff_registry:is_registry_initialized(), - EnabledFeatureNames = case RegistryInitialized of - true -> - maps:keys(rabbit_ff_registry:list(enabled)); - false -> - read_enabled_feature_flags_list() - end, + FeatureStates = case RegistryInitialized of + true -> + rabbit_ff_registry:states(); + false -> + EnabledFeatureNames = + read_enabled_feature_flags_list(), + list_of_enabled_feature_flags_to_feature_states( + EnabledFeatureNames) + end, %% We also record if the feature flags state was correctly written %% to disk. Currently we don't use this information, but in the @@ -759,9 +791,19 @@ initialize_registry() -> false -> true end, - initialize_registry(EnabledFeatureNames, [], WrittenToDisk). + initialize_registry(NewSupportedFeatureFlags, + FeatureStates, + WrittenToDisk). + +-spec list_of_enabled_feature_flags_to_feature_states([feature_name()]) -> + feature_states(). + +list_of_enabled_feature_flags_to_feature_states(FeatureNames) -> + maps:from_list([{FeatureName, true} || FeatureName <- FeatureNames]). --spec initialize_registry([feature_name()], [feature_name()], boolean()) -> +-spec initialize_registry(feature_flags(), + feature_states(), + boolean()) -> ok | {error, any()} | no_return(). %% @private %% @doc @@ -769,54 +811,190 @@ initialize_registry() -> %% %% See {@link initialize_registry/0} for a description of the registry. %% -%% This function takes two list of feature flag names: -%% <ul> -%% <li>the complete list of feature flags to mark as enabled</li> -%% <li>the list of feature flags being enabled or disabled</li> -%% </ul> +%% This function takes a map of new supported feature flags (so their +%% name and extended properties) to add to the existing known feature +%% flags, a map of the new feature flag states (whether they are +%% enabled, disabled or `state_changing'), and a flag to indicate if the +%% feature flag states was recorded to disk. 
%% %% The latter is used to block callers asking if a feature flag is %% enabled or disabled while its state is changing. -initialize_registry(EnabledFeatureNames, - ChangingFeatureNames, +initialize_registry(NewSupportedFeatureFlags, + NewFeatureStates, WrittenToDisk) -> - %% Query the list (it's a map to be exact) of supported feature - %% flags. That list comes from the `-rabbitmq_feature_flag().` - %% module attributes exposed by all currently loaded Erlang modules. - rabbit_log:debug("Feature flags: (re)initialize registry", []), - AllFeatureFlags = query_supported_feature_flags(), + %% We take the feature flags already registered. + RegistryInitialized = rabbit_ff_registry:is_registry_initialized(), + KnownFeatureFlags1 = case RegistryInitialized of + true -> rabbit_ff_registry:list(all); + false -> #{} + end, + + %% Query the list (it's a map to be exact) of known + %% supported feature flags. That list comes from the + %% `-rabbitmq_feature_flag().` module attributes exposed by all + %% currently loaded Erlang modules. + KnownFeatureFlags2 = query_supported_feature_flags(), + + %% We merge the feature flags we already knew about + %% (KnownFeatureFlags1), those found in the loaded applications + %% (KnownFeatureFlags2) and those specified in arguments + %% (NewSupportedFeatureFlags). The latter come from remote nodes + %% usually: for example, they can come from plugins loaded on remote + %% node but the plugins are missing locally. In this case, we + %% consider those feature flags supported because there is no code + %% locally which would cause issues. + %% + %% It means that the list of feature flags only grows. We don't try + %% to clean it at some point because we want to remember about the + %% feature flags we saw (and their state). It should be fine because + %% that list should remain small. 
+ KnownFeatureFlags = maps:merge(KnownFeatureFlags1, + KnownFeatureFlags2), + AllFeatureFlags = maps:merge(KnownFeatureFlags, + NewSupportedFeatureFlags), + + %% Next we want to update the feature states, based on the new + %% states passed as arguments. + FeatureStates0 = case RegistryInitialized of + true -> + maps:merge(rabbit_ff_registry:states(), + NewFeatureStates); + false -> + NewFeatureStates + end, + FeatureStates = maps:filter( + fun(_, true) -> true; + (_, state_changing) -> true; + (_, false) -> false + end, FeatureStates0), + + Proceed = does_registry_need_refresh(AllFeatureFlags, + FeatureStates, + WrittenToDisk), + + case Proceed of + true -> + rabbit_log:debug("Feature flags: (re)initialize registry", []), + do_initialize_registry(AllFeatureFlags, + FeatureStates, + WrittenToDisk); + false -> + rabbit_log:debug("Feature flags: registry already up-to-date, " + "skipping init", []), + ok + end. +-spec does_registry_need_refresh(feature_flags(), + feature_states(), + boolean()) -> + boolean(). + +does_registry_need_refresh(AllFeatureFlags, + FeatureStates, + WrittenToDisk) -> + case rabbit_ff_registry:is_registry_initialized() of + true -> + %% Before proceeding with the actual + %% (re)initialization, let's see if there are any + %% changes. + CurrentAllFeatureFlags = rabbit_ff_registry:list(all), + CurrentFeatureStates = rabbit_ff_registry:states(), + CurrentWrittenToDisk = rabbit_ff_registry:is_registry_written_to_disk(), + + AllFeatureFlags =/= CurrentAllFeatureFlags orelse + FeatureStates =/= CurrentFeatureStates orelse + WrittenToDisk =/= CurrentWrittenToDisk; + false -> + true + end. + +-spec do_initialize_registry(feature_flags(), + feature_states(), + boolean()) -> + ok | {error, any()} | no_return(). +%% @private + +do_initialize_registry(AllFeatureFlags, + FeatureStates, + WrittenToDisk) -> %% We log the state of those feature flags. 
rabbit_log:info("Feature flags: list of feature flags found:", []), lists:foreach( fun(FeatureName) -> rabbit_log:info( "Feature flags: [~s] ~s", - [case lists:member(FeatureName, EnabledFeatureNames) of - true -> "x"; - false -> " " + [case maps:is_key(FeatureName, FeatureStates) of + true -> + case maps:get(FeatureName, FeatureStates) of + true -> "x"; + state_changing -> "~" + end; + false -> + " " end, FeatureName]) end, lists:sort(maps:keys(AllFeatureFlags))), + rabbit_log:info("Feature flags: feature flag states written to disk: ~s", + [case WrittenToDisk of + true -> "yes"; + false -> "no" + end]), %% We request the registry to be regenerated and reloaded with the %% new state. regen_registry_mod(AllFeatureFlags, - EnabledFeatureNames, - ChangingFeatureNames, + FeatureStates, WrittenToDisk). -spec query_supported_feature_flags() -> feature_flags(). %% @private +-ifdef(TEST). +module_attributes_from_testsuite() -> + try + throw(force_exception) + catch + throw:force_exception:Stacktrace -> + Modules = lists:filter( + fun({Mod, _, _, _}) -> + ModS = atom_to_list(Mod), + re:run(ModS, "_SUITE$", [{capture, none}]) + =:= + match + end, Stacktrace), + case Modules of + [{Module, _, _, _} | _] -> + ModInfo = Module:module_info(attributes), + Attrs = lists:append( + [Attr || {Name, Attr} <- ModInfo, + Name =:= rabbit_feature_flag]), + case Attrs of + [] -> []; + _ -> [{Module, Module, Attrs}] + end; + _ -> + [] + end + end. + +query_supported_feature_flags() -> + rabbit_log:debug( + "Feature flags: query feature flags in loaded applications + test " + "module"), + AttributesPerApp = rabbit_misc:all_module_attributes(rabbit_feature_flag), + AttributesFromTestsuite = module_attributes_from_testsuite(), + AllAttributes = AttributesPerApp ++ AttributesFromTestsuite, + prepare_queried_feature_flags(AllAttributes, #{}). +-else. 
query_supported_feature_flags() -> rabbit_log:debug( "Feature flags: query feature flags in loaded applications"), AttributesPerApp = rabbit_misc:all_module_attributes(rabbit_feature_flag), - query_supported_feature_flags(AttributesPerApp, #{}). + prepare_queried_feature_flags(AttributesPerApp, #{}). +-endif. -query_supported_feature_flags([{App, _Module, Attributes} | Rest], +prepare_queried_feature_flags([{App, _Module, Attributes} | Rest], AllFeatureFlags) -> rabbit_log:debug("Feature flags: application `~s` " "has ~b feature flags", @@ -828,8 +1006,8 @@ query_supported_feature_flags([{App, _Module, Attributes} | Rest], FeatureName, FeatureProps) end, AllFeatureFlags, Attributes), - query_supported_feature_flags(Rest, AllFeatureFlags1); -query_supported_feature_flags([], AllFeatureFlags) -> + prepare_queried_feature_flags(Rest, AllFeatureFlags1); +prepare_queried_feature_flags([], AllFeatureFlags) -> AllFeatureFlags. -spec merge_new_feature_flags(feature_flags(), @@ -849,14 +1027,12 @@ merge_new_feature_flags(AllFeatureFlags, App, FeatureName, FeatureProps) #{FeatureName => FeatureProps1}). -spec regen_registry_mod(feature_flags(), - [feature_name()], - [feature_name()], + feature_states(), boolean()) -> ok | {error, any()} | no_return(). %% @private regen_registry_mod(AllFeatureFlags, - EnabledFeatureNames, - ChangingFeatureNames, + FeatureStates, WrittenToDisk) -> %% Here, we recreate the source code of the `rabbit_ff_registry` %% module from scratch. 
@@ -879,6 +1055,7 @@ regen_registry_mod(AllFeatureFlags, erl_syntax:integer(A)) || {F, A} <- [{get, 1}, {list, 1}, + {states, 0}, {is_supported, 1}, {is_enabled, 1}, {is_registry_initialized, 0}, @@ -910,17 +1087,56 @@ regen_registry_mod(AllFeatureFlags, [ListAllBody]), EnabledFeatureFlags = maps:filter( fun(FeatureName, _) -> - lists:member(FeatureName, - EnabledFeatureNames) + maps:is_key(FeatureName, + FeatureStates) + andalso + maps:get(FeatureName, FeatureStates) + =:= + true end, AllFeatureFlags), ListEnabledBody = erl_syntax:abstract(EnabledFeatureFlags), - ListEnabledClause = erl_syntax:clause([erl_syntax:atom(enabled)], - [], - [ListEnabledBody]), + ListEnabledClause = erl_syntax:clause( + [erl_syntax:atom(enabled)], + [], + [ListEnabledBody]), + DisabledFeatureFlags = maps:filter( + fun(FeatureName, _) -> + not maps:is_key(FeatureName, + FeatureStates) + end, AllFeatureFlags), + ListDisabledBody = erl_syntax:abstract(DisabledFeatureFlags), + ListDisabledClause = erl_syntax:clause( + [erl_syntax:atom(disabled)], + [], + [ListDisabledBody]), + StateChangingFeatureFlags = maps:filter( + fun(FeatureName, _) -> + maps:is_key(FeatureName, + FeatureStates) + andalso + maps:get(FeatureName, FeatureStates) + =:= + state_changing + end, AllFeatureFlags), + ListStateChangingBody = erl_syntax:abstract(StateChangingFeatureFlags), + ListStateChangingClause = erl_syntax:clause( + [erl_syntax:atom(state_changing)], + [], + [ListStateChangingBody]), ListFun = erl_syntax:function( erl_syntax:atom(list), - [ListAllClause, ListEnabledClause]), + [ListAllClause, + ListEnabledClause, + ListDisabledClause, + ListStateChangingClause]), ListFunForm = erl_syntax:revert(ListFun), + %% states() -> ... + StatesBody = erl_syntax:abstract(FeatureStates), + StatesClause = erl_syntax:clause([], [], [StatesBody]), + StatesFun = erl_syntax:function( + erl_syntax:atom(states), + [StatesClause]), + StatesFunForm = erl_syntax:revert(StatesFun), %% is_supported(_) -> ... 
IsSupportedClauses = [erl_syntax:clause( [erl_syntax:atom(FeatureName)], @@ -940,14 +1156,12 @@ regen_registry_mod(AllFeatureFlags, IsEnabledClauses = [erl_syntax:clause( [erl_syntax:atom(FeatureName)], [], - [case lists:member(FeatureName, - ChangingFeatureNames) of + [case maps:is_key(FeatureName, FeatureStates) of true -> - erl_syntax:atom(state_changing); - false -> erl_syntax:atom( - lists:member(FeatureName, - EnabledFeatureNames)) + maps:get(FeatureName, FeatureStates)); + false -> + erl_syntax:atom(false) end]) || FeatureName <- maps:keys(AllFeatureFlags) ], @@ -984,10 +1198,12 @@ regen_registry_mod(AllFeatureFlags, ExportForm, GetFunForm, ListFunForm, + StatesFunForm, IsSupportedFunForm, IsEnabledFunForm, IsInitializedFunForm, IsWrittenToDiskFunForm], + maybe_log_registry_source_code(Forms), CompileOpts = [return_errors, return_warnings], case compile:forms(Forms, CompileOpts) of @@ -1001,6 +1217,23 @@ regen_registry_mod(AllFeatureFlags, {error, {compilation_failure, Errors, Warnings}} end. +-ifdef(TEST). +maybe_log_registry_source_code(Forms) -> + case os:getenv("LOG_FF_REGISTRY") of + false -> + ok; + _ -> + rabbit_log:debug( + "== FEATURE FLAGS REGISTRY ==~n" + "~s~n" + "== END ==~n", + [erl_prettypr:format(erl_syntax:form_list(Forms))]) + end. +-else. +maybe_log_registry_source_code(_) -> + ok. +-endif. + -spec load_registry_mod(atom(), binary()) -> ok | {error, any()} | no_return(). %% @private @@ -1181,8 +1414,8 @@ enable_locally(FeatureName) when is_atom(FeatureName) -> ok; false -> rabbit_log:debug( - "Feature flag `~s`: enable locally (i.e. was enabled on the cluster " - "when this node was not part of it)", + "Feature flag `~s`: enable locally (as part of feature " + "flag states synchronization)", [FeatureName]), do_enable_locally(FeatureName) end. @@ -1267,7 +1500,7 @@ run_migration_fun(FeatureName, FeatureProps, Arg) -> {error, {invalid_migration_fun, Invalid}} end. 
--spec mark_as_enabled(feature_name(), boolean() | state_changing) -> +-spec mark_as_enabled(feature_name(), feature_state()) -> any() | {error, any()} | no_return(). %% @private @@ -1279,7 +1512,7 @@ mark_as_enabled(FeatureName, IsEnabled) -> Error end. --spec mark_as_enabled_locally(feature_name(), boolean() | state_changing) -> +-spec mark_as_enabled_locally(feature_name(), feature_state()) -> any() | {error, any()} | no_return(). %% @private @@ -1302,18 +1535,11 @@ mark_as_enabled_locally(FeatureName, IsEnabled) -> ok =:= try_to_write_enabled_feature_flags_list( NewEnabledFeatureNames) end, - case IsEnabled of - state_changing -> - initialize_registry(EnabledFeatureNames, - [FeatureName], - WrittenToDisk); - _ -> - initialize_registry(NewEnabledFeatureNames, - [], - WrittenToDisk) - end. + initialize_registry(#{}, + #{FeatureName => IsEnabled}, + WrittenToDisk). --spec mark_as_enabled_remotely(feature_name(), boolean() | state_changing) -> +-spec mark_as_enabled_remotely(feature_name(), feature_state()) -> any() | {error, any()} | no_return(). %% @private @@ -1321,7 +1547,10 @@ mark_as_enabled_remotely(FeatureName, IsEnabled) -> Nodes = running_remote_nodes(), mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, ?TIMEOUT). --spec mark_as_enabled_remotely([node()], feature_name(), boolean() | state_changing, timeout()) -> +-spec mark_as_enabled_remotely([node()], + feature_name(), + feature_state(), + timeout()) -> any() | {error, any()} | no_return(). %% @private @@ -1389,6 +1618,12 @@ remote_nodes() -> running_remote_nodes() -> mnesia:system_info(running_db_nodes) -- [node()]. +query_running_remote_nodes(Node, Timeout) -> + case rpc:call(Node, mnesia, system_info, [running_db_nodes], Timeout) of + {badrpc, _} = Error -> Error; + Nodes -> Nodes -- [node()] + end. + -spec does_node_support(node(), [feature_name()], timeout()) -> boolean(). 
%% @private @@ -1399,30 +1634,19 @@ does_node_support(Node, FeatureNames, Timeout) -> Node -> is_supported_locally(FeatureNames); _ -> - rpc:call(Node, - ?MODULE, is_supported_locally, [FeatureNames], - Timeout) + run_feature_flags_mod_on_remote_node( + Node, is_supported_locally, [FeatureNames], Timeout) end, case Ret of - {badrpc, {'EXIT', - {undef, - [{?MODULE, is_supported_locally, [FeatureNames], []} - | _]}}} -> - %% If rabbit_feature_flags:is_supported_locally/1 is undefined - %% on the remote node, we consider it to be a 3.7.x node. - %% - %% Theoretically, it could be an older version (3.6.x and - %% older). But the RabbitMQ version consistency check - %% (rabbit_misc:version_minor_equivalent/2) called from - %% rabbit_mnesia:check_rabbit_consistency/2 already blocked - %% this situation from happening before we reach this point. + {error, pre_feature_flags_rabbitmq} -> + %% See run_feature_flags_mod_on_remote_node/4 for an + %% explanation why we consider this node a 3.7.x node. rabbit_log:debug( - "Feature flags: ?MODULE:is_supported_locally(~p) unavailable " - "on node `~s`: assuming it is a RabbitMQ 3.7.x node " - "=> consider the feature flags unsupported", - [FeatureNames, Node]), + "Feature flags: no feature flags support on node `~s`, " + "consider the feature flags unsupported: ~p", + [Node, FeatureNames]), false; - {badrpc, Reason} -> + {error, Reason} -> rabbit_log:error("Feature flags: error while querying `~p` " "support on node ~s: ~p", [FeatureNames, Node, Reason]), @@ -1470,6 +1694,23 @@ check_node_compatibility(Node) -> %% @see check_node_compatibility/1 check_node_compatibility(Node, Timeout) -> + %% Before checking compatibility, we exchange feature flags from + %% unknown Erlang applications. So we fetch remote feature flags + %% from applications which are not loaded locally, and the opposite. 
+ %% + %% The goal is that such feature flags are not blocking the + %% communication between nodes because the code (which would + %% break) is missing on those nodes. Therefore they should not be + %% considered when determining compatibility. + exchange_feature_flags_from_unknown_apps(Node, Timeout), + + %% FIXME FIXME FIXME + %% When we try to cluster two nodes, we get: + %% Feature flags: starting an unclustered node: all feature flags + %% will be enabled by default + %% That probably should not be the case... + + %% We can now proceed with the actual compatibility check. rabbit_log:debug("Feature flags: node `~s` compatibility check, part 1/2", [Node]), Part1 = local_enabled_feature_flags_is_supported_remotely(Node, Timeout), @@ -1549,6 +1790,42 @@ remote_enabled_feature_flags_is_supported_locally(Node, Timeout) -> is_supported_locally(RemoteEnabledFeatureNames) end. +-spec run_feature_flags_mod_on_remote_node(node(), + atom(), + [term()], + timeout()) -> + term() | {error, term()}. +%% @private + +run_feature_flags_mod_on_remote_node(Node, Function, Args, Timeout) -> + case rpc:call(Node, ?MODULE, Function, Args, Timeout) of + {badrpc, {'EXIT', + {undef, + [{?MODULE, Function, Args, []} + | _]}}} -> + %% If rabbit_feature_flags:is_supported_locally/1 is undefined + %% on the remote node, we consider it to be a 3.7.x node. + %% + %% Theoretically, it could be an older version (3.6.x and + %% older). But the RabbitMQ version consistency check + %% (rabbit_misc:version_minor_equivalent/2) called from + %% rabbit_mnesia:check_rabbit_consistency/2 already blocked + %% this situation from happening before we reach this point. 
+ rabbit_log:debug( + "Feature flags: ~s:~s~p unavailable on node `~s`: " + "assuming it is a RabbitMQ 3.7.x node", + [?MODULE, Function, Args, Node]), + {error, pre_feature_flags_rabbitmq}; + {badrpc, Reason} = Error -> + rabbit_log:error( + "Feature flags: error while running ~s:~s~p " + "on node `~s`: ~p", + [?MODULE, Function, Args, Node, Reason]), + {error, Error}; + Ret -> + Ret + end. + -spec query_remote_feature_flags(node(), Which :: all | enabled | disabled, timeout()) -> @@ -1559,25 +1836,20 @@ query_remote_feature_flags(Node, Which, Timeout) -> rabbit_log:debug("Feature flags: querying ~s feature flags " "on node `~s`...", [Which, Node]), - case rpc:call(Node, ?MODULE, list, [Which], Timeout) of - {badrpc, {'EXIT', - {undef, - [{?MODULE, list, [Which], []} - | _]}}} -> - %% See does_node_support/3 for an explanation why we - %% consider this node a 3.7.x node. + case run_feature_flags_mod_on_remote_node(Node, list, [Which], Timeout) of + {error, pre_feature_flags_rabbitmq} -> + %% See run_feature_flags_mod_on_remote_node/4 for an + %% explanation why we consider this node a 3.7.x node. rabbit_log:debug( - "Feature flags: ?MODULE:list(~s) unavailable on node `~s`: " - "assuming it is a RabbitMQ 3.7.x node " - "=> consider the list empty", - [Which, Node]), + "Feature flags: no feature flags support on node `~s`, " + "consider the list of feature flags empty", [Node]), #{}; - {badrpc, Reason} = Error -> + {error, Reason} = Error -> rabbit_log:error( "Feature flags: error while querying ~s feature flags " "on node `~s`: ~p", [Which, Node, Reason]), - {error, Error}; + Error; RemoteFeatureFlags when is_map(RemoteFeatureFlags) -> RemoteFeatureNames = maps:keys(RemoteFeatureFlags), rabbit_log:debug("Feature flags: querying ~s feature flags " @@ -1586,47 +1858,129 @@ query_remote_feature_flags(Node, Which, Timeout) -> RemoteFeatureFlags end. 
--spec sync_feature_flags_with_cluster([node()]) -> +-spec merge_feature_flags_from_unknown_apps(feature_flags()) -> + ok | {error, any()}. +%% @private + +merge_feature_flags_from_unknown_apps(FeatureFlags) + when is_map(FeatureFlags) -> + LoadedApps = [App || {App, _, _} <- application:loaded_applications()], + FeatureFlagsFromUnknownApps = + maps:fold( + fun(FeatureName, FeatureProps, UnknownFF) -> + case is_supported_locally(FeatureName) of + true -> + UnknownFF; + false -> + FeatureProvider = maps:get(provided_by, FeatureProps), + case lists:member(FeatureProvider, LoadedApps) of + true -> UnknownFF; + false -> maps:put(FeatureName, FeatureProps, + UnknownFF) + end + end + end, + #{}, + FeatureFlags), + rabbit_log:debug( + "Feature flags: register feature flags provided by applications " + "unknown locally: ~p", + [maps:keys(FeatureFlagsFromUnknownApps)]), + initialize_registry(FeatureFlagsFromUnknownApps). + +exchange_feature_flags_from_unknown_apps(Node, Timeout) -> + %% The first step is to fetch feature flags from Erlang applications + %% we don't know locally (they are loaded remotely, but not + %% locally). + fetch_remote_feature_flags_from_apps_unknown_locally(Node, Timeout), + + %% The next step is to do the opposite: push feature flags to remote + %% nodes so they can register those from applications they don't + %% know. + push_local_feature_flags_from_apps_unknown_remotely(Node, Timeout). + +fetch_remote_feature_flags_from_apps_unknown_locally(Node, Timeout) -> + RemoteFeatureFlags = query_remote_feature_flags(Node, all, Timeout), + merge_feature_flags_from_unknown_apps(RemoteFeatureFlags). + +push_local_feature_flags_from_apps_unknown_remotely(Node, Timeout) -> + LocalFeatureFlags = list(all), + push_local_feature_flags_from_apps_unknown_remotely( + Node, LocalFeatureFlags, Timeout). 
+ +push_local_feature_flags_from_apps_unknown_remotely( + Node, FeatureFlags, Timeout) + when map_size(FeatureFlags) > 0 -> + case query_running_remote_nodes(Node, Timeout) of + {badrpc, Reason} -> + {error, Reason}; + Nodes -> + lists:foreach( + fun(N) -> + run_feature_flags_mod_on_remote_node( + N, + merge_feature_flags_from_unknown_apps, + [FeatureFlags], + Timeout) + end, Nodes) + end; +push_local_feature_flags_from_apps_unknown_remotely(_, _, _) -> + ok. + +-spec sync_feature_flags_with_cluster([node()], boolean()) -> ok | {error, any()} | no_return(). %% @private -sync_feature_flags_with_cluster(Nodes) -> - sync_feature_flags_with_cluster(Nodes, ?TIMEOUT). +sync_feature_flags_with_cluster(Nodes, NodeIsVirgin) -> + sync_feature_flags_with_cluster(Nodes, NodeIsVirgin, ?TIMEOUT). --spec sync_feature_flags_with_cluster([node()], timeout()) -> +-spec sync_feature_flags_with_cluster([node()], boolean(), timeout()) -> ok | {error, any()} | no_return(). %% @private -sync_feature_flags_with_cluster([], _) -> +sync_feature_flags_with_cluster([], NodeIsVirgin, _) -> verify_which_feature_flags_are_actually_enabled(), - FeatureNames = get_forced_feature_flag_names(), - case remote_nodes() of - [] when FeatureNames =:= undefined -> - rabbit_log:debug( - "Feature flags: starting an unclustered node: " - "all feature flags will be enabled by default"), - enable_all(); - [] -> - case FeatureNames of - [] -> + case NodeIsVirgin of + true -> + FeatureNames = get_forced_feature_flag_names(), + case remote_nodes() of + [] when FeatureNames =:= undefined -> rabbit_log:debug( - "Feature flags: starting an unclustered node: " - "all feature flags are forcibly left disabled " - "from the RABBITMQ_FEATURE_FLAGS environment " - "variable"); + "Feature flags: starting an unclustered node " + "for the first time: all feature flags will be " + "enabled by default"), + enable_all(); + [] -> + case FeatureNames of + [] -> + rabbit_log:debug( + "Feature flags: starting an unclustered " + 
"node for the first time: all feature " + "flags are forcibly left disabled from " + "the RABBITMQ_FEATURE_FLAGS environment " + "variable"), + ok; + _ -> + rabbit_log:debug( + "Feature flags: starting an unclustered " + "node for the first time: only the " + "following feature flags specified in " + "the RABBITMQ_FEATURE_FLAGS environment " + "variable will be enabled: ~p", + [FeatureNames]), + enable(FeatureNames) + end; _ -> - rabbit_log:debug( - "Feature flags: starting an unclustered node: " - "only the following feature flags specified in " - "the RABBITMQ_FEATURE_FLAGS environment variable " - "will be enabled: ~p", - [FeatureNames]) - end, - enable(FeatureNames); - _ -> + ok + end; + false -> + rabbit_log:debug( + "Feature flags: starting an unclustered node which is " + "already initialized: all feature flags left in their " + "current state"), ok end; -sync_feature_flags_with_cluster(Nodes, Timeout) -> +sync_feature_flags_with_cluster(Nodes, _, Timeout) -> verify_which_feature_flags_are_actually_enabled(), RemoteNodes = Nodes -- [node()], sync_feature_flags_with_cluster1(RemoteNodes, Timeout). @@ -1642,19 +1996,45 @@ sync_feature_flags_with_cluster1(RemoteNodes, Timeout) -> Error; RemoteFeatureFlags -> RemoteFeatureNames = maps:keys(RemoteFeatureFlags), - do_sync_feature_flags_with_node1(RemoteFeatureNames) + rabbit_log:debug( + "Feature flags: enabling locally feature flags already " + "enabled on node `~s`...", + [RandomRemoteNode]), + case do_sync_feature_flags_with_node(RemoteFeatureNames) of + ok -> + sync_feature_flags_with_cluster2( + RandomRemoteNode, Timeout); + Error -> + Error + end + end. 
+ +sync_feature_flags_with_cluster2(RandomRemoteNode, Timeout) -> + LocalFeatureNames = maps:keys(list(enabled)), + rabbit_log:debug( + "Feature flags: enabling on node `~s` feature flags already " + "enabled locally...", + [RandomRemoteNode]), + Ret = run_feature_flags_mod_on_remote_node( + RandomRemoteNode, + do_sync_feature_flags_with_node, + [LocalFeatureNames], + Timeout), + case Ret of + {error, pre_feature_flags_rabbitmq} -> ok; + _ -> Ret end. pick_one_node(Nodes) -> RandomIndex = rand:uniform(length(Nodes)), lists:nth(RandomIndex, Nodes). -do_sync_feature_flags_with_node1([FeatureFlag | Rest]) -> +do_sync_feature_flags_with_node([FeatureFlag | Rest]) -> case enable_locally(FeatureFlag) of - ok -> do_sync_feature_flags_with_node1(Rest); + ok -> do_sync_feature_flags_with_node(Rest); Error -> Error end; -do_sync_feature_flags_with_node1([]) -> +do_sync_feature_flags_with_node([]) -> ok. -spec get_forced_feature_flag_names() -> [feature_name()] | undefined. @@ -1793,5 +2173,92 @@ verify_which_feature_flags_are_actually_enabled() -> WrittenToDisk = ok =:= try_to_write_enabled_feature_flags_list( RepairedEnabledFeatureNames), initialize_registry( - RepairedEnabledFeatureNames, [], WrittenToDisk) + #{}, + list_of_enabled_feature_flags_to_feature_states( + RepairedEnabledFeatureNames), + WrittenToDisk) + end. + +-spec refresh_feature_flags_after_app_load([atom()]) -> + ok | {error, any()} | no_return(). + +refresh_feature_flags_after_app_load([]) -> + ok; +refresh_feature_flags_after_app_load(Apps) -> + rabbit_log:debug( + "Feature flags: new apps loaded: ~p -> refreshing feature flags", + [Apps]), + + FeatureFlags0 = list(all), + FeatureFlags1 = query_supported_feature_flags(), + + %% The following list contains all the feature flags this node + %% learned about only because remote nodes have them. Now, the + %% applications providing them are loaded locally as well. 
+ %% Therefore, we may run their migration function in case the state + %% of this node needs it. + AlreadySupportedFeatureNames = maps:keys( + maps:filter( + fun(_, #{provided_by := App}) -> + lists:member(App, Apps) + end, FeatureFlags0)), + case AlreadySupportedFeatureNames of + [] -> + ok; + _ -> + rabbit_log:debug( + "Feature flags: new apps loaded: feature flags already " + "supported: ~p", + [lists:sort(AlreadySupportedFeatureNames)]) + end, + + %% The following list contains all the feature flags no nodes in the + %% cluster knew about before: this is the first time we see them in + %% this instance of the cluster. We need to register them on all + %% nodes. + NewSupportedFeatureFlags = maps:filter( + fun(FeatureName, _) -> + not maps:is_key(FeatureName, + FeatureFlags0) + end, FeatureFlags1), + case maps:keys(NewSupportedFeatureFlags) of + [] -> + ok; + NewSupportedFeatureNames -> + rabbit_log:debug( + "Feature flags: new apps loaded: new feature flags (unseen so " + "far): ~p ", + [lists:sort(NewSupportedFeatureNames)]) + end, + + case initialize_registry() of + ok -> + Ret = maybe_enable_locally_after_app_load( + AlreadySupportedFeatureNames), + case Ret of + ok -> + share_new_feature_flags_after_app_load( + NewSupportedFeatureFlags, ?TIMEOUT); + Error -> + Error + end; + Error -> + Error + end. + +maybe_enable_locally_after_app_load([]) -> + ok; +maybe_enable_locally_after_app_load([FeatureName | Rest]) -> + case is_enabled(FeatureName) of + true -> + case do_enable_locally(FeatureName) of + ok -> maybe_enable_locally_after_app_load(Rest); + Error -> Error + end; + false -> + maybe_enable_locally_after_app_load(Rest) end. + +share_new_feature_flags_after_app_load(FeatureFlags, Timeout) -> + push_local_feature_flags_from_apps_unknown_remotely( + node(), FeatureFlags, Timeout). 
diff --git a/src/rabbit_ff_registry.erl b/src/rabbit_ff_registry.erl index 46d439001f..b79f8b45ef 100644 --- a/src/rabbit_ff_registry.erl +++ b/src/rabbit_ff_registry.erl @@ -31,6 +31,7 @@ -export([get/1, list/1, + states/0, is_supported/1, is_enabled/1, is_registry_initialized/0, @@ -76,6 +77,23 @@ list(Which) -> true -> #{} end. +-spec states() -> rabbit_feature_flags:feature_states(). +%% @doc +%% Returns the states of supported feature flags. +%% +%% Only the information stored in the local registry is used to answer +%% this call. +%% +%% @returns A map of feature flag states. + +states() -> + rabbit_feature_flags:initialize_registry(), + %% See get/1 for an explanation of the case statement below. + case is_registry_initialized() of + false -> ?MODULE:states(); + true -> #{} + end. + -spec is_supported(rabbit_feature_flags:feature_name()) -> boolean(). %% @doc %% Returns if a feature flag is supported. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 61b9e10e70..83a6e65817 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -528,6 +528,7 @@ dir() -> mnesia:system_info(directory). %% nodes in the cluster already. It also updates the cluster status %% file. init_db(ClusterNodes, NodeType, CheckOtherNodes) -> + NodeIsVirgin = is_virgin_node(), Nodes = change_extra_db_nodes(ClusterNodes, CheckOtherNodes), %% Note that we use `system_info' here and not the cluster status %% since when we start rabbit for the first time the cluster @@ -551,7 +552,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> ok = rabbit_table:wait_for_replicated(_Retry = true), ok = rabbit_table:create_local_copy(NodeType) end, - ensure_feature_flags_are_in_sync(Nodes), + ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin), ensure_schema_integrity(), rabbit_node_monitor:update_cluster_status(), ok. @@ -621,12 +622,12 @@ ensure_mnesia_not_running() -> throw({error, mnesia_unexpectedly_running}) end. 
-ensure_feature_flags_are_in_sync(Nodes) -> - case rabbit_feature_flags:sync_feature_flags_with_cluster(Nodes) of - ok -> - ok; - {error, Reason} -> - throw({error, {incompatible_feature_flags, Reason}}) +ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) -> + Ret = rabbit_feature_flags:sync_feature_flags_with_cluster( + Nodes, NodeIsVirgin), + case Ret of + ok -> ok; + {error, Reason} -> throw({error, {incompatible_feature_flags, Reason}}) end. ensure_schema_integrity() -> diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index abb62a4f9d..ea7c2d47fa 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -469,7 +469,6 @@ clean_plugins(Plugins) -> clean_plugin(Plugin, ExpandDir) -> {ok, Mods} = application:get_key(Plugin, modules), application:unload(Plugin), - rabbit_feature_flags:initialize_registry(), [begin code:soft_purge(Mod), code:delete(Mod), @@ -714,4 +713,6 @@ remove_plugins(Plugins) -> maybe_report_plugin_loading_problems([]) -> ok; maybe_report_plugin_loading_problems(Problems) -> - rabbit_log:warning("Problem reading some plugins: ~p~n", [Problems]). + io:format(standard_error, + "Problem reading some plugins: ~p~n", + [Problems]). |
