summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2022-01-22 18:34:44 +0100
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2022-01-22 18:34:44 +0100
commit3675ddf24a00b8e4e85e0420352680a51b98a69c (patch)
treeaba869c16f8b8495fe396e0fdd22c0e9693ba906
parent9179b55acc797d04e3e138a6572c9be530c0f626 (diff)
downloadrabbitmq-server-git-add-feature-flags-controller.tar.gz
WIP; more testing with MFv2 + testcase improvementsadd-feature-flags-controller
-rw-r--r--deps/rabbit/src/rabbit_ff_controller.erl6
-rw-r--r--deps/rabbit/test/feature_flags_v2_SUITE.erl224
2 files changed, 177 insertions, 53 deletions
diff --git a/deps/rabbit/src/rabbit_ff_controller.erl b/deps/rabbit/src/rabbit_ff_controller.erl
index 4d150f18cc..2b5dcc9f77 100644
--- a/deps/rabbit/src/rabbit_ff_controller.erl
+++ b/deps/rabbit/src/rabbit_ff_controller.erl
@@ -765,8 +765,10 @@ rpc_call(Node, Module, Function, Args, Timeout) ->
{error, pre_feature_flags_rabbitmq};
{badrpc, Reason} = Error ->
?LOG_ERROR(
- "Feature flags: error while running ~s:~s~p "
- "on node `~s`: ~p",
+ "Feature flags: error while running:~n"
+ "Feature flags: ~s:~s~p~n"
+ "Feature flags: on node `~s`:~n"
+ "Feature flags: ~p",
[?MODULE, Function, Args, Node, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
{error, Error};
diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl
index e3f0474d88..af6d2dd9b5 100644
--- a/deps/rabbit/test/feature_flags_v2_SUITE.erl
+++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl
@@ -35,10 +35,11 @@
enable_supported_feature_flag_in_a_3node_cluster/1,
enable_partially_supported_feature_flag_in_a_3node_cluster/1,
enable_unsupported_feature_flag_in_a_3node_cluster/1,
- enable_feature_flag_in_a_cluster_and_add_member_after/1,
- enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1/1,
- enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2/1,
- enable_feature_flag_in_a_cluster_and_remove_member_concurrently/1
+ enable_feature_flag_in_cluster_and_add_member_after/1,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1/1,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2/1,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1/1,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/1
]).
suite() ->
@@ -64,10 +65,11 @@ groups() ->
enable_supported_feature_flag_in_a_3node_cluster,
enable_partially_supported_feature_flag_in_a_3node_cluster,
enable_unsupported_feature_flag_in_a_3node_cluster,
- enable_feature_flag_in_a_cluster_and_add_member_after,
- enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1,
- enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2,
- enable_feature_flag_in_a_cluster_and_remove_member_concurrently
+ enable_feature_flag_in_cluster_and_add_member_after,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2
]}
],
[
@@ -279,7 +281,7 @@ mf_count_runs(_FeatureName, _FeatureProps, enable) ->
mf_wait_and_count_runs(FeatureName, FeatureProps, enable = Arg) ->
Peer = get_peer_proc(),
- Peer ! {self(), waiting},
+ Peer ! {node(), self(), waiting},
?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
receive proceed -> ok end,
?LOG_NOTICE("Migration function: unblocked!", []),
@@ -290,7 +292,7 @@ mf_wait_and_count_runs_v2(
props = FeatureProps,
command = enable = Arg}) ->
Peer = get_peer_proc(),
- Peer ! {self(), waiting},
+ Peer ! {node(), self(), waiting},
?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
receive proceed -> ok end,
?LOG_NOTICE("Migration function: unblocked!", []),
@@ -590,7 +592,7 @@ enable_unsupported_feature_flag_in_a_3node_cluster(Config) ->
|| Node <- Nodes],
ok.
-enable_feature_flag_in_a_cluster_and_add_member_after(Config) ->
+enable_feature_flag_in_cluster_and_add_member_after(Config) ->
AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
override_running_nodes([NewNode]),
@@ -700,7 +702,7 @@ enable_feature_flag_in_a_cluster_and_add_member_after(Config) ->
|| Node <- AllNodes],
ok.
-enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) ->
+enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1(Config) ->
AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
override_running_nodes([NewNode]),
@@ -755,7 +757,9 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) ->
%% blocked and waits for a message from this process. Therefore, we are
%% sure the feature flag is in the `state_changing' state and we can try
%% to add a new node and sync its feature flags.
- FirstNodeMigFunPid = receive {MigFunPid1, waiting} -> MigFunPid1 end,
+ FirstNodeMigFunPid = receive
+ {_Node, MigFunPid1, waiting} -> MigFunPid1
+ end,
%% Check compatibility between NewNodes and Nodes. This doesn't block.
ok = run_on_node(
@@ -862,10 +866,10 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) ->
%% which ran.
ct:pal("Unblocking other nodes, including the joining one"),
OtherMigratedNodes = [receive
- {MigFunPid2, waiting} ->
+ {Node, MigFunPid2, waiting} ->
MigFunPid2 ! proceed,
- node(MigFunPid2)
- end || _ <- ExpectedNodes -- [FirstMigratedNode]],
+ Node
+ end || Node <- ExpectedNodes -- [FirstMigratedNode]],
MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),
@@ -894,7 +898,7 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) ->
|| Node <- AllNodes],
ok.
-enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) ->
+enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2(Config) ->
AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
override_running_nodes([NewNode]),
@@ -924,18 +928,25 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) ->
ct:pal(
"Enabling the feature flag in the cluster (in a separate process)"),
Peer = self(),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ %% The migration function uses the `Peer' PID (the process
+ %% executing the testcase) to notify its own PID and wait
+ %% for a signal from `Peer' to proceed and finish the
+ %% migration.
+ record_peer_proc(Peer),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
Enabler = spawn_link(
fun() ->
ok =
run_on_node(
FirstNode,
fun() ->
- %% The migration function uses the `Peer'
- %% PID (the process executing the testcase)
- %% to notify its own PID and wait for a
- %% signal from `Peer' to proceed and finish
- %% the migration.
- record_peer_proc(Peer),
?assertEqual(
ok,
rabbit_feature_flags:enable(
@@ -944,25 +955,15 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) ->
end,
[])
end),
- _ = [ok =
- run_on_node(
- Node,
- fun() ->
- %% We just need to record `Peer' on other nodes in the
- %% cluster because the migration will run on them with
- %% `feature_flags_v2'.
- record_peer_proc(Peer),
- ok
- end,
- [])
- || Node <- tl(Nodes)],
%% By waiting for the message from one of the migration function
%% instances, we make sure the feature flags controller on `FirstNode' is
%% blocked and waits for a message from this process. Therefore, we are
%% sure the feature flag is in the `state_changing' state and we can try
%% to add a new node and sync its feature flags.
- FirstNodeMigFunPid = receive {MigFunPid1, waiting} -> MigFunPid1 end,
+ FirstNodeMigFunPid = receive
+ {_Node, MigFunPid1, waiting} -> MigFunPid1
+ end,
%% Check compatibility between NewNodes and Nodes. This doesn't block.
ok = run_on_node(
@@ -994,7 +995,6 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) ->
run_on_node(
NewNode,
fun() ->
- record_peer_proc(Peer),
?assertEqual(
ok,
rabbit_feature_flags:
@@ -1033,11 +1033,12 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) ->
SyncerMRef = erlang:monitor(process, Syncer),
unlink(Enabler),
unlink(Syncer),
+
%% The migration function runs on all clustered nodes with v2, including
%% the one joining the cluster, thanks to the synchronization.
%%
%% When this testcase runs with feature flags v1, the feature flag we want
- %% to enable uses the migration function API v2: this implicitely enables
+ %% to enable uses the migration function API v2: this implicitly enables
%% `feature_flags_v2'. As part of the synchronization, the node still on
%% feature flags v1 will try to sync `feature_flags_v2' specificaly first.
%% After that, the controller-based sync proceeds.
@@ -1055,10 +1056,10 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) ->
%% which ran.
ct:pal("Unblocking other nodes, including the joining one"),
OtherMigratedNodes = [receive
- {MigFunPid2, waiting} ->
+ {Node, MigFunPid2, waiting} ->
MigFunPid2 ! proceed,
- node(MigFunPid2)
- end || _ <- ExpectedNodes -- [FirstMigratedNode]],
+ Node
+ end || Node <- ExpectedNodes -- [FirstMigratedNode]],
MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),
@@ -1097,7 +1098,7 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) ->
|| Node <- AllNodes],
ok.
-enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) ->
+enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1(Config) ->
AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config(
nodes, Config),
connect_nodes(AllNodes),
@@ -1110,8 +1111,6 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) ->
migration_fun => {?MODULE, mf_wait_and_count_runs}}},
inject_on_nodes(AllNodes, FeatureFlags),
- UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
-
ct:pal(
"Checking the feature flag is supported but disabled on all nodes"),
_ = [ok =
@@ -1128,10 +1127,6 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) ->
ct:pal(
"Enabling the feature flag in the cluster (in a separate process)"),
Peer = self(),
- ExpectedRet = case UsingFFv1 of
- true -> ok;
- false -> {error, {badrpc, nodedown}}
- end,
Enabler = spawn_link(
fun() ->
ok =
@@ -1145,7 +1140,7 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) ->
%% the migration.
record_peer_proc(Peer),
?assertEqual(
- ExpectedRet,
+ ok,
rabbit_feature_flags:enable(
FeatureName)),
ok
@@ -1158,7 +1153,9 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) ->
%% blocked and waits for a message from this process. Therefore, we are
%% sure the feature flag is in the `state_changing' state and we can try
%% to add a new node and sync its feature flags.
- FirstNodeMigFunPid = receive {MigFunPid1, waiting} -> MigFunPid1 end,
+ FirstNodeMigFunPid = receive
+ {_Node, MigFunPid1, waiting} -> MigFunPid1
+ end,
%% Remove node from cluster.
stop_slave_node(LeavingNode),
@@ -1189,9 +1186,134 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) ->
run_on_node(
Node,
fun() ->
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ok.
+
+enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2(Config) ->
+ AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config(
+ nodes, Config),
+ connect_nodes(AllNodes),
+ override_running_nodes(AllNodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName =>
+ #{provided_by => ?MODULE,
+ stability => stable,
+ migration_fun => {?MODULE,
+ mf_wait_and_count_runs_v2}}},
+ inject_on_nodes(AllNodes, FeatureFlags),
+
+ UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+
+ ct:pal(
+ "Enabling the feature flag in the cluster (in a separate process)"),
+ Peer = self(),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ %% The migration function uses the `Peer' PID (the process
+ %% executing the testcase) to notify its own PID and wait
+ %% for a signal from `Peer' to proceed and finish the
+ %% migration.
+ record_peer_proc(Peer),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ Enabler = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ FirstNode,
+ fun() ->
+ ?assertEqual(
+ {error, {badrpc, nodedown}},
+ rabbit_feature_flags:enable(
+ FeatureName)),
+ ok
+ end,
+ [])
+ end),
+
+ %% By waiting for the message from one of the migration function
+ %% instances, we make sure the feature flags controller on `FirstNode' is
+ %% blocked and waits for a message from this process. Therefore, we are
+ %% sure the feature flag is in the `state_changing' state and we can try
+ %% to add a new node and sync its feature flags.
+ FirstNodeMigFunPid = receive
+ {Node, MigFunPid1, waiting} -> MigFunPid1
+ end,
+
+ %% Remove node from cluster.
+ stop_slave_node(LeavingNode),
+ override_running_nodes(Nodes),
+
+ %% Unblock the migration functions on `Nodes'.
+ EnablerMRef = erlang:monitor(process, Enabler),
+ unlink(Enabler),
+
+ %% The migration function runs on all clustered nodes with v2.
+ %%
+ %% When this testcase runs with feature flags v1, the feature flag we want
+ %% to enable uses the migration function API v2: this implicitly enables
+ %% `feature_flags_v2'. As part of the synchronization, the node still on
+ %% feature flags v1 will try to sync `feature_flags_v2' specificaly first.
+ %% After that, the controller-based sync proceeds.
+ ExpectedNodes = Nodes,
+
+ %% Unblock the migration function for which we already consumed the
+ %% `waiting' notification.
+ FirstMigratedNode = node(FirstNodeMigFunPid),
+ ct:pal(
+ "Unblocking first node (~p @ ~s)",
+ [FirstNodeMigFunPid, FirstMigratedNode]),
+ FirstNodeMigFunPid ! proceed,
+
+ %% Unblock the rest and collect the node names of all migration functions
+ %% which ran.
+ ct:pal("Unblocking other nodes"),
+ OtherMigratedNodes = [receive
+ {Node, MigFunPid2, waiting} ->
+ MigFunPid2 ! proceed,
+ Node
+ end || Node <- ExpectedNodes -- [FirstMigratedNode]],
+ MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
+ ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),
+
+ ct:pal("Waiting for spawned processes to terminate"),
+ receive
+ {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
+ ?assertEqual(normal, EnablerReason)
+ end,
+
+ ct:pal(
+ "Checking the feature flag is enabled (v1) or disabled (v2) in the "
+ "cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
case UsingFFv1 of
true ->
- ?assert(
+ ?assertNot(
rabbit_feature_flags:is_enabled(FeatureName));
false ->
?assertNot(