diff options
-rw-r--r-- | deps/rabbit/src/rabbit_ff_controller.erl | 6 | ||||
-rw-r--r-- | deps/rabbit/test/feature_flags_v2_SUITE.erl | 224 |
2 files changed, 177 insertions, 53 deletions
diff --git a/deps/rabbit/src/rabbit_ff_controller.erl b/deps/rabbit/src/rabbit_ff_controller.erl index 4d150f18cc..2b5dcc9f77 100644 --- a/deps/rabbit/src/rabbit_ff_controller.erl +++ b/deps/rabbit/src/rabbit_ff_controller.erl @@ -765,8 +765,10 @@ rpc_call(Node, Module, Function, Args, Timeout) -> {error, pre_feature_flags_rabbitmq}; {badrpc, Reason} = Error -> ?LOG_ERROR( - "Feature flags: error while running ~s:~s~p " - "on node `~s`: ~p", + "Feature flags: error while running:~n" + "Feature flags: ~s:~s~p~n" + "Feature flags: on node `~s`:~n" + "Feature flags: ~p", [?MODULE, Function, Args, Node, Reason], #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), {error, Error}; diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl index e3f0474d88..af6d2dd9b5 100644 --- a/deps/rabbit/test/feature_flags_v2_SUITE.erl +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -35,10 +35,11 @@ enable_supported_feature_flag_in_a_3node_cluster/1, enable_partially_supported_feature_flag_in_a_3node_cluster/1, enable_unsupported_feature_flag_in_a_3node_cluster/1, - enable_feature_flag_in_a_cluster_and_add_member_after/1, - enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1/1, - enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2/1, - enable_feature_flag_in_a_cluster_and_remove_member_concurrently/1 + enable_feature_flag_in_cluster_and_add_member_after/1, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1/1, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2/1, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1/1, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/1 ]). suite() -> @@ -64,10 +65,11 @@ groups() -> enable_supported_feature_flag_in_a_3node_cluster, enable_partially_supported_feature_flag_in_a_3node_cluster, enable_unsupported_feature_flag_in_a_3node_cluster, - enable_feature_flag_in_a_cluster_and_add_member_after, - enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1, - enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2, - enable_feature_flag_in_a_cluster_and_remove_member_concurrently + enable_feature_flag_in_cluster_and_add_member_after, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2 ]} ], [ @@ -279,7 +281,7 @@ mf_count_runs(_FeatureName, _FeatureProps, enable) -> mf_wait_and_count_runs(FeatureName, FeatureProps, enable = Arg) -> Peer = get_peer_proc(), - Peer ! {self(), waiting}, + Peer ! {node(), self(), waiting}, ?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]), receive proceed -> ok end, ?LOG_NOTICE("Migration function: unblocked!", []), @@ -290,7 +292,7 @@ mf_wait_and_count_runs_v2( props = FeatureProps, command = enable = Arg}) -> Peer = get_peer_proc(), - Peer ! {self(), waiting}, + Peer ! {node(), self(), waiting}, ?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]), receive proceed -> ok end, ?LOG_NOTICE("Migration function: unblocked!", []), @@ -590,7 +592,7 @@ enable_unsupported_feature_flag_in_a_3node_cluster(Config) -> || Node <- Nodes], ok. -enable_feature_flag_in_a_cluster_and_add_member_after(Config) -> +enable_feature_flag_in_cluster_and_add_member_after(Config) -> AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config), connect_nodes(Nodes), override_running_nodes([NewNode]), @@ -700,7 +702,7 @@ enable_feature_flag_in_a_cluster_and_add_member_after(Config) -> || Node <- AllNodes], ok. -enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) -> +enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1(Config) -> AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config), connect_nodes(Nodes), override_running_nodes([NewNode]), @@ -755,7 +757,9 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) -> %% blocked and waits for a message from this process. Therefore, we are %% sure the feature flag is in the `state_changing' state and we can try %% to add a new node and sync its feature flags. - FirstNodeMigFunPid = receive {MigFunPid1, waiting} -> MigFunPid1 end, + FirstNodeMigFunPid = receive + {_Node, MigFunPid1, waiting} -> MigFunPid1 + end, %% Check compatibility between NewNodes and Nodes. This doesn't block. ok = run_on_node( @@ -862,10 +866,10 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) -> %% which ran. ct:pal("Unblocking other nodes, including the joining one"), OtherMigratedNodes = [receive - {MigFunPid2, waiting} -> + {Node, MigFunPid2, waiting} -> MigFunPid2 ! proceed, - node(MigFunPid2) - end || _ <- ExpectedNodes -- [FirstMigratedNode]], + Node + end || Node <- ExpectedNodes -- [FirstMigratedNode]], MigratedNodes = [FirstMigratedNode | OtherMigratedNodes], ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)), @@ -894,7 +898,7 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv1(Config) -> || Node <- AllNodes], ok. -enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) -> +enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2(Config) -> AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config), connect_nodes(Nodes), override_running_nodes([NewNode]), @@ -924,18 +928,25 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) -> ct:pal( "Enabling the feature flag in the cluster (in a separate process)"), Peer = self(), + _ = [ok = + run_on_node( + Node, + fun() -> + %% The migration function uses the `Peer' PID (the process + %% executing the testcase) to notify its own PID and wait + %% for a signal from `Peer' to proceed and finish the + %% migration. + record_peer_proc(Peer), + ok + end, + []) + || Node <- AllNodes], Enabler = spawn_link( fun() -> ok = run_on_node( FirstNode, fun() -> - %% The migration function uses the `Peer' - %% PID (the process executing the testcase) - %% to notify its own PID and wait for a - %% signal from `Peer' to proceed and finish - %% the migration. - record_peer_proc(Peer), ?assertEqual( ok, rabbit_feature_flags:enable( @@ -944,25 +955,15 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) -> end, []) end), - _ = [ok = - run_on_node( - Node, - fun() -> - %% We just need to record `Peer' on other nodes in the - %% cluster because the migration will run on them with - %% `feature_flags_v2'. - record_peer_proc(Peer), - ok - end, - []) - || Node <- tl(Nodes)], %% By waiting for the message from one of the migration function %% instances, we make sure the feature flags controller on `FirstNode' is %% blocked and waits for a message from this process. Therefore, we are %% sure the feature flag is in the `state_changing' state and we can try %% to add a new node and sync its feature flags. - FirstNodeMigFunPid = receive {MigFunPid1, waiting} -> MigFunPid1 end, + FirstNodeMigFunPid = receive + {_Node, MigFunPid1, waiting} -> MigFunPid1 + end, %% Check compatibility between NewNodes and Nodes. This doesn't block. ok = run_on_node( @@ -994,7 +995,6 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) -> run_on_node( NewNode, fun() -> - record_peer_proc(Peer), ?assertEqual( ok, rabbit_feature_flags: @@ -1033,11 +1033,12 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) -> SyncerMRef = erlang:monitor(process, Syncer), unlink(Enabler), unlink(Syncer), + %% The migration function runs on all clustered nodes with v2, including %% the one joining the cluster, thanks to the synchronization. %% %% When this testcase runs with feature flags v1, the feature flag we want - %% to enable uses the migration function API v2: this implicitely enables + %% to enable uses the migration function API v2: this implicitly enables %% `feature_flags_v2'. As part of the synchronization, the node still on %% feature flags v1 will try to sync `feature_flags_v2' specificaly first. %% After that, the controller-based sync proceeds. @@ -1055,10 +1056,10 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) -> %% which ran. ct:pal("Unblocking other nodes, including the joining one"), OtherMigratedNodes = [receive - {MigFunPid2, waiting} -> + {Node, MigFunPid2, waiting} -> MigFunPid2 ! proceed, - node(MigFunPid2) - end || _ <- ExpectedNodes -- [FirstMigratedNode]], + Node + end || Node <- ExpectedNodes -- [FirstMigratedNode]], MigratedNodes = [FirstMigratedNode | OtherMigratedNodes], ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)), @@ -1097,7 +1098,7 @@ enable_feature_flag_in_a_cluster_and_add_member_concurrently_mfv2(Config) -> || Node <- AllNodes], ok. -enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) -> +enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1(Config) -> AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config( nodes, Config), connect_nodes(AllNodes), @@ -1110,8 +1111,6 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) -> migration_fun => {?MODULE, mf_wait_and_count_runs}}}, inject_on_nodes(AllNodes, FeatureFlags), - UsingFFv1 = not ?config(enable_feature_flags_v2, Config), - ct:pal( "Checking the feature flag is supported but disabled on all nodes"), _ = [ok = @@ -1128,10 +1127,6 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) -> ct:pal( "Enabling the feature flag in the cluster (in a separate process)"), Peer = self(), - ExpectedRet = case UsingFFv1 of - true -> ok; - false -> {error, {badrpc, nodedown}} - end, Enabler = spawn_link( fun() -> ok = @@ -1145,7 +1140,7 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) -> %% the migration. record_peer_proc(Peer), ?assertEqual( - ExpectedRet, + ok, rabbit_feature_flags:enable( FeatureName)), ok @@ -1158,7 +1153,9 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) -> %% blocked and waits for a message from this process. Therefore, we are %% sure the feature flag is in the `state_changing' state and we can try %% to add a new node and sync its feature flags. - FirstNodeMigFunPid = receive {MigFunPid1, waiting} -> MigFunPid1 end, + FirstNodeMigFunPid = receive + {_Node, MigFunPid1, waiting} -> MigFunPid1 + end, %% Remove node from cluster. stop_slave_node(LeavingNode), @@ -1189,9 +1186,134 @@ enable_feature_flag_in_a_cluster_and_remove_member_concurrently(Config) -> run_on_node( Node, fun() -> + ?assert(rabbit_feature_flags:is_enabled(FeatureName)), + ok + end, + []) + || Node <- Nodes], + ok. + +enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2(Config) -> + AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config( + nodes, Config), + connect_nodes(AllNodes), + override_running_nodes(AllNodes), + + FeatureName = ?FUNCTION_NAME, + FeatureFlags = #{FeatureName => + #{provided_by => ?MODULE, + stability => stable, + migration_fun => {?MODULE, + mf_wait_and_count_runs_v2}}}, + inject_on_nodes(AllNodes, FeatureFlags), + + UsingFFv1 = not ?config(enable_feature_flags_v2, Config), + + ct:pal( + "Checking the feature flag is supported but disabled on all nodes"), + _ = [ok = + run_on_node( + Node, + fun() -> + ?assert(rabbit_feature_flags:is_supported(FeatureName)), + ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)), + ok + end, + []) + || Node <- AllNodes], + + ct:pal( + "Enabling the feature flag in the cluster (in a separate process)"), + Peer = self(), + _ = [ok = + run_on_node( + Node, + fun() -> + %% The migration function uses the `Peer' PID (the process + %% executing the testcase) to notify its own PID and wait + %% for a signal from `Peer' to proceed and finish the + %% migration. + record_peer_proc(Peer), + ok + end, + []) + || Node <- AllNodes], + Enabler = spawn_link( + fun() -> + ok = + run_on_node( + FirstNode, + fun() -> + ?assertEqual( + {error, {badrpc, nodedown}}, + rabbit_feature_flags:enable( + FeatureName)), + ok + end, + []) + end), + + %% By waiting for the message from one of the migration function + %% instances, we make sure the feature flags controller on `FirstNode' is + %% blocked and waits for a message from this process. Therefore, we are + %% sure the feature flag is in the `state_changing' state and we can try + %% to add a new node and sync its feature flags. + FirstNodeMigFunPid = receive + {Node, MigFunPid1, waiting} -> MigFunPid1 + end, + + %% Remove node from cluster. + stop_slave_node(LeavingNode), + override_running_nodes(Nodes), + + %% Unblock the migration functions on `Nodes'. + EnablerMRef = erlang:monitor(process, Enabler), + unlink(Enabler), + + %% The migration function runs on all clustered nodes with v2. + %% + %% When this testcase runs with feature flags v1, the feature flag we want + %% to enable uses the migration function API v2: this implicitly enables + %% `feature_flags_v2'. As part of the synchronization, the node still on + %% feature flags v1 will try to sync `feature_flags_v2' specificaly first. + %% After that, the controller-based sync proceeds. + ExpectedNodes = Nodes, + + %% Unblock the migration function for which we already consumed the + %% `waiting' notification. + FirstMigratedNode = node(FirstNodeMigFunPid), + ct:pal( + "Unblocking first node (~p @ ~s)", + [FirstNodeMigFunPid, FirstMigratedNode]), + FirstNodeMigFunPid ! proceed, + + %% Unblock the rest and collect the node names of all migration functions + %% which ran. + ct:pal("Unblocking other nodes"), + OtherMigratedNodes = [receive + {Node, MigFunPid2, waiting} -> + MigFunPid2 ! proceed, + Node + end || Node <- ExpectedNodes -- [FirstMigratedNode]], + MigratedNodes = [FirstMigratedNode | OtherMigratedNodes], + ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)), + + ct:pal("Waiting for spawned processes to terminate"), + receive + {'DOWN', EnablerMRef, process, Enabler, EnablerReason} -> + ?assertEqual(normal, EnablerReason) + end, + + ct:pal( + "Checking the feature flag is enabled (v1) or disabled (v2) in the " + "cluster"), + _ = [ok = + run_on_node( + Node, + fun() -> case UsingFFv1 of true -> - ?assert( + ?assertNot( rabbit_feature_flags:is_enabled(FeatureName)); false -> ?assertNot( |