diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/consumer_timeout_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 100 | ||||
| -rw-r--r-- | test/publisher_confirms_parallel_SUITE.erl | 10 | ||||
| -rw-r--r-- | test/queue_length_limits_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 214 | ||||
| -rw-r--r-- | test/quorum_queue_utils.erl | 24 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 280 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 25 | ||||
| -rw-r--r-- | test/rabbit_fifo_v0_SUITE.erl | 1392 | ||||
| -rw-r--r-- | test/rabbitmq_queues_cli_integration_SUITE.erl | 9 |
12 files changed, 1910 insertions, 152 deletions
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl index 6144eca20b..468714328d 100644 --- a/test/consumer_timeout_SUITE.erl +++ b/test/consumer_timeout_SUITE.erl @@ -78,7 +78,7 @@ init_per_group(mirrored_queue, Config) -> init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> - ClusterSize = 2, + ClusterSize = 3, Config = rabbit_ct_helpers:merge_app_env( Config0, {rabbit, [{channel_tick_interval, 1000}, {quorum_tick_interval, 1000}, diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index d1196e79fc..87b5566c57 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -106,7 +106,7 @@ init_per_group(mirrored_queue, Config) -> init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> - ClusterSize = 2, + ClusterSize = 3, Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index 0376b2b838..9a8f2110d6 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -24,15 +24,13 @@ all() -> groups() -> [ {clustered, [], [ - {cluster_size_2, [], [ + {cluster_size_3, [], [ + recover_follower_after_standalone_restart, vhost_deletion, force_delete_if_no_consensus, takeover_on_failure, takeover_on_shutdown, quorum_unaffected_after_vhost_failure - ]}, - {cluster_size_3, [], [ - recover_follower_after_standalone_restart ]} ]} ]. @@ -108,7 +106,7 @@ vhost_deletion(Config) -> ok. force_delete_if_no_consensus(Config) -> - [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), QName = ?config(queue_name, Config), Args = ?config(queue_args, Config), @@ -119,6 +117,7 @@ force_delete_if_no_consensus(Config) -> rabbit_ct_client_helpers:publish(ACh, QName, 10), ok = rabbit_ct_broker_helpers:restart_node(Config, B), ok = rabbit_ct_broker_helpers:stop_node(Config, A), + ok = rabbit_ct_broker_helpers:stop_node(Config, C), BCh = rabbit_ct_client_helpers:open_channel(Config, B), ?assertMatch( @@ -140,7 +139,7 @@ takeover_on_shutdown(Config) -> takeover_on(Config, stop_node). takeover_on(Config, Fun) -> - [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), QName = ?config(queue_name, Config), @@ -152,10 +151,11 @@ takeover_on(Config, Fun) -> rabbit_ct_client_helpers:publish(ACh, QName, 10), ok = rabbit_ct_broker_helpers:restart_node(Config, B), + ok = rabbit_ct_broker_helpers:Fun(Config, C), ok = rabbit_ct_broker_helpers:Fun(Config, A), BCh = rabbit_ct_client_helpers:open_channel(Config, B), - #'queue.declare_ok'{message_count = 0} = + #'queue.declare_ok'{} = amqp_channel:call( BCh, #'queue.declare'{queue = QName, arguments = Args, @@ -170,7 +170,7 @@ takeover_on(Config, Fun) -> ok. quorum_unaffected_after_vhost_failure(Config) -> - [A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [A, B, _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Servers = lists:sort(Servers0), ACh = rabbit_ct_client_helpers:open_channel(Config, A), @@ -197,46 +197,50 @@ quorum_unaffected_after_vhost_failure(Config) -> ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))). recover_follower_after_standalone_restart(Config) -> - %% Tests that followers can be brought up standalone after forgetting the rest - %% of the cluster. Consensus won't be reached as there is only one node in the - %% new cluster. - Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, A), - - QName = ?config(queue_name, Config), - Args = ?config(queue_args, Config), - amqp_channel:call(Ch, #'queue.declare'{queue = QName, - arguments = Args, - durable = true - }), - - rabbit_ct_client_helpers:publish(Ch, QName, 15), - rabbit_ct_client_helpers:close_channel(Ch), - - Name = ra_name(QName), - wait_for_messages_ready(Servers, Name, 15), - - rabbit_ct_broker_helpers:stop_node(Config, C), - rabbit_ct_broker_helpers:stop_node(Config, B), - rabbit_ct_broker_helpers:stop_node(Config, A), - - %% Restart one follower - forget_cluster_node(Config, B, C), - forget_cluster_node(Config, B, A), - - ok = rabbit_ct_broker_helpers:start_node(Config, B), - wait_for_messages_ready([B], Name, 15), - ok = rabbit_ct_broker_helpers:stop_node(Config, B), - - %% Restart the other - forget_cluster_node(Config, C, B), - forget_cluster_node(Config, C, A), - - ok = rabbit_ct_broker_helpers:start_node(Config, C), - wait_for_messages_ready([C], Name, 15), - ok = rabbit_ct_broker_helpers:stop_node(Config, C), - - ok. + case os:getenv("SECONDARY_UMBRELLA") of + false -> + %% Tests that followers can be brought up standalone after forgetting the + %% rest of the cluster. Consensus won't be reached as there is only one node in the + %% new cluster. + Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + + QName = ?config(queue_name, Config), + Args = ?config(queue_args, Config), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = true + }), + + rabbit_ct_client_helpers:publish(Ch, QName, 15), + rabbit_ct_client_helpers:close_channel(Ch), + + Name = ra_name(QName), + wait_for_messages_ready(Servers, Name, 15), + + rabbit_ct_broker_helpers:stop_node(Config, C), + rabbit_ct_broker_helpers:stop_node(Config, B), + rabbit_ct_broker_helpers:stop_node(Config, A), + + %% Restart one follower + forget_cluster_node(Config, B, C), + forget_cluster_node(Config, B, A), + + ok = rabbit_ct_broker_helpers:start_node(Config, B), + wait_for_messages_ready([B], Name, 15), + ok = rabbit_ct_broker_helpers:stop_node(Config, B), + + %% Restart the other + forget_cluster_node(Config, C, B), + forget_cluster_node(Config, C, A), + + ok = rabbit_ct_broker_helpers:start_node(Config, C), + wait_for_messages_ready([C], Name, 15), + ok = rabbit_ct_broker_helpers:stop_node(Config, C), + ok; + _ -> + {skip, "cannot be run in mixed mode"} + end. %%---------------------------------------------------------------------------- forget_cluster_node(Config, Node, NodeToRemove) -> diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl index 410dabb08a..c31527f0ba 100644 --- a/test/publisher_confirms_parallel_SUITE.erl +++ b/test/publisher_confirms_parallel_SUITE.erl @@ -82,7 +82,7 @@ init_per_group(mirrored_queue, Config) -> init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> - ClusterSize = 2, + ClusterSize = 3, Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} @@ -302,21 +302,23 @@ confirm_nack1(Config) -> %% The closest to a nack behaviour that we can get on quorum queues is not answering while %% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued. confirm_minority(Config) -> - [_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [_A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), ok = rabbit_ct_broker_helpers:stop_node(Config, B), + ok = rabbit_ct_broker_helpers:stop_node(Config, C), amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, QName, [<<"msg1">>]), receive - #'basic.nack'{} -> throw(unexpected_nack); + #'basic.nack'{} -> ok; #'basic.ack'{} -> throw(unexpected_ack) - after 30000 -> + after 120000 -> ok end, ok = rabbit_ct_broker_helpers:start_node(Config, B), + publish(Ch, QName, [<<"msg2">>]), receive #'basic.nack'{} -> throw(unexpected_nack); #'basic.ack'{} -> ok diff --git a/test/queue_length_limits_SUITE.erl b/test/queue_length_limits_SUITE.erl index 20c7f46c86..f9b6f2f368 100644 --- a/test/queue_length_limits_SUITE.erl +++ b/test/queue_length_limits_SUITE.erl @@ -87,7 +87,7 @@ init_per_group(max_length_mirrored, Config) -> init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> - ClusterSize = 2, + ClusterSize = 3, Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index d13675bf58..c4d16a5900 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -125,7 +125,7 @@ init_per_group(mirrored_queue, Config) -> init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> - ClusterSize = 2, + ClusterSize = 3, Config = rabbit_ct_helpers:merge_app_env( Config0, {rabbit, [{channel_tick_interval, 1000}, {quorum_tick_interval, 1000}]}), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index be3b46fbfb..cc54aae78e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -16,7 +16,8 @@ wait_for_messages_total/3, wait_for_messages/2, dirty_query/3, - ra_name/1]). + ra_name/1, + is_mixed_versions/0]). -compile(export_all). @@ -113,6 +114,7 @@ all_tests() -> subscribe_redelivery_count, message_bytes_metrics, queue_length_limit_drop_head, + queue_length_limit_reject_publish, subscribe_redelivery_limit, subscribe_redelivery_policy, subscribe_redelivery_limit_with_dead_letter, @@ -128,7 +130,8 @@ all_tests() -> consumer_metrics, invalid_policy, delete_if_empty, - delete_if_unused + delete_if_unused, + queue_ttl ]. memory_tests() -> @@ -156,7 +159,12 @@ init_per_group(clustered, Config) -> init_per_group(unclustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]); init_per_group(clustered_with_partitions, Config) -> - rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]); + case is_mixed_versions() of + true -> + {skip, "clustered_with_partitions is too unreliable in mixed mode"}; + false -> + rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]) + end; init_per_group(Group, Config) -> ClusterSize = case Group of single_node -> 1; @@ -164,38 +172,44 @@ init_per_group(Group, Config) -> cluster_size_3 -> 3; cluster_size_5 -> 5 end, - Config1 = rabbit_ct_helpers:set_config(Config, - [{rmq_nodes_count, ClusterSize}, - {rmq_nodename_suffix, Group}, - {tcp_ports_base}]), - Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), - Ret = rabbit_ct_helpers:run_steps(Config1b, - [fun merge_app_env/1 ] ++ - rabbit_ct_broker_helpers:setup_steps()), - case Ret of - {skip, _} -> - Ret; - Config2 -> - EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( - Config2, quorum_queue), - case EnableFF of - ok -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, application, set_env, - [rabbit, channel_tick_interval, 100]), - %% HACK: the larger cluster sizes benefit for a bit - %% more time after clustering before running the - %% tests. - case Group of - cluster_size_5 -> - timer:sleep(5000), - Config2; - _ -> - Config2 - end; - Skip -> - end_per_group(Group, Config2), - Skip + IsMixed = not (false == os:getenv("SECONDARY_UMBRELLA")), + case ClusterSize of + 2 when IsMixed -> + {skip, "cluster size 2 isn't mixed versions compatible"}; + _ -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}]), + Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + Ret = rabbit_ct_helpers:run_steps(Config1b, + [fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()), + case Ret of + {skip, _} -> + Ret; + Config2 -> + EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( + Config2, quorum_queue), + case EnableFF of + ok -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_tick_interval, 100]), + %% HACK: the larger cluster sizes benefit for a bit + %% more time after clustering before running the + %% tests. + case Group of + cluster_size_5 -> + timer:sleep(5000), + Config2; + _ -> + Config2 + end; + Skip -> + end_per_group(Group, Config2), + Skip + end end end. @@ -327,11 +341,6 @@ declare_invalid_args(Config) -> {{shutdown, {server_initiated_close, 406, _}}, _}, declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-expires">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-message-ttl">>, long, 2000}])), ?assertExit( @@ -345,7 +354,7 @@ declare_invalid_args(Config) -> declare(rabbit_ct_client_helpers:open_channel(Config, Server), LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-overflow">>, longstr, XOverflow}])) - || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], + || XOverflow <- [<<"reject-publish-dlx">>]], ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, @@ -612,14 +621,16 @@ publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), ct:pal("waiting for confirms from ~s", [QName]), - ok = receive - #'basic.ack'{} -> ok; - #'basic.nack'{} -> fail - after 2500 -> - exit(confirm_timeout) - end, - ct:pal("CONFIRMED! ~s", [QName]), - ok. + receive + #'basic.ack'{} -> + ct:pal("CONFIRMED! ~s", [QName]), + ok; + #'basic.nack'{} -> + ct:pal("NOT CONFIRMED! ~s", [QName]), + fail + after 2500 -> + exit(confirm_timeout) + end. publish_and_restart(Config) -> %% Test the node restart with both types of queues (quorum and classic) to @@ -685,6 +696,14 @@ shrink_all(Config) -> ok. rebalance(Config) -> + case is_mixed_versions() of + true -> + {skip, "rebalance tests isn't mixed version compatible"}; + false -> + rebalance0(Config) + end. + +rebalance0(Config) -> [Server0, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1025,9 +1044,11 @@ recover_from_multiple_failures(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). publishing_to_unavailable_queue(Config) -> + %% publishing to an unavialable queue but with a reachable member should result + %% in the initial enqueuer session timing out and the message being nacked [Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - TCh = rabbit_ct_client_helpers:open_channel(Config, Server1), + TCh = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), @@ -1035,19 +1056,29 @@ publishing_to_unavailable_queue(Config) -> ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + ct:pal("opening channel to ~w", [Server]), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), - publish_many(Ch, QQ, 300), - timer:sleep(1000), + publish_many(Ch, QQ, 1), + %% this should result in a nack + ok = receive + #'basic.ack'{} -> fail; + #'basic.nack'{} -> ok + after 90000 -> + exit(confirm_timeout) + end, ok = rabbit_ct_broker_helpers:start_node(Config, Server1), - %% check we get at least on ack + timer:sleep(2000), + publish_many(Ch, QQ, 1), + %% this should now be acked ok = receive #'basic.ack'{} -> ok; #'basic.nack'{} -> fail - after 30000 -> + after 90000 -> exit(confirm_timeout) end, + %% check we get at least on ack ok = rabbit_ct_broker_helpers:start_node(Config, Server2), ok. @@ -1087,6 +1118,14 @@ leadership_takeover(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). metrics_cleanup_on_leadership_takeover(Config) -> + case is_mixed_versions() of + true -> + {skip, "metrics_cleanup_on_leadership_takeover tests isn't mixed version compatible"}; + false -> + metrics_cleanup_on_leadership_takeover0(Config) + end. + +metrics_cleanup_on_leadership_takeover0(Config) -> %% Queue core metrics should be deleted from a node once the leadership is transferred %% to another follower [Server, _, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1244,13 +1283,13 @@ simple_confirm_availability_on_leader_change(Config) -> %% open a channel to another node Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), %% stop the node hosting the leader ok = rabbit_ct_broker_helpers:stop_node(Config, Node2), %% this should not fail as the channel should detect the new leader and %% resend to that - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), ok = rabbit_ct_broker_helpers:start_node(Config, Node2), ok. @@ -1270,7 +1309,7 @@ confirm_availability_on_leader_change(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), ConfirmLoop = fun Loop() -> - publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), receive {done, P} -> P ! done, ok @@ -1462,7 +1501,16 @@ node_removal_is_not_quorum_critical(Config) -> Qs = rpc:call(Server, rabbit_quorum_queue, list_with_minimum_quorum, []), ?assertEqual([], Qs). + file_handle_reservations(Config) -> + case is_mixed_versions() of + true -> + {skip, "file_handle_reservations tests isn't mixed version compatible"}; + false -> + file_handle_reservations0(Config) + end. + +file_handle_reservations0(Config) -> Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), QQ = ?config(queue_name, Config), @@ -2020,6 +2068,35 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). +queue_length_limit_reject_publish(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject-publish">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ok = publish_confirm(Ch, QQ), + ok = publish_confirm(Ch, QQ), + %% give the channel some time to process the async reject_publish notification + %% now that we are over the limit it should start failing + wait_for_messages_total(Servers, RaName, 2), + fail = publish_confirm(Ch, QQ), + %% remove all messages + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + %% publish should be allowed again now + ok = publish_confirm(Ch, QQ), + ok. + queue_length_in_memory_limit_basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -2381,6 +2458,29 @@ delete_if_unused(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = QQ, if_unused = true})). +queue_ttl(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-expires">>, long, 1000}])), + timer:sleep(5500), + %% check queue no longer exists + ?assertExit( + {{shutdown, + {server_initiated_close,404, + <<"NOT_FOUND - no queue 'queue_ttl' in vhost '/'">>}}, + _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QQ, + passive = true, + durable = true, + auto_delete = false, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-expires">>, long, 1000}]})), + ok. + %%---------------------------------------------------------------------------- declare(Ch, Q) -> diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index 9c988bc066..4b65bc2737 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -8,7 +8,8 @@ wait_for_messages_total/3, wait_for_messages/2, dirty_query/3, - ra_name/1 + ra_name/1, + is_mixed_versions/0 ]). wait_for_messages_ready(Servers, QName, Ready) -> @@ -34,11 +35,19 @@ wait_for_messages(Servers, QName, Number, Fun, 0) -> wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), ct:pal("Got messages ~p", [Msgs]), - case lists:all(fun(C) when is_integer(C) -> - C == Number; - (_) -> - false - end, Msgs) of + %% hack to allow the check to succeed in mixed versions clusters if at + %% least one node matches the criteria rather than all nodes for + F = case is_mixed_versions() of + true -> + any; + false -> + all + end, + case lists:F(fun(C) when is_integer(C) -> + C == Number; + (_) -> + false + end, Msgs) of true -> ok; _ -> @@ -88,3 +97,6 @@ filter_queues(Expected, Got) -> lists:filter(fun([K, _, _, _]) -> lists:member(K, Keys) end, Got). + +is_mixed_versions() -> + not (false == os:getenv("SECONDARY_UMBRELLA")). diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 9f2daac762..7778e04afb 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) -> ?ASSERT_EFF(EfxPat, true, Effects)). -define(ASSERT_EFF(EfxPat, Guard, Effects), - ?assert(lists:any(fun (EfxPat) when Guard -> true; - (_) -> false - end, Effects))). + ?assert(lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(ASSERT_NO_EFF(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +-define(ASSERT_NO_EFF(EfxPat, Guard, Effects), + ?assert(not lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). -define(assertNoEffect(EfxPat, Effects), - ?assert(not lists:any(fun (EfxPat) -> true; - (_) -> false - end, Effects))). + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). test_init(Name) -> init(#{name => Name, @@ -380,7 +385,7 @@ cancelled_checkout_out_test(_) -> {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should not return pending messages to queue {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), - ?assertEqual(1, maps:size(State2#rabbit_fifo.messages)), + ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)), {State3, {dequeue, empty}} = @@ -436,13 +441,13 @@ down_with_noconnection_returns_unack_test(_) -> Pid = spawn(fun() -> ok end), Cid = {<<"down_with_noconnect">>, Pid}, {State0, _} = enq(1, 1, second, test_init(test)), - ?assertEqual(1, maps:size(State0#rabbit_fifo.messages)), + ?assertEqual(1, lqueue:len(State0#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State0#rabbit_fifo.returns)), {State1, {_, _}} = deq(2, Cid, unsettled, State0), - ?assertEqual(0, maps:size(State1#rabbit_fifo.messages)), + ?assertEqual(0, lqueue:len(State1#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State1#rabbit_fifo.returns)), {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)), + ?assertEqual(0, lqueue:len(State2a#rabbit_fifo.messages)), ?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)), ?assertMatch(#consumer{checked_out = Ch, status = suspected_down} @@ -539,7 +544,7 @@ duplicate_delivery_test(_) -> {#rabbit_fifo{ra_indexes = RaIdxs, messages = Messages}, _} = enq(2, 1, first, State0), ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), - ?assertEqual(1, maps:size(Messages)), + ?assertEqual(1, lqueue:len(Messages)), ok. state_enter_file_handle_leader_reservation_test(_) -> @@ -622,7 +627,7 @@ down_noproc_returns_checked_out_in_order_test(_) -> {FS, _} = enq(Num, Num, Num, FS0), FS end, S0, lists:seq(1, 100)), - ?assertEqual(100, maps:size(S1#rabbit_fifo.messages)), + ?assertEqual(100, lqueue:len(S1#rabbit_fifo.messages)), Cid = {<<"cid">>, self()}, {S2, _} = check(Cid, 101, 1000, S1), #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers), @@ -643,7 +648,7 @@ down_noconnection_returns_checked_out_test(_) -> {FS, _} = enq(Num, Num, Num, FS0), FS end, S0, lists:seq(1, NumMsgs)), - ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)), + ?assertEqual(NumMsgs, lqueue:len(S1#rabbit_fifo.messages)), Cid = {<<"cid">>, self()}, {S2, _} = check(Cid, 101, 1000, S1), #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers), @@ -797,7 +802,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> State1 = lists:foldl(AddConsumer, State0, Consumers), % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), + {State2, _, Effects} = apply(meta(2), {down, Pid1, noproc}, State1), % fell back to another consumer ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), % there are still waiting consumers @@ -810,7 +815,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> update_consumer_handler, _}, Effects), % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), + {State3, _, Effects2} = apply(meta(3), {down, Pid2, noproc}, State2), % fell back to another consumer ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), % no more waiting consumer @@ -824,7 +829,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> update_consumer_handler, _}, Effects2), % the last channel goes down - {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), + {State4, _, Effects3} = apply(meta(4), {down, Pid3, doesnotmatter}, State3), % no more consumers ?assertEqual(0, map_size(State4#rabbit_fifo.consumers)), ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), @@ -1130,7 +1135,8 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> State1 = lists:foldl(AddConsumer, State0, [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1), + {State2, _, Effects2} = apply(#{index => 3, + system_time => 1500}, {down, Pid1, noconnection}, State1), % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node ?assertEqual(4 + 1, length(Effects2)), @@ -1163,11 +1169,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1), + {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1), % one monitor and one consumer status update (deactivated) ?assertEqual(3, length(Effects2)), - {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2), + {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID ?assertEqual(5, length(Effects3)). @@ -1258,6 +1264,99 @@ single_active_with_credited_test(_) -> State3#rabbit_fifo.waiting_consumers), ok. + +register_enqueuer_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + %% register another enqueuer shoudl be ok + Pid2 = test_util:fake_pid(node()), + {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2), + + {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3), + {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4), + % ct:pal("Efx ~p", [Efx]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx), + + %% this time, registry should return reject_publish + {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer( + test_util:fake_pid(node())), State5), + ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)), + + + %% remove two messages this should make the queue fall below the 0.8 limit + {State7, {dequeue, _, _}, _Efx7} = + apply(meta(7), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6), + ct:pal("Efx7 ~p", [_Efx7]), + {State8, {dequeue, _, _}, Efx8} = + apply(meta(8), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7), + ct:pal("Efx8 ~p", [Efx8]), + %% validate all registered enqueuers are notified of overflow state + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8), + {_State9, {dequeue, _, _}, Efx9} = + apply(meta(9), + rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9), + ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9), + ok. + +reject_publish_purge_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish}), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1), + ok. + +reject_publish_applied_after_limit_test(_) -> + InitConf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)) + }, + State0 = init(InitConf), + %% simply registering should be ok when we're below limit + Pid1 = test_util:fake_pid(node()), + {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0), + {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1), + {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2), + {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3), + % ct:pal("Efx ~p", [Efx]), + ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx), + %% apply new config + Conf = #{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + max_length => 2, + overflow_strategy => reject_publish + }, + {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4), + ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1), + Pid2 = test_util:fake_pid(node()), + {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5), + ok. + purge_nodes_test(_) -> Node = purged@node, ThisNode = node(), @@ -1305,7 +1404,12 @@ purge_nodes_test(_) -> ok. meta(Idx) -> - #{index => Idx, term => 1, + meta(Idx, 0). + +meta(Idx, Timestamp) -> + #{index => Idx, + term => 1, + system_time => Timestamp, from => {make_ref(), self()}}. enq(Idx, MsgSeq, Msg, State) -> @@ -1386,9 +1490,139 @@ aux_test(_) -> ?assert(X > 0.0), ok. + +%% machine version conversion test + +machine_version_test(_) -> + V0 = rabbit_fifo_v0, + S0 = V0:init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + Idx = 1, + {#rabbit_fifo{}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S0), + + Cid = {atom_to_binary(?FUNCTION_NAME, utf8), self()}, + Entries = [ + {1, rabbit_fifo_v0:make_enqueue(self(), 1, banana)}, + {2, rabbit_fifo_v0:make_enqueue(self(), 2, apple)}, + {3, rabbit_fifo_v0:make_checkout(Cid, {auto, 1, unsettled}, #{})} + ], + {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries), + Self = self(), + {#rabbit_fifo{enqueuers = #{Self := #enqueuer{}}, + messages = Msgs}, ok, []} = apply(meta(Idx), + {machine_version, 0, 1}, S1), + %% validate message conversion to lqueue + ?assertEqual(1, lqueue:len(Msgs)), + ok. + +queue_ttl_test(_) -> + QName = rabbit_misc:r(<<"/">>, queue, <<"test">>), + Conf = #{name => ?FUNCTION_NAME, + queue_resource => QName, + created => 1000, + expires => 1000}, + S0 = rabbit_fifo:init(Conf), + Now = 1500, + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0), + %% this should delete the queue + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 1000, S0), + %% adding a consumer should not ever trigger deletion + Cid = {<<"cid1">>, self()}, + {S1, _} = check_auto(Cid, 1, S0), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1), + %% cancelling the consumer should then + {S2, _, _} = apply(meta(2, Now), + rabbit_fifo:make_checkout(Cid, cancel, #{}), S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2), + + %% Same for downs + {S2D, _, _} = apply(meta(2, Now), + {down, self(), noconnection}, S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2D), + + %% dequeue should set last applied + {S1Deq, {dequeue, empty}} = + apply(meta(2, Now), + rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), + S0), + + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1Deq), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S1Deq), + %% Enqueue message, + {E1, _, _} = apply(meta(2, Now), + rabbit_fifo:make_enqueue(self(), 1, msg1), S0), + Deq = {<<"deq1">>, self()}, + {E2, {dequeue, {MsgId, _}, _}, _} = + apply(meta(3, Now), + rabbit_fifo:make_checkout(Deq, {dequeue, unsettled}, #{}), + E1), + {E3, _, _} = apply(meta(3, Now + 1000), + rabbit_fifo:make_settle(Deq, [MsgId]), E2), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1500, E3), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 3000, E3), + + ok. + +queue_ttl_with_single_active_consumer_test(_) -> + QName = rabbit_misc:r(<<"/">>, queue, <<"test">>), + Conf = #{name => ?FUNCTION_NAME, + queue_resource => QName, + created => 1000, + expires => 1000, + single_active_consumer_on => true}, + S0 = rabbit_fifo:init(Conf), + Now = 1500, + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0), + %% this should delete the queue + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 1000, S0), + %% adding a consumer should not ever trigger deletion + Cid = {<<"cid1">>, self()}, + {S1, _} = check_auto(Cid, 1, S0), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1), + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1), + %% cancelling the consumer should then + {S2, _, _} = apply(meta(2, Now), + rabbit_fifo:make_checkout(Cid, cancel, #{}), S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2), + + %% Same for downs + {S2D, _, _} = apply(meta(2, Now), + {down, self(), noconnection}, S1), + %% last_active should have been reset when consumer was cancelled + %% last_active = 2500 + [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D), + %% but now it should be deleted + [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}] + = rabbit_fifo:tick(Now + 2500, S2D), + + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf). +make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid). apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State). init_aux(Conf) -> rabbit_fifo:init_aux(Conf). handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M). diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 23522e71f9..859db2178f 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -479,15 +479,17 @@ test_run_log(_Config) -> snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes}, - frequency([{10, {0, 0, false, 0, 0, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, + DeliveryLimit, InMemoryLength, InMemoryBytes, + Overflow}, + frequency([{10, {0, 0, false, 0, 0, 0, drop_head}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), oneof([range(1, 3), undefined]), oneof([range(1, 10), undefined]), - oneof([range(1, 1000), undefined]) + oneof([range(1, 1000), undefined]), + oneof([drop_head, reject_publish]) }}]), begin Config = config(?FUNCTION_NAME, @@ -496,7 +498,8 @@ snapshots(_Config) -> SingleActiveConsumer, DeliveryLimit, InMemoryLength, - InMemoryBytes), + InMemoryBytes, + Overflow), ?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)), collect({log_size, length(O)}, snapshots_prop(Config, O))) @@ -681,6 +684,11 @@ max_length(_Config) -> config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, drop_head). + +config(Name, Length, Bytes, SingleActive, DeliveryLimit, + InMemoryLength, InMemoryBytes, Overflow) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), @@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit, single_active_consumer_on => SingleActive, delivery_limit => map_max(DeliveryLimit), max_in_memory_length => map_max(InMemoryLength), - max_in_memory_bytes => map_max(InMemoryBytes)}. + max_in_memory_bytes => map_max(InMemoryBytes), + overflow_strategy => Overflow}. map_max(0) -> undefined; map_max(N) -> N. @@ -1072,7 +1081,7 @@ do_apply(Cmd, #t{effects = Effs, %% down T; _ -> - {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of + {St, Effects} = case rabbit_fifo:apply(meta(Index), Cmd, S0) of {S, _, E} when is_list(E) -> {S, E}; {S, _, E} -> @@ -1187,7 +1196,7 @@ test_init(Conf) -> rabbit_fifo:init(maps:merge(Default, Conf)). meta(Idx) -> - #{index => Idx, term => 1}. + #{index => Idx, term => 1, system_time => 0}. make_checkout(Cid, Spec) -> rabbit_fifo:make_checkout(Cid, Spec, #{}). diff --git a/test/rabbit_fifo_v0_SUITE.erl b/test/rabbit_fifo_v0_SUITE.erl new file mode 100644 index 0000000000..fcb84377de --- /dev/null +++ b/test/rabbit_fifo_v0_SUITE.erl @@ -0,0 +1,1392 @@ +-module(rabbit_fifo_v0_SUITE). + +%% rabbit_fifo unit tests suite + +-compile(export_all). + +-compile({no_auto_import, [apply/3]}). +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("src/rabbit_fifo_v0.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +%% replicate eunit like test resultion +all_tests() -> + [F || {F, _} <- ?MODULE:module_info(functions), + re:run(atom_to_list(F), "_test$") /= nomatch]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +-define(ASSERT_EFF(EfxPat, Effects), + ?ASSERT_EFF(EfxPat, true, Effects)). + +-define(ASSERT_EFF(EfxPat, Guard, Effects), + ?assert(lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). + +-define(ASSERT_NO_EFF(EfxPat, Effects), + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +-define(assertNoEffect(EfxPat, Effects), + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +test_init(Name) -> + init(#{name => Name, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(Name, utf8)), + release_cursor_interval => 0}). + +enq_enq_checkout_test(_) -> + Cid = {<<"enq_enq_checkout_test">>, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {_State3, _, Effects} = + apply(meta(3), + rabbit_fifo_v0:make_checkout(Cid, {once, 2, simple_prefetch}, #{}), + State2), + ?ASSERT_EFF({monitor, _, _}, Effects), + ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), + ok. + +credit_enq_enq_checkout_settled_credit_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _, Effects} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited}, #{}), State2), + ?ASSERT_EFF({monitor, _, _}, Effects), + Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; + (_) -> false + end, Effects), + ?assertEqual(1, length(Deliveries)), + %% settle the delivery this should _not_ result in further messages being + %% delivered + {State4, SettledEffects} = settle(Cid, 4, 1, State3), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, SettledEffects)), + %% granting credit (3) should deliver the second msg if the receivers + %% delivery count is (1) + {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4), + % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]), + ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects), + {_State6, FinalEffects} = enq(6, 3, third, State5), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, FinalEffects)), + ok. + +credit_with_drained_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + %% checkout with a single credit + {State1, _, _} = + apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}), + State0), + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 1, + delivery_count = 0}}}, + State1), + {State, Result, _} = + apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1), + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 5}}}, + State), + ?assertEqual({multi, [{send_credit_reply, 0}, + {send_drained, {?FUNCTION_NAME, 5}}]}, + Result), + ok. + +credit_and_drain_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + %% checkout without any initial credit (like AMQP 1.0 would) + {State3, _, CheckEffs} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 0, credited}, #{}), + State2), + + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), + {State4, {multi, [{send_credit_reply, 0}, + {send_drained, {?FUNCTION_NAME, 2}}]}, + Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3), + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 4}}}, + State4), + + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}, + {_, {_, second}}]}, _}, Effects), + {_State5, EnqEffs} = enq(5, 2, third, State4), + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs), + ok. + + + +enq_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + NumReady = 1, + {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + ok. + +enq_enq_deq_deq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + {_State4, {dequeue, empty}} = + apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State3), + ok. + +enq_enq_checkout_get_settled_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + % get returns a reply value + {_State2, {dequeue, {0, {_, first}}, _}, _Effs} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), + State1), + ok. + +checkout_get_empty_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State = test_init(test), + {_State2, {dequeue, empty}} = + apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State), + ok. + +untracked_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + {State1, _, _} = apply(meta(1), + rabbit_fifo_v0:make_enqueue(undefined, undefined, first), + State0), + {_State2, {dequeue, {0, {_, first}}, _}, _} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +release_cursor_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _} = check(Cid, 3, 10, State2), + % no release cursor effect at this point + {State4, _} = settle(Cid, 4, 1, State3), + {_Final, Effects1} = settle(Cid, 5, 0, State4), + % empty queue forwards release cursor all the way + ?ASSERT_EFF({release_cursor, 5, _}, Effects1), + ok. + +checkout_enq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), + {State2, Effects0} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, + {delivery, ?FUNCTION_NAME, + [{0, {_, first}}]}, _}, + Effects0), + {State3, [_Inactive]} = enq(3, 2, second, State2), + {_, _Effects} = settle(Cid, 4, 0, State3), + % the release cursor is the smallest raft index that does not + % contribute to the state of the application + % ?ASSERT_EFF({release_cursor, 2, _}, Effects), + ok. + +out_of_order_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + % assert monitor was set up + ?ASSERT_EFF({monitor, _, _}, Effects2), + % enqueue seq num 3 and 4 before 2 + {State3, Effects3} = enq(3, 3, third, State2), + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3), + {State4, Effects4} = enq(4, 4, fourth, State3), + % assert no further deliveries where made + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4), + {_State5, Effects5} = enq(5, 2, second, State4), + % assert two deliveries were now made + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}}, + {_, {_, third}}, + {_, {_, fourth}}]}, _}, + Effects5), + ok. + +out_of_order_first_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = check_n(Cid, 5, 5, test_init(test)), + {_State2, Effects2} = enq(2, 10, first, State1), + ?ASSERT_EFF({monitor, process, _}, Effects2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, + Effects2), + ok. + +duplicate_enqueue_test(_) -> + Cid = {<<"duplicate_enqueue_test">>, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + {_State3, Effects3} = enq(3, 1, first, State2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3), + ok. + +return_test(_) -> + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + {State0, _} = enq(1, 1, msg, test_init(test)), + {State1, _} = check_auto(Cid, 2, State0), + {State2, _} = check_auto(Cid2, 3, State1), + {State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2), + ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0, + State3#?STATE.consumers), + ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1, + State3#?STATE.consumers), + ok. + +return_dequeue_delivery_limit_test(_) -> + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, _} = enq(1, 1, msg, Init), + + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + + {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0), + {State2, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId1]), + State1), + + {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2), + {State4, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid2, [MsgId2]), + State3), + ?assertMatch(#{num_messages := 0}, rabbit_fifo_v0:overview(State4)), + ok. + +return_non_existent_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), + % return non-existent + {_State2, _} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [99]), State0), + ok. + +return_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, _}, + {aux, active}]} = + apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + ok. + +return_checked_out_limit_test(_) -> + Cid = {<<"cid">>, self()}, + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, [_, _]} = enq(1, 1, first, Init), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _}, + {aux, active}]} = + apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + {#?STATE{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} = + apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2), + ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)), + ok. + +return_auto_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + % it first active then inactive as the consumer took on but cannot take + % any more + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active}, + {aux, inactive} + ]} = check_auto(Cid, 2, State0), + % return should include another delivery + {_State2, _, Effects} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, + Effects), + ok. + +cancelled_checkout_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + {State1, _} = check_auto(Cid, 2, State0), + % cancelled checkout should not return pending messages to queue + {State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1), + ?assertEqual(1, maps:size(State2#?STATE.messages)), + ?assertEqual(0, lqueue:len(State2#?STATE.returns)), + + {State3, {dequeue, empty}} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2), + %% settle + {State4, ok, _} = + apply(meta(4), rabbit_fifo_v0:make_settle(Cid, [0]), State3), + + {_State, {dequeue, {_, {_, second}}, _}, _} = + apply(meta(5), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State4), + ok. + +down_with_noproc_consumer_returns_unsettled_test(_) -> + Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, + {State0, [_, _]} = enq(1, 1, second, test_init(test)), + {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), + {_State, Effects} = check(Cid, 4, State2), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. + +down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + Self = self(), + Node = node(Pid), + {State0, Effects0} = enq(1, 1, second, test_init(test)), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#?STATE.consumers), + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), + % monitor both enqueuer and consumer + % because we received a noconnection we now need to monitor the node + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + #consumer{credit = 1, + checked_out = Ch, + status = suspected_down} = maps:get(Cid, State2a#?STATE.consumers), + ?assertEqual(#{}, Ch), + %% validate consumer has credit + {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), + ?ASSERT_EFF({monitor, node, _}, Effects2), + ?assertNoEffect({demonitor, process, _}, Effects2), + % when the node comes up we need to retry the process monitors for the + % disconnected processes + {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + #consumer{status = up} = maps:get(Cid, State3#?STATE.consumers), + % try to re-monitor the suspect processes + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), + ok. + +down_with_noconnection_returns_unack_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#?STATE.messages)), + ?assertEqual(0, lqueue:len(State0#?STATE.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#?STATE.messages)), + ?assertEqual(0, lqueue:len(State1#?STATE.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + ?assertEqual(0, maps:size(State2a#?STATE.messages)), + ?assertEqual(1, lqueue:len(State2a#?STATE.returns)), + ?assertMatch(#consumer{checked_out = Ch, + status = suspected_down} + when map_size(Ch) == 0, + maps:get(Cid, State2a#?STATE.consumers)), + ok. + +down_with_noproc_enqueuer_is_cleaned_up_test(_) -> + State00 = test_init(test), + Pid = spawn(fun() -> ok end), + {State0, _, Effects0} = apply(meta(1), rabbit_fifo_v0:make_enqueue(Pid, 1, first), State00), + ?ASSERT_EFF({monitor, process, _}, Effects0), + {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), + % ensure there are no enqueuers + ?assert(0 =:= maps:size(State1#?STATE.enqueuers)), + ok. + +discarded_message_without_dead_letter_handler_is_removed_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), + rabbit_fifo_v0:make_discard(Cid, [0]), State1), + ?assertNoEffect({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects2), + ok. + +discarded_message_with_dead_letter_handler_emits_log_effect_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + dead_letter_handler => + {somemod, somefun, [somearg]}}), + {State0, [_, _]} = enq(1, 1, first, State00), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), rabbit_fifo_v0:make_discard(Cid, [0]), State1), + % assert mod call effect with appended reason and message + ?ASSERT_EFF({log, _RaftIdxs, _}, Effects2), + ok. + +tick_test(_) -> + Cid = {<<"c">>, self()}, + Cid2 = {<<"c2">>, self()}, + {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)), + {S1, _} = enq(2, 2, <<"snd">>, S0), + {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), + {S3, {_, _}} = deq(4, Cid2, unsettled, S2), + {S4, _, _} = apply(meta(5), rabbit_fifo_v0:make_return(Cid, [MsgId]), S3), + + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, + {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}, + [_Node] + ]}] = rabbit_fifo_v0:tick(1, S4), + ok. + + +delivery_query_returns_deliveries_test(_) -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + rabbit_fifo_v0:make_checkout(Cid, {auto, 5, simple_prefetch}, #{}), + rabbit_fifo_v0:make_enqueue(self(), 1, one), + rabbit_fifo_v0:make_enqueue(self(), 2, two), + rabbit_fifo_v0:make_enqueue(self(), 3, tre), + rabbit_fifo_v0:make_enqueue(self(), 4, for) + ], + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, _Effects} = run_log(test_init(help), Entries), + % 3 deliveries are returned + [{0, {_, one}}] = rabbit_fifo_v0:get_checked_out(Cid, 0, 0, State), + [_, _, _] = rabbit_fifo_v0:get_checked_out(Cid, 1, 3, State), + ok. + +pending_enqueue_is_enqueued_on_down_test(_) -> + Cid = {<<"cid">>, self()}, + Pid = self(), + {State0, _} = enq(1, 2, first, test_init(test)), + {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), + {_State2, {dequeue, {0, {_, first}}, 0}, _} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +duplicate_delivery_test(_) -> + {State0, _} = enq(1, 1, first, test_init(test)), + {#?STATE{ra_indexes = RaIdxs, + messages = Messages}, _} = enq(2, 1, first, State0), + ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), + ?assertEqual(1, maps:size(Messages)), + ok. + +state_enter_file_handle_leader_reservation_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + become_leader_handler => {m, f, [a]}}), + + Resource = {resource, <<"/">>, queue, <<"test">>}, + Effects = rabbit_fifo_v0:state_enter(leader, S0), + ?assertEqual([ + {mod_call, m, f, [a, the_name]}, + {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} + ], Effects), + ok. + +state_enter_file_handle_other_reservation_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + Effects = rabbit_fifo_v0:state_enter(other, S0), + ?assertEqual([ + {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} + ], + Effects), + ok. + +state_enter_monitors_and_notifications_test(_) -> + Oth = spawn(fun () -> ok end), + {State0, _} = enq(1, 1, first, test_init(test)), + Cid = {<<"adf">>, self()}, + OthCid = {<<"oth">>, Oth}, + {State1, _} = check(Cid, 2, State0), + {State, _} = check(OthCid, 3, State1), + Self = self(), + Effects = rabbit_fifo_v0:state_enter(leader, State), + + %% monitor all enqueuers and consumers + [{monitor, process, Self}, + {monitor, process, Oth}] = + lists:filter(fun ({monitor, process, _}) -> true; + (_) -> false + end, Effects), + [{send_msg, Self, leader_change, ra_event}, + {send_msg, Oth, leader_change, ra_event}] = + lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; + (_) -> false + end, Effects), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. + +purge_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State1), + {State3, _} = enq(3, 2, second, State2), + % get returns a reply value + {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = + apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State3), + ok. + +purge_with_checkout_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), + {State1, _} = enq(2, 1, <<"first">>, State0), + {State2, _} = enq(3, 2, <<"second">>, State1), + %% assert message bytes are non zero + ?assert(State2#?STATE.msg_bytes_checkout > 0), + ?assert(State2#?STATE.msg_bytes_enqueue > 0), + {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2), + ?assert(State2#?STATE.msg_bytes_checkout > 0), + ?assertEqual(0, State3#?STATE.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#?STATE.ra_indexes)), + #consumer{checked_out = Checked} = maps:get(Cid, State3#?STATE.consumers), + ?assertEqual(1, maps:size(Checked)), + ok. + +down_noproc_returns_checked_out_in_order_test(_) -> + S0 = test_init(?FUNCTION_NAME), + %% enqueue 100 + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, 100)), + ?assertEqual(100, maps:size(S1#?STATE.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers), + ?assertEqual(100, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), + Returns = lqueue:to_list(S#?STATE.returns), + ?assertEqual(100, length(Returns)), + ?assertEqual(0, maps:size(S#?STATE.consumers)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + +down_noconnection_returns_checked_out_test(_) -> + S0 = test_init(?FUNCTION_NAME), + NumMsgs = 20, + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, NumMsgs)), + ?assertEqual(NumMsgs, maps:size(S1#?STATE.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers), + ?assertEqual(NumMsgs, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2), + Returns = lqueue:to_list(S#?STATE.returns), + ?assertEqual(NumMsgs, length(Returns)), + ?assertMatch(#consumer{checked_out = Ch} + when map_size(Ch) == 0, + maps:get(Cid, S#?STATE.consumers)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + +single_active_consumer_basic_get_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#?STATE.consumers)), + {State1, _} = enq(1, 1, first, State0), + {_State, {error, unsupported}} = + apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State1), + ok. + +single_active_consumer_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#?STATE.consumers)), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + meta(1), + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + C3 = {<<"ctag3">>, self()}, + C4 = {<<"ctag4">>, self()}, + + % the first registered consumer is the active one, the others are waiting + ?assertEqual(1, map_size(State1#?STATE.consumers)), + ?assertMatch(#{C1 := _}, State1#?STATE.consumers), + ?assertEqual(3, length(State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?STATE.waiting_consumers)), + + % cancelling a waiting consumer + {State2, _, Effects1} = apply(meta(2), + make_checkout(C3, cancel, #{}), + State1), + % the active consumer should still be in place + ?assertEqual(1, map_size(State2#?STATE.consumers)), + ?assertMatch(#{C1 := _}, State2#?STATE.consumers), + % the cancelled consumer has been removed from waiting consumers + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?STATE.waiting_consumers)), + % there are some effects to unregister the consumer + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects1), + + % cancelling the active consumer + {State3, _, Effects2} = apply(meta(3), + make_checkout(C1, cancel, #{}), + State2), + % the second registered consumer is now the active one + ?assertEqual(1, map_size(State3#?STATE.consumers)), + ?assertMatch(#{C2 := _}, State3#?STATE.consumers), + % the new active consumer is no longer in the waiting list + ?assertEqual(1, length(State3#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, + State3#?STATE.waiting_consumers)), + %% should have a cancel consumer handler mod_call effect and + %% an active new consumer effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), + + % cancelling the active consumer + {State4, _, Effects3} = apply(meta(4), + make_checkout(C2, cancel, #{}), + State3), + % the last waiting consumer became the active one + ?assertEqual(1, map_size(State4#?STATE.consumers)), + ?assertMatch(#{C4 := _}, State4#?STATE.consumers), + % the waiting consumer list is now empty + ?assertEqual(0, length(State4#?STATE.waiting_consumers)), + % there are some effects to unregister the consumer and + % to update the new active one (metrics) + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects3), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects3), + + % cancelling the last consumer + {State5, _, Effects4} = apply(meta(5), + make_checkout(C4, cancel, #{}), + State4), + % no active consumer anymore + ?assertEqual(0, map_size(State5#?STATE.consumers)), + % still nothing in the waiting list + ?assertEqual(0, length(State5#?STATE.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, _}, Effects4), + + ok. + +single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + [C1, C2, C3, C4] = Consumers = + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}], + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, ChannelId}, {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, Consumers), + + % the channel of the active consumer goes down + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), + % fell back to another consumer + ?assertEqual(1, map_size(State2#?STATE.consumers)), + % there are still waiting consumers + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), + % effects to unregister the consumer and + % to update the new active one (metrics) are there + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects), + + % the channel of the active consumer and a waiting consumer goes down + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), + % fell back to another consumer + ?assertEqual(1, map_size(State3#?STATE.consumers)), + % no more waiting consumer + ?assertEqual(0, length(State3#?STATE.waiting_consumers)), + % effects to cancel both consumers of this channel + effect to update the new active one (metrics) + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), + + % the last channel goes down + {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), + % no more consumers + ?assertEqual(0, map_size(State4#?STATE.consumers)), + ?assertEqual(0, length(State4#?STATE.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C4, Effects3), + + ok. + +single_active_returns_messages_on_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1], + ConsumerIds = [{_, DownPid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + {State2, _} = enq(4, 1, msg1, State1), + % simulate node goes down + {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2), + %% assert the consumer is up + ?assertMatch([_], lqueue:to_list(State3#?STATE.returns)), + ?assertMatch([{_, #consumer{checked_out = Checked}}] + when map_size(Checked) == 0, + State3#?STATE.waiting_consumers), + + ok. + +single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1, n2, node()], + ConsumerIds = [C1 = {_, DownPid}, C2, _C3] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1a = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, + State1a#?STATE.consumers), + + {State1, _} = enq(10, 1, msg, State1a), + + % simulate node goes down + {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1), + + %% assert a new consumer is in place and it is up + ?assertMatch([{C2, #consumer{status = up, + checked_out = Ch}}] + when map_size(Ch) == 1, + maps:to_list(State2#?STATE.consumers)), + + %% the disconnected consumer has been returned to waiting + ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, + State2#?STATE.waiting_consumers)), + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), + + % simulate node comes back up + {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2), + + %% the consumer is still active and the same as before + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State3#?STATE.consumers)), + % the waiting consumers should be un-suspected + ?assertEqual(2, length(State3#?STATE.waiting_consumers)), + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status /= suspected_down) + end, State3#?STATE.waiting_consumers), + ok. + +single_active_consumer_all_disconnected_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1, n2], + ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, State1#?STATE.consumers), + + % simulate node goes down + {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1), + %% assert the consumer fails over to the consumer on n2 + ?assertMatch(#{C2 := #consumer{status = up}}, State2#?STATE.consumers), + {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2), + %% assert these no active consumer after both nodes are maked as down + ?assertMatch([], maps:to_list(State3#?STATE.consumers)), + %% n2 comes back + {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3), + %% ensure n2 is the active consumer as this node as been registered + %% as up again + ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up, + credit = 1}}], + maps:to_list(State4#?STATE.consumers)), + ok. + +single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => + rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = rabbit_fifo_v0:state_enter(leader, State1), + %% 2 effects for each consumer process (channel process), 1 effect for the node, + %% 1 effect for file handle reservation + ?assertEqual(2 * 3 + 1 + 1, length(Effects)). + +single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> + Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => Resource, + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = rabbit_fifo_v0:state_enter(eol, State1), + %% 1 effect for each consumer process (channel process), + %% 1 effect for file handle reservation + ?assertEqual(4, length(Effects)). + +query_consumers_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => false}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + Consumers0 = State1#?STATE.consumers, + Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, + Consumer#consumer{status = suspected_down}, Consumers0), + State2 = State1#?STATE{consumers = Consumers1}, + + ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)), + Consumers2 = rabbit_fifo_v0:query_consumers(State2), + ?assertEqual(4, maps:size(Consumers2)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag2">> -> + ?assertNot(Active), + ?assertEqual(suspected_down, ActivityStatus); + _ -> + ?assert(Active), + ?assertEqual(up, ActivityStatus) + end + end, [], Consumers2). + +query_consumers_when_single_active_consumer_is_on_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State1)), + Consumers = rabbit_fifo_v0:query_consumers(State1), + ?assertEqual(4, maps:size(Consumers)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag1">> -> + ?assert(Active), + ?assertEqual(single_active, ActivityStatus); + _ -> + ?assertNot(Active), + ?assertEqual(waiting, ActivityStatus) + end + end, [], Consumers). + +active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => false}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = + apply( + #{index => 1}, + rabbit_fifo_v0:make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node + ?assertEqual(4 + 1, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID + ?assertEqual(4 + 4, length(Effects3)). + +active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1), + % one monitor and one consumer status update (deactivated) + ?assertEqual(3, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to monitor the consumer PID + ?assertEqual(5, length(Effects3)). + +single_active_cancelled_with_unacked_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 1, simple_prefetch}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% enqueue 2 messages + {State2, _Effects2} = enq(3, 1, msg1, State1), + {State3, _Effects3} = enq(4, 2, msg2, State2), + %% one should be checked ou to C1 + %% cancel C1 + {State4, _, _} = apply(meta(5), + make_checkout(C1, cancel, #{}), + State3), + %% C2 should be the active consumer + ?assertMatch(#{C2 := #consumer{status = up, + checked_out = #{0 := _}}}, + State4#?STATE.consumers), + %% C1 should be a cancelled consumer + ?assertMatch(#{C1 := #consumer{status = cancelled, + lifetime = once, + checked_out = #{0 := _}}}, + State4#?STATE.consumers), + ?assertMatch([], State4#?STATE.waiting_consumers), + + %% Ack both messages + {State5, _Effects5} = settle(C1, 1, 0, State4), + %% C1 should now be cancelled + {State6, _Effects6} = settle(C2, 2, 0, State5), + + %% C2 should remain + ?assertMatch(#{C2 := #consumer{status = up}}, + State6#?STATE.consumers), + %% C1 should be gone + ?assertNotMatch(#{C1 := _}, + State6#?STATE.consumers), + ?assertMatch([], State6#?STATE.waiting_consumers), + ok. + +single_active_with_credited_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 0, credited}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% add some credit + C1Cred = rabbit_fifo_v0:make_credit(C1, 5, 0, false), + {State2, _, _Effects2} = apply(meta(3), C1Cred, State1), + C2Cred = rabbit_fifo_v0:make_credit(C2, 4, 0, false), + {State3, _} = apply(meta(4), C2Cred, State2), + %% both consumers should have credit + ?assertMatch(#{C1 := #consumer{credit = 5}}, + State3#?STATE.consumers), + ?assertMatch([{C2, #consumer{credit = 4}}], + State3#?STATE.waiting_consumers), + ok. + +purge_nodes_test(_) -> + Node = purged@node, + ThisNode = node(), + EnqPid = test_util:fake_pid(Node), + EnqPid2 = test_util:fake_pid(node()), + ConPid = test_util:fake_pid(Node), + Cid = {<<"tag">>, ConPid}, + % WaitingPid = test_util:fake_pid(Node), + + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + single_active_consumer_on => false}), + {State1, _, _} = apply(meta(1), + rabbit_fifo_v0:make_enqueue(EnqPid, 1, msg1), + State0), + {State2, _, _} = apply(meta(2), + rabbit_fifo_v0:make_enqueue(EnqPid2, 1, msg2), + State1), + {State3, _} = check(Cid, 3, 1000, State2), + {State4, _, _} = apply(meta(4), + {down, EnqPid, noconnection}, + State3), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode, Node] + ]}] , rabbit_fifo_v0:tick(1, State4)), + %% assert there are both enqueuers and consumers + {State, _, _} = apply(meta(5), + rabbit_fifo_v0:make_purge_nodes([Node]), + State4), + + %% assert there are no enqueuers nor consumers + ?assertMatch(#?STATE{enqueuers = Enqs} when map_size(Enqs) == 1, State), + ?assertMatch(#?STATE{consumers = Cons} when map_size(Cons) == 0, State), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode] + ]}] , rabbit_fifo_v0:tick(1, State)), + ok. + +meta(Idx) -> + #{index => Idx, term => 1, + from => {make_ref(), self()}}. + +enq(Idx, MsgSeq, Msg, State) -> + strip_reply( + apply(meta(Idx), rabbit_fifo_v0:make_enqueue(self(), MsgSeq, Msg), State)). + +deq(Idx, Cid, Settlement, State0) -> + {State, {dequeue, {MsgId, Msg}, _}, _} = + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {dequeue, Settlement}, #{}), + State0), + {State, {MsgId, Msg}}. + +check_n(Cid, Idx, N, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {auto, N, simple_prefetch}, #{}), + State)). + +check(Cid, Idx, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {once, 1, simple_prefetch}, #{}), + State)). + +check_auto(Cid, Idx, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + State)). + +check(Cid, Idx, Num, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), + State)). + +settle(Cid, Idx, MsgId, State) -> + strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_settle(Cid, [MsgId]), State)). + +credit(Cid, Idx, Credit, DelCnt, Drain, State) -> + strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_credit(Cid, Credit, DelCnt, Drain), + State)). + +strip_reply({State, _, Effects}) -> + {State, Effects}. + +run_log(InitState, Entries) -> + lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> + case apply(meta(Idx), E, Acc0) of + {Acc, _, Efx} when is_list(Efx) -> + {Acc, Efx0 ++ Efx}; + {Acc, _, Efx} -> + {Acc, Efx0 ++ [Efx]}; + {Acc, _} -> + {Acc, Efx0} + end + end, {InitState, []}, Entries). + + +%% AUX Tests + +aux_test(_) -> + _ = ra_machine_ets:start_link(), + Aux0 = init_aux(aux_test), + MacState = init(#{name => aux_test, + queue_resource => + rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + ok = meck:new(ra_log, []), + Log = mock_log, + meck:expect(ra_log, last_index_term, fun (_) -> {0, 0} end), + {no_reply, Aux, mock_log} = handle_aux(leader, cast, active, Aux0, + Log, MacState), + {no_reply, _Aux, mock_log} = handle_aux(leader, cast, tick, Aux, + Log, MacState), + [X] = ets:lookup(rabbit_fifo_usage, aux_test), + meck:unload(), + ?assert(X > 0.0), + ok. + +%% Utility + +init(Conf) -> rabbit_fifo_v0:init(Conf). +apply(Meta, Entry, State) -> rabbit_fifo_v0:apply(Meta, Entry, State). +init_aux(Conf) -> rabbit_fifo_v0:init_aux(Conf). +handle_aux(S, T, C, A, L, M) -> rabbit_fifo_v0:handle_aux(S, T, C, A, L, M). +make_checkout(C, S, M) -> rabbit_fifo_v0:make_checkout(C, S, M). diff --git a/test/rabbitmq_queues_cli_integration_SUITE.erl b/test/rabbitmq_queues_cli_integration_SUITE.erl index a41ee02427..bf5e9ee79e 100644 --- a/test/rabbitmq_queues_cli_integration_SUITE.erl +++ b/test/rabbitmq_queues_cli_integration_SUITE.erl @@ -27,8 +27,13 @@ groups() -> ]. init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + case os:getenv("SECONDARY_UMBRELLA") of + false -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config); + _ -> + {skip, "growing and shrinking cannot be done in mixed mode"} + end. end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). |
