summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_feature_flags.erl741
-rw-r--r--src/rabbit_ff_registry.erl18
-rw-r--r--src/rabbit_mnesia.erl15
-rw-r--r--src/rabbit_plugins.erl5
5 files changed, 634 insertions, 147 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 222b6cf3c0..20f9b17abf 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -520,7 +520,7 @@ start_apps(Apps) ->
start_apps(Apps, RestartTypes) ->
app_utils:load_applications(Apps),
- rabbit_feature_flags:initialize_registry(),
+ ok = rabbit_feature_flags:refresh_feature_flags_after_app_load(Apps),
start_loaded_apps(Apps, RestartTypes).
start_loaded_apps(Apps) ->
diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl
index 74f5a34db0..78f1c9e03c 100644
--- a/src/rabbit_feature_flags.erl
+++ b/src/rabbit_feature_flags.erl
@@ -112,8 +112,9 @@
check_node_compatibility/2,
is_node_compatible/1,
is_node_compatible/2,
- sync_feature_flags_with_cluster/1,
sync_feature_flags_with_cluster/2,
+ sync_feature_flags_with_cluster/3,
+ refresh_feature_flags_after_app_load/1,
enabled_feature_flags_list_file/0
]).
@@ -122,10 +123,15 @@
mark_as_enabled_locally/2,
remote_nodes/0,
running_remote_nodes/0,
- does_node_support/3]).
+ does_node_support/3,
+ merge_feature_flags_from_unknown_apps/1,
+ do_sync_feature_flags_with_node/1]).
-ifdef(TEST).
--export([mark_as_enabled_remotely/2,
+-export([initialize_registry/1,
+ initialize_registry/3,
+ query_supported_feature_flags/0,
+ mark_as_enabled_remotely/2,
mark_as_enabled_remotely/4]).
-endif.
@@ -193,6 +199,12 @@
%% <li>`provided_by': the name of the application providing the feature flag</li>
%% </ul>
+-type feature_state() :: boolean() | state_changing.
+%% The state of the feature flag: enabled if `true', disabled if `false'
+%% or `state_changing'.
+
+-type feature_states() :: #{feature_name() => feature_state()}.
+
-type stability() :: stable | experimental.
%% The level of stability of a feature flag. Currently, only informational.
@@ -227,6 +239,8 @@
feature_name/0,
feature_flags/0,
feature_props_extended/0,
+ feature_state/0,
+ feature_states/0,
stability/0,
migration_fun_name/0,
migration_fun/0,
@@ -516,7 +530,7 @@ is_enabled(FeatureNames) ->
(feature_name() | [feature_name()], blocking) ->
boolean();
(feature_name() | [feature_name()], non_blocking) ->
- boolean() | state_changing.
+ feature_state().
%% @doc
%% Returns if a single feature flag or a set of feature flags is
%% enabled.
@@ -586,7 +600,7 @@ is_disabled(FeatureNames) ->
(feature_name() | [feature_name()], blocking) ->
boolean();
(feature_name() | [feature_name()], non_blocking) ->
- boolean() | state_changing.
+ feature_state().
%% @doc
%% Returns if a single feature flag or one feature flag in a set of
%% feature flags is disabled.
@@ -735,17 +749,35 @@ init() ->
%% The registry is local to all RabbitMQ nodes.
initialize_registry() ->
- %% The first step is to get the list of enabled feature flags: if
- %% this is the first time we initialize it, we read the list from
- %% disk (the `feature_flags` file). Otherwise we query the existing
- %% registry before it is replaced.
+ initialize_registry(#{}).
+
+-spec initialize_registry(feature_flags()) ->
+ ok | {error, any()} | no_return().
+%% @private
+%% @doc
+%% Initializes or reinitializes the registry.
+%%
+%% See {@link initialize_registry/0} for a description of the registry.
+%%
+%% This function takes a map of new supported feature flags (so their
+%% name and extended properties) to add to the existing known feature
+%% flags.
+
+initialize_registry(NewSupportedFeatureFlags) ->
+ %% The first step is to get the feature flag states: if this is the
+ %% first time we initialize it, we read the list from disk (the
+ %% `feature_flags` file). Otherwise we query the existing registry
+ %% before it is replaced.
RegistryInitialized = rabbit_ff_registry:is_registry_initialized(),
- EnabledFeatureNames = case RegistryInitialized of
- true ->
- maps:keys(rabbit_ff_registry:list(enabled));
- false ->
- read_enabled_feature_flags_list()
- end,
+ FeatureStates = case RegistryInitialized of
+ true ->
+ rabbit_ff_registry:states();
+ false ->
+ EnabledFeatureNames =
+ read_enabled_feature_flags_list(),
+ list_of_enabled_feature_flags_to_feature_states(
+ EnabledFeatureNames)
+ end,
%% We also record if the feature flags state was correctly written
%% to disk. Currently we don't use this information, but in the
@@ -759,9 +791,19 @@ initialize_registry() ->
false ->
true
end,
- initialize_registry(EnabledFeatureNames, [], WrittenToDisk).
+ initialize_registry(NewSupportedFeatureFlags,
+ FeatureStates,
+ WrittenToDisk).
+
+-spec list_of_enabled_feature_flags_to_feature_states([feature_name()]) ->
+ feature_states().
+
+list_of_enabled_feature_flags_to_feature_states(FeatureNames) ->
+ maps:from_list([{FeatureName, true} || FeatureName <- FeatureNames]).
--spec initialize_registry([feature_name()], [feature_name()], boolean()) ->
+-spec initialize_registry(feature_flags(),
+ feature_states(),
+ boolean()) ->
ok | {error, any()} | no_return().
%% @private
%% @doc
@@ -769,54 +811,190 @@ initialize_registry() ->
%%
%% See {@link initialize_registry/0} for a description of the registry.
%%
-%% This function takes two list of feature flag names:
-%% <ul>
-%% <li>the complete list of feature flags to mark as enabled</li>
-%% <li>the list of feature flags being enabled or disabled</li>
-%% </ul>
+%% This function takes a map of new supported feature flags (so their
+%% name and extended properties) to add to the existing known feature
+%% flags, a map of the new feature flag states (whether they are
+%% enabled, disabled or `state_changing'), and a flag to indicate if the
+%% feature flag states was recorded to disk.
%%
%% The latter is used to block callers asking if a feature flag is
%% enabled or disabled while its state is changing.
-initialize_registry(EnabledFeatureNames,
- ChangingFeatureNames,
+initialize_registry(NewSupportedFeatureFlags,
+ NewFeatureStates,
WrittenToDisk) ->
- %% Query the list (it's a map to be exact) of supported feature
- %% flags. That list comes from the `-rabbitmq_feature_flag().`
- %% module attributes exposed by all currently loaded Erlang modules.
- rabbit_log:debug("Feature flags: (re)initialize registry", []),
- AllFeatureFlags = query_supported_feature_flags(),
+ %% We take the feature flags already registered.
+ RegistryInitialized = rabbit_ff_registry:is_registry_initialized(),
+ KnownFeatureFlags1 = case RegistryInitialized of
+ true -> rabbit_ff_registry:list(all);
+ false -> #{}
+ end,
+
+ %% Query the list (it's a map to be exact) of known
+ %% supported feature flags. That list comes from the
+ %% `-rabbitmq_feature_flag().` module attributes exposed by all
+ %% currently loaded Erlang modules.
+ KnownFeatureFlags2 = query_supported_feature_flags(),
+
+ %% We merge the feature flags we already knew about
+ %% (KnownFeatureFlags1), those found in the loaded applications
+ %% (KnownFeatureFlags2) and those specified in arguments
+ %% (NewSupportedFeatureFlags). The latter come from remote nodes
+ %% usually: for example, they can come from plugins loaded on remote
+ %% node but the plugins are missing locally. In this case, we
+ %% consider those feature flags supported because there is no code
+ %% locally which would cause issues.
+ %%
+ %% It means that the list of feature flags only grows. we don't try
+ %% to clean it at some point because we want to remember about the
+ %% feature flags we saw (and their state). It should be fine because
+ %% that list should remain small.
+ KnownFeatureFlags = maps:merge(KnownFeatureFlags1,
+ KnownFeatureFlags2),
+ AllFeatureFlags = maps:merge(KnownFeatureFlags,
+ NewSupportedFeatureFlags),
+
+ %% Next we want to update the feature states, based on the new
+ %% states passed as arguments.
+ FeatureStates0 = case RegistryInitialized of
+ true ->
+ maps:merge(rabbit_ff_registry:states(),
+ NewFeatureStates);
+ false ->
+ NewFeatureStates
+ end,
+ FeatureStates = maps:filter(
+ fun(_, true) -> true;
+ (_, state_changing) -> true;
+ (_, false) -> false
+ end, FeatureStates0),
+
+ Proceed = does_registry_need_refresh(AllFeatureFlags,
+ FeatureStates,
+ WrittenToDisk),
+
+ case Proceed of
+ true ->
+ rabbit_log:debug("Feature flags: (re)initialize registry", []),
+ do_initialize_registry(AllFeatureFlags,
+ FeatureStates,
+ WrittenToDisk);
+ false ->
+ rabbit_log:debug("Feature flags: registry already up-to-date, "
+ "skipping init", []),
+ ok
+ end.
+-spec does_registry_need_refresh(feature_flags(),
+ feature_states(),
+ boolean()) ->
+ boolean().
+
+does_registry_need_refresh(AllFeatureFlags,
+ FeatureStates,
+ WrittenToDisk) ->
+ case rabbit_ff_registry:is_registry_initialized() of
+ true ->
+ %% Before proceeding with the actual
+ %% (re)initialization, let's see if there are any
+ %% changes.
+ CurrentAllFeatureFlags = rabbit_ff_registry:list(all),
+ CurrentFeatureStates = rabbit_ff_registry:states(),
+ CurrentWrittenToDisk = rabbit_ff_registry:is_registry_written_to_disk(),
+
+ AllFeatureFlags =/= CurrentAllFeatureFlags orelse
+ FeatureStates =/= CurrentFeatureStates orelse
+ WrittenToDisk =/= CurrentWrittenToDisk;
+ false ->
+ true
+ end.
+
+-spec do_initialize_registry(feature_flags(),
+ feature_states(),
+ boolean()) ->
+ ok | {error, any()} | no_return().
+%% @private
+
+do_initialize_registry(AllFeatureFlags,
+ FeatureStates,
+ WrittenToDisk) ->
%% We log the state of those feature flags.
rabbit_log:info("Feature flags: list of feature flags found:", []),
lists:foreach(
fun(FeatureName) ->
rabbit_log:info(
"Feature flags: [~s] ~s",
- [case lists:member(FeatureName, EnabledFeatureNames) of
- true -> "x";
- false -> " "
+ [case maps:is_key(FeatureName, FeatureStates) of
+ true ->
+ case maps:get(FeatureName, FeatureStates) of
+ true -> "x";
+ state_changing -> "~"
+ end;
+ false ->
+ " "
end,
FeatureName])
end, lists:sort(maps:keys(AllFeatureFlags))),
+ rabbit_log:info("Feature flags: feature flag states written to disk: ~s",
+ [case WrittenToDisk of
+ true -> "yes";
+ false -> "no"
+ end]),
%% We request the registry to be regenerated and reloaded with the
%% new state.
regen_registry_mod(AllFeatureFlags,
- EnabledFeatureNames,
- ChangingFeatureNames,
+ FeatureStates,
WrittenToDisk).
-spec query_supported_feature_flags() -> feature_flags().
%% @private
+-ifdef(TEST).
+module_attributes_from_testsuite() ->
+ try
+ throw(force_exception)
+ catch
+ throw:force_exception:Stacktrace ->
+ Modules = lists:filter(
+ fun({Mod, _, _, _}) ->
+ ModS = atom_to_list(Mod),
+ re:run(ModS, "_SUITE$", [{capture, none}])
+ =:=
+ match
+ end, Stacktrace),
+ case Modules of
+ [{Module, _, _, _} | _] ->
+ ModInfo = Module:module_info(attributes),
+ Attrs = lists:append(
+ [Attr || {Name, Attr} <- ModInfo,
+ Name =:= rabbit_feature_flag]),
+ case Attrs of
+ [] -> [];
+ _ -> [{Module, Module, Attrs}]
+ end;
+ _ ->
+ []
+ end
+ end.
+
+query_supported_feature_flags() ->
+ rabbit_log:debug(
+ "Feature flags: query feature flags in loaded applications + test "
+ "module"),
+ AttributesPerApp = rabbit_misc:all_module_attributes(rabbit_feature_flag),
+ AttributesFromTestsuite = module_attributes_from_testsuite(),
+ AllAttributes = AttributesPerApp ++ AttributesFromTestsuite,
+ prepare_queried_feature_flags(AllAttributes, #{}).
+-else.
query_supported_feature_flags() ->
rabbit_log:debug(
"Feature flags: query feature flags in loaded applications"),
AttributesPerApp = rabbit_misc:all_module_attributes(rabbit_feature_flag),
- query_supported_feature_flags(AttributesPerApp, #{}).
+ prepare_queried_feature_flags(AttributesPerApp, #{}).
+-endif.
-query_supported_feature_flags([{App, _Module, Attributes} | Rest],
+prepare_queried_feature_flags([{App, _Module, Attributes} | Rest],
AllFeatureFlags) ->
rabbit_log:debug("Feature flags: application `~s` "
"has ~b feature flags",
@@ -828,8 +1006,8 @@ query_supported_feature_flags([{App, _Module, Attributes} | Rest],
FeatureName,
FeatureProps)
end, AllFeatureFlags, Attributes),
- query_supported_feature_flags(Rest, AllFeatureFlags1);
-query_supported_feature_flags([], AllFeatureFlags) ->
+ prepare_queried_feature_flags(Rest, AllFeatureFlags1);
+prepare_queried_feature_flags([], AllFeatureFlags) ->
AllFeatureFlags.
-spec merge_new_feature_flags(feature_flags(),
@@ -849,14 +1027,12 @@ merge_new_feature_flags(AllFeatureFlags, App, FeatureName, FeatureProps)
#{FeatureName => FeatureProps1}).
-spec regen_registry_mod(feature_flags(),
- [feature_name()],
- [feature_name()],
+ feature_states(),
boolean()) -> ok | {error, any()} | no_return().
%% @private
regen_registry_mod(AllFeatureFlags,
- EnabledFeatureNames,
- ChangingFeatureNames,
+ FeatureStates,
WrittenToDisk) ->
%% Here, we recreate the source code of the `rabbit_ff_registry`
%% module from scratch.
@@ -879,6 +1055,7 @@ regen_registry_mod(AllFeatureFlags,
erl_syntax:integer(A))
|| {F, A} <- [{get, 1},
{list, 1},
+ {states, 0},
{is_supported, 1},
{is_enabled, 1},
{is_registry_initialized, 0},
@@ -910,17 +1087,56 @@ regen_registry_mod(AllFeatureFlags,
[ListAllBody]),
EnabledFeatureFlags = maps:filter(
fun(FeatureName, _) ->
- lists:member(FeatureName,
- EnabledFeatureNames)
+ maps:is_key(FeatureName,
+ FeatureStates)
+ andalso
+ maps:get(FeatureName, FeatureStates)
+ =:=
+ true
end, AllFeatureFlags),
ListEnabledBody = erl_syntax:abstract(EnabledFeatureFlags),
- ListEnabledClause = erl_syntax:clause([erl_syntax:atom(enabled)],
- [],
- [ListEnabledBody]),
+ ListEnabledClause = erl_syntax:clause(
+ [erl_syntax:atom(enabled)],
+ [],
+ [ListEnabledBody]),
+ DisabledFeatureFlags = maps:filter(
+ fun(FeatureName, _) ->
+ not maps:is_key(FeatureName,
+ FeatureStates)
+ end, AllFeatureFlags),
+ ListDisabledBody = erl_syntax:abstract(DisabledFeatureFlags),
+ ListDisabledClause = erl_syntax:clause(
+ [erl_syntax:atom(disabled)],
+ [],
+ [ListDisabledBody]),
+ StateChangingFeatureFlags = maps:filter(
+ fun(FeatureName, _) ->
+ maps:is_key(FeatureName,
+ FeatureStates)
+ andalso
+ maps:get(FeatureName, FeatureStates)
+ =:=
+ state_changing
+ end, AllFeatureFlags),
+ ListStateChangingBody = erl_syntax:abstract(StateChangingFeatureFlags),
+ ListStateChangingClause = erl_syntax:clause(
+ [erl_syntax:atom(state_changing)],
+ [],
+ [ListStateChangingBody]),
ListFun = erl_syntax:function(
erl_syntax:atom(list),
- [ListAllClause, ListEnabledClause]),
+ [ListAllClause,
+ ListEnabledClause,
+ ListDisabledClause,
+ ListStateChangingClause]),
ListFunForm = erl_syntax:revert(ListFun),
+ %% states() -> ...
+ StatesBody = erl_syntax:abstract(FeatureStates),
+ StatesClause = erl_syntax:clause([], [], [StatesBody]),
+ StatesFun = erl_syntax:function(
+ erl_syntax:atom(states),
+ [StatesClause]),
+ StatesFunForm = erl_syntax:revert(StatesFun),
%% is_supported(_) -> ...
IsSupportedClauses = [erl_syntax:clause(
[erl_syntax:atom(FeatureName)],
@@ -940,14 +1156,12 @@ regen_registry_mod(AllFeatureFlags,
IsEnabledClauses = [erl_syntax:clause(
[erl_syntax:atom(FeatureName)],
[],
- [case lists:member(FeatureName,
- ChangingFeatureNames) of
+ [case maps:is_key(FeatureName, FeatureStates) of
true ->
- erl_syntax:atom(state_changing);
- false ->
erl_syntax:atom(
- lists:member(FeatureName,
- EnabledFeatureNames))
+ maps:get(FeatureName, FeatureStates));
+ false ->
+ erl_syntax:atom(false)
end])
|| FeatureName <- maps:keys(AllFeatureFlags)
],
@@ -984,10 +1198,12 @@ regen_registry_mod(AllFeatureFlags,
ExportForm,
GetFunForm,
ListFunForm,
+ StatesFunForm,
IsSupportedFunForm,
IsEnabledFunForm,
IsInitializedFunForm,
IsWrittenToDiskFunForm],
+ maybe_log_registry_source_code(Forms),
CompileOpts = [return_errors,
return_warnings],
case compile:forms(Forms, CompileOpts) of
@@ -1001,6 +1217,23 @@ regen_registry_mod(AllFeatureFlags,
{error, {compilation_failure, Errors, Warnings}}
end.
+-ifdef(TEST).
+maybe_log_registry_source_code(Forms) ->
+ case os:getenv("LOG_FF_REGISTRY") of
+ false ->
+ ok;
+ _ ->
+ rabbit_log:debug(
+ "== FEATURE FLAGS REGISTRY ==~n"
+ "~s~n"
+ "== END ==~n",
+ [erl_prettypr:format(erl_syntax:form_list(Forms))])
+ end.
+-else.
+maybe_log_registry_source_code(_) ->
+ ok.
+-endif.
+
-spec load_registry_mod(atom(), binary()) ->
ok | {error, any()} | no_return().
%% @private
@@ -1181,8 +1414,8 @@ enable_locally(FeatureName) when is_atom(FeatureName) ->
ok;
false ->
rabbit_log:debug(
- "Feature flag `~s`: enable locally (i.e. was enabled on the cluster "
- "when this node was not part of it)",
+ "Feature flag `~s`: enable locally (as part of feature "
+ "flag states synchronization)",
[FeatureName]),
do_enable_locally(FeatureName)
end.
@@ -1267,7 +1500,7 @@ run_migration_fun(FeatureName, FeatureProps, Arg) ->
{error, {invalid_migration_fun, Invalid}}
end.
--spec mark_as_enabled(feature_name(), boolean() | state_changing) ->
+-spec mark_as_enabled(feature_name(), feature_state()) ->
any() | {error, any()} | no_return().
%% @private
@@ -1279,7 +1512,7 @@ mark_as_enabled(FeatureName, IsEnabled) ->
Error
end.
--spec mark_as_enabled_locally(feature_name(), boolean() | state_changing) ->
+-spec mark_as_enabled_locally(feature_name(), feature_state()) ->
any() | {error, any()} | no_return().
%% @private
@@ -1302,18 +1535,11 @@ mark_as_enabled_locally(FeatureName, IsEnabled) ->
ok =:= try_to_write_enabled_feature_flags_list(
NewEnabledFeatureNames)
end,
- case IsEnabled of
- state_changing ->
- initialize_registry(EnabledFeatureNames,
- [FeatureName],
- WrittenToDisk);
- _ ->
- initialize_registry(NewEnabledFeatureNames,
- [],
- WrittenToDisk)
- end.
+ initialize_registry(#{},
+ #{FeatureName => IsEnabled},
+ WrittenToDisk).
--spec mark_as_enabled_remotely(feature_name(), boolean() | state_changing) ->
+-spec mark_as_enabled_remotely(feature_name(), feature_state()) ->
any() | {error, any()} | no_return().
%% @private
@@ -1321,7 +1547,10 @@ mark_as_enabled_remotely(FeatureName, IsEnabled) ->
Nodes = running_remote_nodes(),
mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, ?TIMEOUT).
--spec mark_as_enabled_remotely([node()], feature_name(), boolean() | state_changing, timeout()) ->
+-spec mark_as_enabled_remotely([node()],
+ feature_name(),
+ feature_state(),
+ timeout()) ->
any() | {error, any()} | no_return().
%% @private
@@ -1389,6 +1618,12 @@ remote_nodes() ->
running_remote_nodes() ->
mnesia:system_info(running_db_nodes) -- [node()].
+query_running_remote_nodes(Node, Timeout) ->
+ case rpc:call(Node, mnesia, system_info, [running_db_nodes], Timeout) of
+ {badrpc, _} = Error -> Error;
+ Nodes -> Nodes -- [node()]
+ end.
+
-spec does_node_support(node(), [feature_name()], timeout()) -> boolean().
%% @private
@@ -1399,30 +1634,19 @@ does_node_support(Node, FeatureNames, Timeout) ->
Node ->
is_supported_locally(FeatureNames);
_ ->
- rpc:call(Node,
- ?MODULE, is_supported_locally, [FeatureNames],
- Timeout)
+ run_feature_flags_mod_on_remote_node(
+ Node, is_supported_locally, [FeatureNames], Timeout)
end,
case Ret of
- {badrpc, {'EXIT',
- {undef,
- [{?MODULE, is_supported_locally, [FeatureNames], []}
- | _]}}} ->
- %% If rabbit_feature_flags:is_supported_locally/1 is undefined
- %% on the remote node, we consider it to be a 3.7.x node.
- %%
- %% Theoretically, it could be an older version (3.6.x and
- %% older). But the RabbitMQ version consistency check
- %% (rabbit_misc:version_minor_equivalent/2) called from
- %% rabbit_mnesia:check_rabbit_consistency/2 already blocked
- %% this situation from happening before we reach this point.
+ {error, pre_feature_flags_rabbitmq} ->
+ %% See run_feature_flags_mod_on_remote_node/4 for an
+ %% explanation why we consider this node a 3.7.x node.
rabbit_log:debug(
- "Feature flags: ?MODULE:is_supported_locally(~p) unavailable "
- "on node `~s`: assuming it is a RabbitMQ 3.7.x node "
- "=> consider the feature flags unsupported",
- [FeatureNames, Node]),
+ "Feature flags: no feature flags support on node `~s`, "
+ "consider the feature flags unsupported: ~p",
+ [Node, FeatureNames]),
false;
- {badrpc, Reason} ->
+ {error, Reason} ->
rabbit_log:error("Feature flags: error while querying `~p` "
"support on node ~s: ~p",
[FeatureNames, Node, Reason]),
@@ -1470,6 +1694,23 @@ check_node_compatibility(Node) ->
%% @see check_node_compatibility/1
check_node_compatibility(Node, Timeout) ->
+ %% Before checking compatibility, we exchange feature flags from
+ %% unknown Erlang applications. So we fetch remote feature flags
+ %% from applications which are not loaded locally, and the opposite.
+ %%
+ %% The goal is that such feature flags are not blocking the
+ %% communication between nodes because the code (which would
+ %% break) is missing on those nodes. Therefore they should not be
+ %% considered when determinig compatibility.
+ exchange_feature_flags_from_unknown_apps(Node, Timeout),
+
+ %% FIXME FIXME FIXME
+ %% Quand on tente de mettre deux nœuds en cluster, on a :
+ %% Feature flags: starting an unclustered node: all feature flags
+ %% will be enabled by default
+ %% Ça ne devrait sans doute pas être le cas...
+
+ %% We can now proceed with the actual compatibility check.
rabbit_log:debug("Feature flags: node `~s` compatibility check, part 1/2",
[Node]),
Part1 = local_enabled_feature_flags_is_supported_remotely(Node, Timeout),
@@ -1549,6 +1790,42 @@ remote_enabled_feature_flags_is_supported_locally(Node, Timeout) ->
is_supported_locally(RemoteEnabledFeatureNames)
end.
+-spec run_feature_flags_mod_on_remote_node(node(),
+ atom(),
+ [term()],
+ timeout()) ->
+ term() | {error, term()}.
+%% @private
+
+run_feature_flags_mod_on_remote_node(Node, Function, Args, Timeout) ->
+ case rpc:call(Node, ?MODULE, Function, Args, Timeout) of
+ {badrpc, {'EXIT',
+ {undef,
+ [{?MODULE, Function, Args, []}
+ | _]}}} ->
+ %% If rabbit_feature_flags:is_supported_locally/1 is undefined
+ %% on the remote node, we consider it to be a 3.7.x node.
+ %%
+ %% Theoretically, it could be an older version (3.6.x and
+ %% older). But the RabbitMQ version consistency check
+ %% (rabbit_misc:version_minor_equivalent/2) called from
+ %% rabbit_mnesia:check_rabbit_consistency/2 already blocked
+ %% this situation from happening before we reach this point.
+ rabbit_log:debug(
+ "Feature flags: ~s:~s~p unavailable on node `~s`: "
+ "assuming it is a RabbitMQ 3.7.x node",
+ [?MODULE, Function, Args, Node]),
+ {error, pre_feature_flags_rabbitmq};
+ {badrpc, Reason} = Error ->
+ rabbit_log:error(
+ "Feature flags: error while running ~s:~s~p "
+ "on node `~s`: ~p",
+ [?MODULE, Function, Args, Node, Reason]),
+ {error, Error};
+ Ret ->
+ Ret
+ end.
+
-spec query_remote_feature_flags(node(),
Which :: all | enabled | disabled,
timeout()) ->
@@ -1559,25 +1836,20 @@ query_remote_feature_flags(Node, Which, Timeout) ->
rabbit_log:debug("Feature flags: querying ~s feature flags "
"on node `~s`...",
[Which, Node]),
- case rpc:call(Node, ?MODULE, list, [Which], Timeout) of
- {badrpc, {'EXIT',
- {undef,
- [{?MODULE, list, [Which], []}
- | _]}}} ->
- %% See does_node_support/3 for an explanation why we
- %% consider this node a 3.7.x node.
+ case run_feature_flags_mod_on_remote_node(Node, list, [Which], Timeout) of
+ {error, pre_feature_flags_rabbitmq} ->
+ %% See run_feature_flags_mod_on_remote_node/4 for an
+ %% explanation why we consider this node a 3.7.x node.
rabbit_log:debug(
- "Feature flags: ?MODULE:list(~s) unavailable on node `~s`: "
- "assuming it is a RabbitMQ 3.7.x node "
- "=> consider the list empty",
- [Which, Node]),
+ "Feature flags: no feature flags support on node `~s`, "
+ "consider the list of feature flags empty", [Node]),
#{};
- {badrpc, Reason} = Error ->
+ {error, Reason} = Error ->
rabbit_log:error(
"Feature flags: error while querying ~s feature flags "
"on node `~s`: ~p",
[Which, Node, Reason]),
- {error, Error};
+ Error;
RemoteFeatureFlags when is_map(RemoteFeatureFlags) ->
RemoteFeatureNames = maps:keys(RemoteFeatureFlags),
rabbit_log:debug("Feature flags: querying ~s feature flags "
@@ -1586,47 +1858,129 @@ query_remote_feature_flags(Node, Which, Timeout) ->
RemoteFeatureFlags
end.
--spec sync_feature_flags_with_cluster([node()]) ->
+-spec merge_feature_flags_from_unknown_apps(feature_flags()) ->
+ ok | {error, any()}.
+%% @private
+
+merge_feature_flags_from_unknown_apps(FeatureFlags)
+ when is_map(FeatureFlags) ->
+ LoadedApps = [App || {App, _, _} <- application:loaded_applications()],
+ FeatureFlagsFromUnknownApps =
+ maps:fold(
+ fun(FeatureName, FeatureProps, UnknownFF) ->
+ case is_supported_locally(FeatureName) of
+ true ->
+ UnknownFF;
+ false ->
+ FeatureProvider = maps:get(provided_by, FeatureProps),
+ case lists:member(FeatureProvider, LoadedApps) of
+ true -> UnknownFF;
+ false -> maps:put(FeatureName, FeatureProps,
+ UnknownFF)
+ end
+ end
+ end,
+ #{},
+ FeatureFlags),
+ rabbit_log:debug(
+ "Feature flags: register feature flags provided by applications "
+ "unknown locally: ~p",
+ [maps:keys(FeatureFlagsFromUnknownApps)]),
+ initialize_registry(FeatureFlagsFromUnknownApps).
+
+exchange_feature_flags_from_unknown_apps(Node, Timeout) ->
+ %% The first step is to fetch feature flags from Erlang applications
+ %% we don't know locally (they are loaded remotely, but not
+ %% locally).
+ fetch_remote_feature_flags_from_apps_unknown_locally(Node, Timeout),
+
+ %% The next step is to do the opposite: push feature flags to remote
+ %% nodes so they can register those from applications they don't
+ %% know.
+ push_local_feature_flags_from_apps_unknown_remotely(Node, Timeout).
+
+fetch_remote_feature_flags_from_apps_unknown_locally(Node, Timeout) ->
+ RemoteFeatureFlags = query_remote_feature_flags(Node, all, Timeout),
+ merge_feature_flags_from_unknown_apps(RemoteFeatureFlags).
+
+push_local_feature_flags_from_apps_unknown_remotely(Node, Timeout) ->
+ LocalFeatureFlags = list(all),
+ push_local_feature_flags_from_apps_unknown_remotely(
+ Node, LocalFeatureFlags, Timeout).
+
+push_local_feature_flags_from_apps_unknown_remotely(
+ Node, FeatureFlags, Timeout)
+ when map_size(FeatureFlags) > 0 ->
+ case query_running_remote_nodes(Node, Timeout) of
+ {badrpc, Reason} ->
+ {error, Reason};
+ Nodes ->
+ lists:foreach(
+ fun(N) ->
+ run_feature_flags_mod_on_remote_node(
+ N,
+ merge_feature_flags_from_unknown_apps,
+ [FeatureFlags],
+ Timeout)
+ end, Nodes)
+ end;
+push_local_feature_flags_from_apps_unknown_remotely(_, _, _) ->
+ ok.
+
+-spec sync_feature_flags_with_cluster([node()], boolean()) ->
ok | {error, any()} | no_return().
%% @private
-sync_feature_flags_with_cluster(Nodes) ->
- sync_feature_flags_with_cluster(Nodes, ?TIMEOUT).
+sync_feature_flags_with_cluster(Nodes, NodeIsVirgin) ->
+ sync_feature_flags_with_cluster(Nodes, NodeIsVirgin, ?TIMEOUT).
--spec sync_feature_flags_with_cluster([node()], timeout()) ->
+-spec sync_feature_flags_with_cluster([node()], boolean(), timeout()) ->
ok | {error, any()} | no_return().
%% @private
-sync_feature_flags_with_cluster([], _) ->
+sync_feature_flags_with_cluster([], NodeIsVirgin, _) ->
verify_which_feature_flags_are_actually_enabled(),
- FeatureNames = get_forced_feature_flag_names(),
- case remote_nodes() of
- [] when FeatureNames =:= undefined ->
- rabbit_log:debug(
- "Feature flags: starting an unclustered node: "
- "all feature flags will be enabled by default"),
- enable_all();
- [] ->
- case FeatureNames of
- [] ->
+ case NodeIsVirgin of
+ true ->
+ FeatureNames = get_forced_feature_flag_names(),
+ case remote_nodes() of
+ [] when FeatureNames =:= undefined ->
rabbit_log:debug(
- "Feature flags: starting an unclustered node: "
- "all feature flags are forcibly left disabled "
- "from the RABBITMQ_FEATURE_FLAGS environment "
- "variable");
+ "Feature flags: starting an unclustered node "
+ "for the first time: all feature flags will be "
+ "enabled by default"),
+ enable_all();
+ [] ->
+ case FeatureNames of
+ [] ->
+ rabbit_log:debug(
+ "Feature flags: starting an unclustered "
+ "node for the first time: all feature "
+ "flags are forcibly left disabled from "
+ "the RABBITMQ_FEATURE_FLAGS environment "
+ "variable"),
+ ok;
+ _ ->
+ rabbit_log:debug(
+ "Feature flags: starting an unclustered "
+ "node for the first time: only the "
+ "following feature flags specified in "
+ "the RABBITMQ_FEATURE_FLAGS environment "
+ "variable will be enabled: ~p",
+ [FeatureNames]),
+ enable(FeatureNames)
+ end;
_ ->
- rabbit_log:debug(
- "Feature flags: starting an unclustered node: "
- "only the following feature flags specified in "
- "the RABBITMQ_FEATURE_FLAGS environment variable "
- "will be enabled: ~p",
- [FeatureNames])
- end,
- enable(FeatureNames);
- _ ->
+ ok
+ end;
+ false ->
+ rabbit_log:debug(
+ "Feature flags: starting an unclustered node which is "
+ "already initialized: all feature flags left in their "
+ "current state"),
ok
end;
-sync_feature_flags_with_cluster(Nodes, Timeout) ->
+sync_feature_flags_with_cluster(Nodes, _, Timeout) ->
verify_which_feature_flags_are_actually_enabled(),
RemoteNodes = Nodes -- [node()],
sync_feature_flags_with_cluster1(RemoteNodes, Timeout).
@@ -1642,19 +1996,45 @@ sync_feature_flags_with_cluster1(RemoteNodes, Timeout) ->
Error;
RemoteFeatureFlags ->
RemoteFeatureNames = maps:keys(RemoteFeatureFlags),
- do_sync_feature_flags_with_node1(RemoteFeatureNames)
+ rabbit_log:debug(
+ "Feature flags: enabling locally feature flags already "
+ "enabled on node `~s`...",
+ [RandomRemoteNode]),
+ case do_sync_feature_flags_with_node(RemoteFeatureNames) of
+ ok ->
+ sync_feature_flags_with_cluster2(
+ RandomRemoteNode, Timeout);
+ Error ->
+ Error
+ end
+ end.
+
+sync_feature_flags_with_cluster2(RandomRemoteNode, Timeout) ->
+ LocalFeatureNames = maps:keys(list(enabled)),
+ rabbit_log:debug(
+ "Feature flags: enabling on node `~s` feature flags already "
+ "enabled locally...",
+ [RandomRemoteNode]),
+ Ret = run_feature_flags_mod_on_remote_node(
+ RandomRemoteNode,
+ do_sync_feature_flags_with_node,
+ [LocalFeatureNames],
+ Timeout),
+ case Ret of
+ {error, pre_feature_flags_rabbitmq} -> ok;
+ _ -> Ret
end.
pick_one_node(Nodes) ->
RandomIndex = rand:uniform(length(Nodes)),
lists:nth(RandomIndex, Nodes).
-do_sync_feature_flags_with_node1([FeatureFlag | Rest]) ->
+do_sync_feature_flags_with_node([FeatureFlag | Rest]) ->
case enable_locally(FeatureFlag) of
- ok -> do_sync_feature_flags_with_node1(Rest);
+ ok -> do_sync_feature_flags_with_node(Rest);
Error -> Error
end;
-do_sync_feature_flags_with_node1([]) ->
+do_sync_feature_flags_with_node([]) ->
ok.
-spec get_forced_feature_flag_names() -> [feature_name()] | undefined.
@@ -1793,5 +2173,92 @@ verify_which_feature_flags_are_actually_enabled() ->
WrittenToDisk = ok =:= try_to_write_enabled_feature_flags_list(
RepairedEnabledFeatureNames),
initialize_registry(
- RepairedEnabledFeatureNames, [], WrittenToDisk)
+ #{},
+ list_of_enabled_feature_flags_to_feature_states(
+ RepairedEnabledFeatureNames),
+ WrittenToDisk)
+ end.
+
+-spec refresh_feature_flags_after_app_load([atom()]) ->
+ ok | {error, any()} | no_return().
+
+refresh_feature_flags_after_app_load([]) ->
+ ok;
+refresh_feature_flags_after_app_load(Apps) ->
+ rabbit_log:debug(
+ "Feature flags: new apps loaded: ~p -> refreshing feature flags",
+ [Apps]),
+
+ FeatureFlags0 = list(all),
+ FeatureFlags1 = query_supported_feature_flags(),
+
+ %% The following list contains all the feature flags this node
+ %% learned about only because remote nodes have them. Now, the
+ %% applications providing them are loaded locally as well.
+ %% Therefore, we may run their migration function in case the state
+ %% of this node needs it.
+ AlreadySupportedFeatureNames = maps:keys(
+ maps:filter(
+ fun(_, #{provided_by := App}) ->
+ lists:member(App, Apps)
+ end, FeatureFlags0)),
+ case AlreadySupportedFeatureNames of
+ [] ->
+ ok;
+ _ ->
+ rabbit_log:debug(
+ "Feature flags: new apps loaded: feature flags already "
+ "supported: ~p",
+ [lists:sort(AlreadySupportedFeatureNames)])
+ end,
+
+ %% The following list contains all the feature flags no nodes in the
+ %% cluster knew about before: this is the first time we see them in
+ %% this instance of the cluster. We need to register them on all
+ %% nodes.
+ NewSupportedFeatureFlags = maps:filter(
+ fun(FeatureName, _) ->
+ not maps:is_key(FeatureName,
+ FeatureFlags0)
+ end, FeatureFlags1),
+ case maps:keys(NewSupportedFeatureFlags) of
+ [] ->
+ ok;
+ NewSupportedFeatureNames ->
+ rabbit_log:debug(
+ "Feature flags: new apps loaded: new feature flags (unseen so "
+ "far): ~p ",
+ [lists:sort(NewSupportedFeatureNames)])
+ end,
+
+ case initialize_registry() of
+ ok ->
+ Ret = maybe_enable_locally_after_app_load(
+ AlreadySupportedFeatureNames),
+ case Ret of
+ ok ->
+ share_new_feature_flags_after_app_load(
+ NewSupportedFeatureFlags, ?TIMEOUT);
+ Error ->
+ Error
+ end;
+ Error ->
+ Error
+ end.
+
+maybe_enable_locally_after_app_load([]) ->
+ ok;
+maybe_enable_locally_after_app_load([FeatureName | Rest]) ->
+ case is_enabled(FeatureName) of
+ true ->
+ case do_enable_locally(FeatureName) of
+ ok -> maybe_enable_locally_after_app_load(Rest);
+ Error -> Error
+ end;
+ false ->
+ maybe_enable_locally_after_app_load(Rest)
end.
+
+share_new_feature_flags_after_app_load(FeatureFlags, Timeout) ->
+ push_local_feature_flags_from_apps_unknown_remotely(
+ node(), FeatureFlags, Timeout).
diff --git a/src/rabbit_ff_registry.erl b/src/rabbit_ff_registry.erl
index 46d439001f..b79f8b45ef 100644
--- a/src/rabbit_ff_registry.erl
+++ b/src/rabbit_ff_registry.erl
@@ -31,6 +31,7 @@
-export([get/1,
list/1,
+ states/0,
is_supported/1,
is_enabled/1,
is_registry_initialized/0,
@@ -76,6 +77,23 @@ list(Which) ->
true -> #{}
end.
+-spec states() -> rabbit_feature_flags:feature_states().
+%% @doc
+%% Returns the states of supported feature flags.
+%%
+%% Only the informations stored in the local registry is used to answer
+%% this call.
+%%
+%% @returns A map of feature flag states.
+
+states() ->
+ rabbit_feature_flags:initialize_registry(),
+ %% See get/1 for an explanation of the case statement below.
+ case is_registry_initialized() of
+ false -> ?MODULE:states();
+ true -> #{}
+ end.
+
-spec is_supported(rabbit_feature_flags:feature_name()) -> boolean().
%% @doc
%% Returns if a feature flag is supported.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 61b9e10e70..83a6e65817 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -528,6 +528,7 @@ dir() -> mnesia:system_info(directory).
%% nodes in the cluster already. It also updates the cluster status
%% file.
init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
+ NodeIsVirgin = is_virgin_node(),
Nodes = change_extra_db_nodes(ClusterNodes, CheckOtherNodes),
%% Note that we use `system_info' here and not the cluster status
%% since when we start rabbit for the first time the cluster
@@ -551,7 +552,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
ok = rabbit_table:wait_for_replicated(_Retry = true),
ok = rabbit_table:create_local_copy(NodeType)
end,
- ensure_feature_flags_are_in_sync(Nodes),
+ ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin),
ensure_schema_integrity(),
rabbit_node_monitor:update_cluster_status(),
ok.
@@ -621,12 +622,12 @@ ensure_mnesia_not_running() ->
throw({error, mnesia_unexpectedly_running})
end.
-ensure_feature_flags_are_in_sync(Nodes) ->
- case rabbit_feature_flags:sync_feature_flags_with_cluster(Nodes) of
- ok ->
- ok;
- {error, Reason} ->
- throw({error, {incompatible_feature_flags, Reason}})
+ensure_feature_flags_are_in_sync(Nodes, NodeIsVirgin) ->
+ Ret = rabbit_feature_flags:sync_feature_flags_with_cluster(
+ Nodes, NodeIsVirgin),
+ case Ret of
+ ok -> ok;
+ {error, Reason} -> throw({error, {incompatible_feature_flags, Reason}})
end.
ensure_schema_integrity() ->
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index abb62a4f9d..ea7c2d47fa 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -469,7 +469,6 @@ clean_plugins(Plugins) ->
clean_plugin(Plugin, ExpandDir) ->
{ok, Mods} = application:get_key(Plugin, modules),
application:unload(Plugin),
- rabbit_feature_flags:initialize_registry(),
[begin
code:soft_purge(Mod),
code:delete(Mod),
@@ -714,4 +713,6 @@ remove_plugins(Plugins) ->
maybe_report_plugin_loading_problems([]) ->
ok;
maybe_report_plugin_loading_problems(Problems) ->
- rabbit_log:warning("Problem reading some plugins: ~p~n", [Problems]).
+ io:format(standard_error,
+ "Problem reading some plugins: ~p~n",
+ [Problems]).