summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2020-09-15 13:51:32 +0100
committerGitHub <noreply@github.com>2020-09-15 13:51:32 +0100
commit48f38874d701e34727b4f482ea42bb0e3e82c460 (patch)
treea29bbd0db16ee8047118c9a160bda4b28d250d74 /test
parentc2943570b94642122145facc12e3f37d5e6bc5c1 (diff)
parentddc2eacbe2690ec61dda3a8648f7acf5cec39a03 (diff)
downloadrabbitmq-server-git-48f38874d701e34727b4f482ea42bb0e3e82c460.tar.gz
Merge pull request #2407 from rabbitmq/qq-reject-publish
reject-publish strategy support for quorum queues
Diffstat (limited to 'test')
-rw-r--r--test/consumer_timeout_SUITE.erl2
-rw-r--r--test/dead_lettering_SUITE.erl2
-rw-r--r--test/dynamic_qq_SUITE.erl100
-rw-r--r--test/publisher_confirms_parallel_SUITE.erl10
-rw-r--r--test/queue_length_limits_SUITE.erl2
-rw-r--r--test/queue_parallel_SUITE.erl2
-rw-r--r--test/quorum_queue_SUITE.erl214
-rw-r--r--test/quorum_queue_utils.erl24
-rw-r--r--test/rabbit_fifo_SUITE.erl280
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl25
-rw-r--r--test/rabbit_fifo_v0_SUITE.erl1392
-rw-r--r--test/rabbitmq_queues_cli_integration_SUITE.erl9
12 files changed, 1910 insertions, 152 deletions
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl
index 6144eca20b..468714328d 100644
--- a/test/consumer_timeout_SUITE.erl
+++ b/test/consumer_timeout_SUITE.erl
@@ -78,7 +78,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
{quorum_tick_interval, 1000},
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index d1196e79fc..87b5566c57 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -106,7 +106,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index 0376b2b838..9a8f2110d6 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -24,15 +24,13 @@ all() ->
groups() ->
[
{clustered, [], [
- {cluster_size_2, [], [
+ {cluster_size_3, [], [
+ recover_follower_after_standalone_restart,
vhost_deletion,
force_delete_if_no_consensus,
takeover_on_failure,
takeover_on_shutdown,
quorum_unaffected_after_vhost_failure
- ]},
- {cluster_size_3, [], [
- recover_follower_after_standalone_restart
]}
]}
].
@@ -108,7 +106,7 @@ vhost_deletion(Config) ->
ok.
force_delete_if_no_consensus(Config) ->
- [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
Args = ?config(queue_args, Config),
@@ -119,6 +117,7 @@ force_delete_if_no_consensus(Config) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:stop_node(Config, A),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
?assertMatch(
@@ -140,7 +139,7 @@ takeover_on_shutdown(Config) ->
takeover_on(Config, stop_node).
takeover_on(Config, Fun) ->
- [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = ?config(queue_name, Config),
@@ -152,10 +151,11 @@ takeover_on(Config, Fun) ->
rabbit_ct_client_helpers:publish(ACh, QName, 10),
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+ ok = rabbit_ct_broker_helpers:Fun(Config, C),
ok = rabbit_ct_broker_helpers:Fun(Config, A),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
- #'queue.declare_ok'{message_count = 0} =
+ #'queue.declare_ok'{} =
amqp_channel:call(
BCh, #'queue.declare'{queue = QName,
arguments = Args,
@@ -170,7 +170,7 @@ takeover_on(Config, Fun) ->
ok.
quorum_unaffected_after_vhost_failure(Config) ->
- [A, B] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [A, B, _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Servers = lists:sort(Servers0),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
@@ -197,46 +197,50 @@ quorum_unaffected_after_vhost_failure(Config) ->
?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
recover_follower_after_standalone_restart(Config) ->
- %% Tests that followers can be brought up standalone after forgetting the rest
- %% of the cluster. Consensus won't be reached as there is only one node in the
- %% new cluster.
- Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- Ch = rabbit_ct_client_helpers:open_channel(Config, A),
-
- QName = ?config(queue_name, Config),
- Args = ?config(queue_args, Config),
- amqp_channel:call(Ch, #'queue.declare'{queue = QName,
- arguments = Args,
- durable = true
- }),
-
- rabbit_ct_client_helpers:publish(Ch, QName, 15),
- rabbit_ct_client_helpers:close_channel(Ch),
-
- Name = ra_name(QName),
- wait_for_messages_ready(Servers, Name, 15),
-
- rabbit_ct_broker_helpers:stop_node(Config, C),
- rabbit_ct_broker_helpers:stop_node(Config, B),
- rabbit_ct_broker_helpers:stop_node(Config, A),
-
- %% Restart one follower
- forget_cluster_node(Config, B, C),
- forget_cluster_node(Config, B, A),
-
- ok = rabbit_ct_broker_helpers:start_node(Config, B),
- wait_for_messages_ready([B], Name, 15),
- ok = rabbit_ct_broker_helpers:stop_node(Config, B),
-
- %% Restart the other
- forget_cluster_node(Config, C, B),
- forget_cluster_node(Config, C, A),
-
- ok = rabbit_ct_broker_helpers:start_node(Config, C),
- wait_for_messages_ready([C], Name, 15),
- ok = rabbit_ct_broker_helpers:stop_node(Config, C),
-
- ok.
+ case os:getenv("SECONDARY_UMBRELLA") of
+ false ->
+ %% Tests that followers can be brought up standalone after forgetting the
+ %% rest of the cluster. Consensus won't be reached as there is only one node in the
+ %% new cluster.
+ Servers = [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+
+ QName = ?config(queue_name, Config),
+ Args = ?config(queue_args, Config),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = true
+ }),
+
+ rabbit_ct_client_helpers:publish(Ch, QName, 15),
+ rabbit_ct_client_helpers:close_channel(Ch),
+
+ Name = ra_name(QName),
+ wait_for_messages_ready(Servers, Name, 15),
+
+ rabbit_ct_broker_helpers:stop_node(Config, C),
+ rabbit_ct_broker_helpers:stop_node(Config, B),
+ rabbit_ct_broker_helpers:stop_node(Config, A),
+
+ %% Restart one follower
+ forget_cluster_node(Config, B, C),
+ forget_cluster_node(Config, B, A),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, B),
+ wait_for_messages_ready([B], Name, 15),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, B),
+
+ %% Restart the other
+ forget_cluster_node(Config, C, B),
+ forget_cluster_node(Config, C, A),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, C),
+ wait_for_messages_ready([C], Name, 15),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
+ ok;
+ _ ->
+ {skip, "cannot be run in mixed mode"}
+ end.
%%----------------------------------------------------------------------------
forget_cluster_node(Config, Node, NodeToRemove) ->
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl
index 410dabb08a..c31527f0ba 100644
--- a/test/publisher_confirms_parallel_SUITE.erl
+++ b/test/publisher_confirms_parallel_SUITE.erl
@@ -82,7 +82,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@@ -302,21 +302,23 @@ confirm_nack1(Config) ->
%% The closest to a nack behaviour that we can get on quorum queues is not answering while
%% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued.
confirm_minority(Config) ->
- [_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [_A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
declare_queue(Ch, Config, QName),
ok = rabbit_ct_broker_helpers:stop_node(Config, B),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, C),
amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, QName, [<<"msg1">>]),
receive
- #'basic.nack'{} -> throw(unexpected_nack);
+ #'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(unexpected_ack)
- after 30000 ->
+ after 120000 ->
ok
end,
ok = rabbit_ct_broker_helpers:start_node(Config, B),
+ publish(Ch, QName, [<<"msg2">>]),
receive
#'basic.nack'{} -> throw(unexpected_nack);
#'basic.ack'{} -> ok
diff --git a/test/queue_length_limits_SUITE.erl b/test/queue_length_limits_SUITE.erl
index 20c7f46c86..f9b6f2f368 100644
--- a/test/queue_length_limits_SUITE.erl
+++ b/test/queue_length_limits_SUITE.erl
@@ -87,7 +87,7 @@ init_per_group(max_length_mirrored, Config) ->
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index d13675bf58..c4d16a5900 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -125,7 +125,7 @@ init_per_group(mirrored_queue, Config) ->
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
- ClusterSize = 2,
+ ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
{quorum_tick_interval, 1000}]}),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index be3b46fbfb..cc54aae78e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -16,7 +16,8 @@
wait_for_messages_total/3,
wait_for_messages/2,
dirty_query/3,
- ra_name/1]).
+ ra_name/1,
+ is_mixed_versions/0]).
-compile(export_all).
@@ -113,6 +114,7 @@ all_tests() ->
subscribe_redelivery_count,
message_bytes_metrics,
queue_length_limit_drop_head,
+ queue_length_limit_reject_publish,
subscribe_redelivery_limit,
subscribe_redelivery_policy,
subscribe_redelivery_limit_with_dead_letter,
@@ -128,7 +130,8 @@ all_tests() ->
consumer_metrics,
invalid_policy,
delete_if_empty,
- delete_if_unused
+ delete_if_unused,
+ queue_ttl
].
memory_tests() ->
@@ -156,7 +159,12 @@ init_per_group(clustered, Config) ->
init_per_group(unclustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
init_per_group(clustered_with_partitions, Config) ->
- rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]);
+ case is_mixed_versions() of
+ true ->
+ {skip, "clustered_with_partitions is too unreliable in mixed mode"};
+ false ->
+ rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}])
+ end;
init_per_group(Group, Config) ->
ClusterSize = case Group of
single_node -> 1;
@@ -164,38 +172,44 @@ init_per_group(Group, Config) ->
cluster_size_3 -> 3;
cluster_size_5 -> 5
end,
- Config1 = rabbit_ct_helpers:set_config(Config,
- [{rmq_nodes_count, ClusterSize},
- {rmq_nodename_suffix, Group},
- {tcp_ports_base}]),
- Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
- Ret = rabbit_ct_helpers:run_steps(Config1b,
- [fun merge_app_env/1 ] ++
- rabbit_ct_broker_helpers:setup_steps()),
- case Ret of
- {skip, _} ->
- Ret;
- Config2 ->
- EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
- Config2, quorum_queue),
- case EnableFF of
- ok ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config2, 0, application, set_env,
- [rabbit, channel_tick_interval, 100]),
- %% HACK: the larger cluster sizes benefit for a bit
- %% more time after clustering before running the
- %% tests.
- case Group of
- cluster_size_5 ->
- timer:sleep(5000),
- Config2;
- _ ->
- Config2
- end;
- Skip ->
- end_per_group(Group, Config2),
- Skip
+ IsMixed = not (false == os:getenv("SECONDARY_UMBRELLA")),
+ case ClusterSize of
+ 2 when IsMixed ->
+ {skip, "cluster size 2 isn't mixed versions compatible"};
+ _ ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
+ Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
+ Ret = rabbit_ct_helpers:run_steps(Config1b,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ case Ret of
+ {skip, _} ->
+ Ret;
+ Config2 ->
+ EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
+ Config2, quorum_queue),
+ case EnableFF of
+ ok ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_tick_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit
+ %% more time after clustering before running the
+ %% tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end;
+ Skip ->
+ end_per_group(Group, Config2),
+ Skip
+ end
end
end.
@@ -327,11 +341,6 @@ declare_invalid_args(Config) ->
{{shutdown, {server_initiated_close, 406, _}}, _},
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-expires">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-message-ttl">>, long, 2000}])),
?assertExit(
@@ -345,7 +354,7 @@ declare_invalid_args(Config) ->
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-overflow">>, longstr, XOverflow}]))
- || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
+ || XOverflow <- [<<"reject-publish-dlx">>]],
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -612,14 +621,16 @@ publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
ct:pal("waiting for confirms from ~s", [QName]),
- ok = receive
- #'basic.ack'{} -> ok;
- #'basic.nack'{} -> fail
- after 2500 ->
- exit(confirm_timeout)
- end,
- ct:pal("CONFIRMED! ~s", [QName]),
- ok.
+ receive
+ #'basic.ack'{} ->
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok;
+ #'basic.nack'{} ->
+ ct:pal("NOT CONFIRMED! ~s", [QName]),
+ fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end.
publish_and_restart(Config) ->
%% Test the node restart with both types of queues (quorum and classic) to
@@ -685,6 +696,14 @@ shrink_all(Config) ->
ok.
rebalance(Config) ->
+ case is_mixed_versions() of
+ true ->
+ {skip, "rebalance tests isn't mixed version compatible"};
+ false ->
+ rebalance0(Config)
+ end.
+
+rebalance0(Config) ->
[Server0, _, _] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1025,9 +1044,11 @@ recover_from_multiple_failures(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0).
publishing_to_unavailable_queue(Config) ->
+ %% publishing to an unavialable queue but with a reachable member should result
+ %% in the initial enqueuer session timing out and the message being nacked
[Server, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- TCh = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ TCh = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(TCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
@@ -1035,19 +1056,29 @@ publishing_to_unavailable_queue(Config) ->
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+ ct:pal("opening channel to ~w", [Server]),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
- publish_many(Ch, QQ, 300),
- timer:sleep(1000),
+ publish_many(Ch, QQ, 1),
+ %% this should result in a nack
+ ok = receive
+ #'basic.ack'{} -> fail;
+ #'basic.nack'{} -> ok
+ after 90000 ->
+ exit(confirm_timeout)
+ end,
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
- %% check we get at least on ack
+ timer:sleep(2000),
+ publish_many(Ch, QQ, 1),
+ %% this should now be acked
ok = receive
#'basic.ack'{} -> ok;
#'basic.nack'{} -> fail
- after 30000 ->
+ after 90000 ->
exit(confirm_timeout)
end,
+ %% check we get at least on ack
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
ok.
@@ -1087,6 +1118,14 @@ leadership_takeover(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0).
metrics_cleanup_on_leadership_takeover(Config) ->
+ case is_mixed_versions() of
+ true ->
+ {skip, "metrics_cleanup_on_leadership_takeover tests isn't mixed version compatible"};
+ false ->
+ metrics_cleanup_on_leadership_takeover0(Config)
+ end.
+
+metrics_cleanup_on_leadership_takeover0(Config) ->
%% Queue core metrics should be deleted from a node once the leadership is transferred
%% to another follower
[Server, _, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1244,13 +1283,13 @@ simple_confirm_availability_on_leader_change(Config) ->
%% open a channel to another node
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
%% stop the node hosting the leader
ok = rabbit_ct_broker_helpers:stop_node(Config, Node2),
%% this should not fail as the channel should detect the new leader and
%% resend to that
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
ok.
@@ -1270,7 +1309,7 @@ confirm_availability_on_leader_change(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ConfirmLoop = fun Loop() ->
- publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
receive {done, P} ->
P ! done,
ok
@@ -1462,7 +1501,16 @@ node_removal_is_not_quorum_critical(Config) ->
Qs = rpc:call(Server, rabbit_quorum_queue, list_with_minimum_quorum, []),
?assertEqual([], Qs).
+
file_handle_reservations(Config) ->
+ case is_mixed_versions() of
+ true ->
+ {skip, "file_handle_reservations tests isn't mixed version compatible"};
+ false ->
+ file_handle_reservations0(Config)
+ end.
+
+file_handle_reservations0(Config) ->
Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
QQ = ?config(queue_name, Config),
@@ -2020,6 +2068,35 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).
+queue_length_limit_reject_publish(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject-publish">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ok = publish_confirm(Ch, QQ),
+ ok = publish_confirm(Ch, QQ),
+ %% give the channel some time to process the async reject_publish notification
+ %% now that we are over the limit it should start failing
+ wait_for_messages_total(Servers, RaName, 2),
+ fail = publish_confirm(Ch, QQ),
+ %% remove all messages
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = _}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ %% publish should be allowed again now
+ ok = publish_confirm(Ch, QQ),
+ ok.
+
queue_length_in_memory_limit_basic_get(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -2381,6 +2458,29 @@ delete_if_unused(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = QQ,
if_unused = true})).
+queue_ttl(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 1000}])),
+ timer:sleep(5500),
+ %% check queue no longer exists
+ ?assertExit(
+ {{shutdown,
+ {server_initiated_close,404,
+ <<"NOT_FOUND - no queue 'queue_ttl' in vhost '/'">>}},
+ _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
+ passive = true,
+ durable = true,
+ auto_delete = false,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 1000}]})),
+ ok.
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index 9c988bc066..4b65bc2737 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -8,7 +8,8 @@
wait_for_messages_total/3,
wait_for_messages/2,
dirty_query/3,
- ra_name/1
+ ra_name/1,
+ is_mixed_versions/0
]).
wait_for_messages_ready(Servers, QName, Ready) ->
@@ -34,11 +35,19 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
ct:pal("Got messages ~p", [Msgs]),
- case lists:all(fun(C) when is_integer(C) ->
- C == Number;
- (_) ->
- false
- end, Msgs) of
+ %% hack to allow the check to succeed in mixed versions clusters if at
+ %% least one node matches the criteria rather than all nodes for
+ F = case is_mixed_versions() of
+ true ->
+ any;
+ false ->
+ all
+ end,
+ case lists:F(fun(C) when is_integer(C) ->
+ C == Number;
+ (_) ->
+ false
+ end, Msgs) of
true ->
ok;
_ ->
@@ -88,3 +97,6 @@ filter_queues(Expected, Got) ->
lists:filter(fun([K, _, _, _]) ->
lists:member(K, Keys)
end, Got).
+
+is_mixed_versions() ->
+ not (false == os:getenv("SECONDARY_UMBRELLA")).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 9f2daac762..7778e04afb 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -59,19 +59,24 @@ end_per_testcase(_TestCase, _Config) ->
?ASSERT_EFF(EfxPat, true, Effects)).
-define(ASSERT_EFF(EfxPat, Guard, Effects),
- ?assert(lists:any(fun (EfxPat) when Guard -> true;
- (_) -> false
- end, Effects))).
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(ASSERT_NO_EFF(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Guard, Effects),
+ ?assert(not lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
-define(assertNoEffect(EfxPat, Effects),
- ?assert(not lists:any(fun (EfxPat) -> true;
- (_) -> false
- end, Effects))).
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
test_init(Name) ->
init(#{name => Name,
@@ -380,7 +385,7 @@ cancelled_checkout_out_test(_) ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should not return pending messages to queue
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(1, maps:size(State2#rabbit_fifo.messages)),
+ ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
{State3, {dequeue, empty}} =
@@ -436,13 +441,13 @@ down_with_noconnection_returns_unack_test(_) ->
Pid = spawn(fun() -> ok end),
Cid = {<<"down_with_noconnect">>, Pid},
{State0, _} = enq(1, 1, second, test_init(test)),
- ?assertEqual(1, maps:size(State0#rabbit_fifo.messages)),
+ ?assertEqual(1, lqueue:len(State0#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State0#rabbit_fifo.returns)),
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
- ?assertEqual(0, maps:size(State1#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State1#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State1#rabbit_fifo.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State2a#rabbit_fifo.messages)),
?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)),
?assertMatch(#consumer{checked_out = Ch,
status = suspected_down}
@@ -539,7 +544,7 @@ duplicate_delivery_test(_) ->
{#rabbit_fifo{ra_indexes = RaIdxs,
messages = Messages}, _} = enq(2, 1, first, State0),
?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
- ?assertEqual(1, maps:size(Messages)),
+ ?assertEqual(1, lqueue:len(Messages)),
ok.
state_enter_file_handle_leader_reservation_test(_) ->
@@ -622,7 +627,7 @@ down_noproc_returns_checked_out_in_order_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, 100)),
- ?assertEqual(100, maps:size(S1#rabbit_fifo.messages)),
+ ?assertEqual(100, lqueue:len(S1#rabbit_fifo.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
#consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
@@ -643,7 +648,7 @@ down_noconnection_returns_checked_out_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, NumMsgs)),
- ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)),
+ ?assertEqual(NumMsgs, lqueue:len(S1#rabbit_fifo.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
#consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
@@ -797,7 +802,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
State1 = lists:foldl(AddConsumer, State0, Consumers),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
+ {State2, _, Effects} = apply(meta(2), {down, Pid1, noproc}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
% there are still waiting consumers
@@ -810,7 +815,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
update_consumer_handler, _}, Effects),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
+ {State3, _, Effects2} = apply(meta(3), {down, Pid2, noproc}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
% no more waiting consumer
@@ -824,7 +829,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
update_consumer_handler, _}, Effects2),
% the last channel goes down
- {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
+ {State4, _, Effects3} = apply(meta(4), {down, Pid3, doesnotmatter}, State3),
% no more consumers
?assertEqual(0, map_size(State4#rabbit_fifo.consumers)),
?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
@@ -1130,7 +1135,8 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{index => 3,
+ system_time => 1500}, {down, Pid1, noconnection}, State1),
% 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
?assertEqual(4 + 1, length(Effects2)),
@@ -1163,11 +1169,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
{<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1),
% one monitor and one consumer status update (deactivated)
?assertEqual(3, length(Effects2)),
- {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
?assertEqual(5, length(Effects3)).
@@ -1258,6 +1264,99 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+
+register_enqueuer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ %% register another enqueuer shoudl be ok
+ Pid2 = test_util:fake_pid(node()),
+ {State3, ok, [_]} = apply(meta(3), make_register_enqueuer(Pid2), State2),
+
+ {State4, ok, _} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 2, two), State3),
+ {State5, ok, Efx} = apply(meta(5), rabbit_fifo:make_enqueue(Pid1, 3, three), State4),
+ % ct:pal("Efx ~p", [Efx]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid2, Efx),
+
+ %% this time, registry should return reject_publish
+ {State6, reject_publish, [_]} = apply(meta(6), make_register_enqueuer(
+ test_util:fake_pid(node())), State5),
+ ?assertMatch(#{num_enqueuers := 3}, rabbit_fifo:overview(State6)),
+
+
+ %% remove two messages this should make the queue fall below the 0.8 limit
+ {State7, {dequeue, _, _}, _Efx7} =
+ apply(meta(7),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State6),
+ ct:pal("Efx7 ~p", [_Efx7]),
+ {State8, {dequeue, _, _}, Efx8} =
+ apply(meta(8),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State7),
+ ct:pal("Efx8 ~p", [Efx8]),
+ %% validate all registered enqueuers are notified of overflow state
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx8),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid2, Efx8),
+ {_State9, {dequeue, _, _}, Efx9} =
+ apply(meta(9),
+ rabbit_fifo:make_checkout(<<"a">>, {dequeue, settled}, #{}), State8),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid1, Efx9),
+ ?ASSERT_NO_EFF({send_msg, P, go, [ra_event]}, P == Pid2, Efx9),
+ ok.
+
+reject_publish_purge_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish}),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ {_State5, {purge, 3}, Efx1} = apply(meta(5), rabbit_fifo:make_purge(), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, go}, [ra_event]}, P == Pid1, Efx1),
+ ok.
+
+reject_publish_applied_after_limit_test(_) ->
+ InitConf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8))
+ },
+ State0 = init(InitConf),
+ %% simply registering should be ok when we're below limit
+ Pid1 = test_util:fake_pid(node()),
+ {State1, ok, [_]} = apply(meta(1), make_register_enqueuer(Pid1), State0),
+ {State2, ok, _} = apply(meta(2), rabbit_fifo:make_enqueue(Pid1, 1, one), State1),
+ {State3, ok, _} = apply(meta(3), rabbit_fifo:make_enqueue(Pid1, 2, two), State2),
+ {State4, ok, Efx} = apply(meta(4), rabbit_fifo:make_enqueue(Pid1, 3, three), State3),
+ % ct:pal("Efx ~p", [Efx]),
+ ?ASSERT_NO_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx),
+ %% apply new config
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ max_length => 2,
+ overflow_strategy => reject_publish
+ },
+ {State5, ok, Efx1} = apply(meta(5), rabbit_fifo:make_update_config(Conf), State4),
+ ?ASSERT_EFF({send_msg, P, {queue_status, reject_publish}, [ra_event]}, P == Pid1, Efx1),
+ Pid2 = test_util:fake_pid(node()),
+ {_State6, reject_publish, _} = apply(meta(1), make_register_enqueuer(Pid2), State5),
+ ok.
+
purge_nodes_test(_) ->
Node = purged@node,
ThisNode = node(),
@@ -1305,7 +1404,12 @@ purge_nodes_test(_) ->
ok.
meta(Idx) ->
- #{index => Idx, term => 1,
+ meta(Idx, 0).
+
+meta(Idx, Timestamp) ->
+ #{index => Idx,
+ term => 1,
+ system_time => Timestamp,
from => {make_ref(), self()}}.
enq(Idx, MsgSeq, Msg, State) ->
@@ -1386,9 +1490,139 @@ aux_test(_) ->
?assert(X > 0.0),
ok.
+
+%% machine version conversion test
+
+machine_version_test(_) ->
+ V0 = rabbit_fifo_v0,
+ S0 = V0:init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Idx = 1,
+ {#rabbit_fifo{}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S0),
+
+ Cid = {atom_to_binary(?FUNCTION_NAME, utf8), self()},
+ Entries = [
+ {1, rabbit_fifo_v0:make_enqueue(self(), 1, banana)},
+ {2, rabbit_fifo_v0:make_enqueue(self(), 2, apple)},
+ {3, rabbit_fifo_v0:make_checkout(Cid, {auto, 1, unsettled}, #{})}
+ ],
+ {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries),
+ Self = self(),
+ {#rabbit_fifo{enqueuers = #{Self := #enqueuer{}},
+ messages = Msgs}, ok, []} = apply(meta(Idx),
+ {machine_version, 0, 1}, S1),
+ %% validate message conversion to lqueue
+ ?assertEqual(1, lqueue:len(Msgs)),
+ ok.
+
+queue_ttl_test(_) ->
+ QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => QName,
+ created => 1000,
+ expires => 1000},
+ S0 = rabbit_fifo:init(Conf),
+ Now = 1500,
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
+ %% this should delete the queue
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 1000, S0),
+ %% adding a consumer should not ever trigger deletion
+ Cid = {<<"cid1">>, self()},
+ {S1, _} = check_auto(Cid, 1, S0),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
+ %% cancelling the consumer should then
+ {S2, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2),
+
+ %% Same for downs
+ {S2D, _, _} = apply(meta(2, Now),
+ {down, self(), noconnection}, S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2D),
+
+ %% dequeue should set last applied
+ {S1Deq, {dequeue, empty}} =
+ apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ S0),
+
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1Deq),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S1Deq),
+ %% Enqueue message,
+ {E1, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_enqueue(self(), 1, msg1), S0),
+ Deq = {<<"deq1">>, self()},
+ {E2, {dequeue, {MsgId, _}, _}, _} =
+ apply(meta(3, Now),
+ rabbit_fifo:make_checkout(Deq, {dequeue, unsettled}, #{}),
+ E1),
+ {E3, _, _} = apply(meta(3, Now + 1000),
+ rabbit_fifo:make_settle(Deq, [MsgId]), E2),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1500, E3),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 3000, E3),
+
+ ok.
+
+queue_ttl_with_single_active_consumer_test(_) ->
+ QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ Conf = #{name => ?FUNCTION_NAME,
+ queue_resource => QName,
+ created => 1000,
+ expires => 1000,
+ single_active_consumer_on => true},
+ S0 = rabbit_fifo:init(Conf),
+ Now = 1500,
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S0),
+ %% this should delete the queue
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 1000, S0),
+ %% adding a consumer should not ever trigger deletion
+ Cid = {<<"cid1">>, self()},
+ {S1, _} = check_auto(Cid, 1, S0),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now, S1),
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S1),
+ %% cancelling the consumer should then
+ {S2, _, _} = apply(meta(2, Now),
+ rabbit_fifo:make_checkout(Cid, cancel, #{}), S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2),
+
+ %% Same for downs
+ {S2D, _, _} = apply(meta(2, Now),
+ {down, self(), noconnection}, S1),
+ %% last_active should have been reset when consumer was cancelled
+ %% last_active = 2500
+ [{mod_call, _, handle_tick, _}] = rabbit_fifo:tick(Now + 1000, S2D),
+ %% but now it should be deleted
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}]
+ = rabbit_fifo:tick(Now + 2500, S2D),
+
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).
+make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 23522e71f9..859db2178f 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -479,15 +479,17 @@ test_run_log(_Config) ->
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
- InMemoryBytes},
- frequency([{10, {0, 0, false, 0, 0, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer,
+ DeliveryLimit, InMemoryLength, InMemoryBytes,
+ Overflow},
+ frequency([{10, {0, 0, false, 0, 0, 0, drop_head}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
oneof([range(1, 3), undefined]),
oneof([range(1, 10), undefined]),
- oneof([range(1, 1000), undefined])
+ oneof([range(1, 1000), undefined]),
+ oneof([drop_head, reject_publish])
}}]),
begin
Config = config(?FUNCTION_NAME,
@@ -496,7 +498,8 @@ snapshots(_Config) ->
SingleActiveConsumer,
DeliveryLimit,
InMemoryLength,
- InMemoryBytes),
+ InMemoryBytes,
+ Overflow),
?FORALL(O, ?LET(Ops, log_gen(256), expand(Ops, Config)),
collect({log_size, length(O)},
snapshots_prop(Config, O)))
@@ -681,6 +684,11 @@ max_length(_Config) ->
config(Name, Length, Bytes, SingleActive, DeliveryLimit,
InMemoryLength, InMemoryBytes) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, drop_head).
+
+config(Name, Length, Bytes, SingleActive, DeliveryLimit,
+ InMemoryLength, InMemoryBytes, Overflow) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
@@ -688,7 +696,8 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit,
single_active_consumer_on => SingleActive,
delivery_limit => map_max(DeliveryLimit),
max_in_memory_length => map_max(InMemoryLength),
- max_in_memory_bytes => map_max(InMemoryBytes)}.
+ max_in_memory_bytes => map_max(InMemoryBytes),
+ overflow_strategy => Overflow}.
map_max(0) -> undefined;
map_max(N) -> N.
@@ -1072,7 +1081,7 @@ do_apply(Cmd, #t{effects = Effs,
%% down
T;
_ ->
- {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of
+ {St, Effects} = case rabbit_fifo:apply(meta(Index), Cmd, S0) of
{S, _, E} when is_list(E) ->
{S, E};
{S, _, E} ->
@@ -1187,7 +1196,7 @@ test_init(Conf) ->
rabbit_fifo:init(maps:merge(Default, Conf)).
meta(Idx) ->
- #{index => Idx, term => 1}.
+ #{index => Idx, term => 1, system_time => 0}.
make_checkout(Cid, Spec) ->
rabbit_fifo:make_checkout(Cid, Spec, #{}).
diff --git a/test/rabbit_fifo_v0_SUITE.erl b/test/rabbit_fifo_v0_SUITE.erl
new file mode 100644
index 0000000000..fcb84377de
--- /dev/null
+++ b/test/rabbit_fifo_v0_SUITE.erl
@@ -0,0 +1,1392 @@
+-module(rabbit_fifo_v0_SUITE).
+
+%% rabbit_fifo unit tests suite
+
+-compile(export_all).
+
+-compile({no_auto_import, [apply/3]}).
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("src/rabbit_fifo_v0.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+%% replicate eunit like test resultion
+all_tests() ->
+ [F || {F, _} <- ?MODULE:module_info(functions),
+ re:run(atom_to_list(F), "_test$") /= nomatch].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+-define(ASSERT_EFF(EfxPat, Effects),
+ ?ASSERT_EFF(EfxPat, true, Effects)).
+
+-define(ASSERT_EFF(EfxPat, Guard, Effects),
+ ?assert(lists:any(fun (EfxPat) when Guard -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(ASSERT_NO_EFF(EfxPat, Effects),
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+-define(assertNoEffect(EfxPat, Effects),
+ ?assert(not lists:any(fun (EfxPat) -> true;
+ (_) -> false
+ end, Effects))).
+
+test_init(Name) ->
+ init(#{name => Name,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(Name, utf8)),
+ release_cursor_interval => 0}).
+
+enq_enq_checkout_test(_) ->
+ Cid = {<<"enq_enq_checkout_test">>, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {_State3, _, Effects} =
+ apply(meta(3),
+ rabbit_fifo_v0:make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
+ State2),
+ ?ASSERT_EFF({monitor, _, _}, Effects),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
+ ok.
+
+credit_enq_enq_checkout_settled_credit_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {State3, _, Effects} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited}, #{}), State2),
+ ?ASSERT_EFF({monitor, _, _}, Effects),
+ Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
+ (_) -> false
+ end, Effects),
+ ?assertEqual(1, length(Deliveries)),
+ %% settle the delivery this should _not_ result in further messages being
+ %% delivered
+ {State4, SettledEffects} = settle(Cid, 4, 1, State3),
+ ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
+ true;
+ (_) -> false
+ end, SettledEffects)),
+ %% granting credit (3) should deliver the second msg if the receivers
+ %% delivery count is (1)
+ {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4),
+ % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects),
+ {_State6, FinalEffects} = enq(6, 3, third, State5),
+ ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) ->
+ true;
+ (_) -> false
+ end, FinalEffects)),
+ ok.
+
+credit_with_drained_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = test_init(test),
+ %% checkout with a single credit
+ {State1, _, _} =
+ apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}),
+ State0),
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 1,
+ delivery_count = 0}}},
+ State1),
+ {State, Result, _} =
+ apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1),
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
+ delivery_count = 5}}},
+ State),
+ ?assertEqual({multi, [{send_credit_reply, 0},
+ {send_drained, {?FUNCTION_NAME, 5}}]},
+ Result),
+ ok.
+
+credit_and_drain_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ %% checkout without any initial credit (like AMQP 1.0 would)
+ {State3, _, CheckEffs} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 0, credited}, #{}),
+ State2),
+
+ ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
+ {State4, {multi, [{send_credit_reply, 0},
+ {send_drained, {?FUNCTION_NAME, 2}}]},
+ Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3),
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
+ delivery_count = 4}}},
+ State4),
+
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}},
+ {_, {_, second}}]}, _}, Effects),
+ {_State5, EnqEffs} = enq(5, 2, third, State4),
+ ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs),
+ ok.
+
+
+
+enq_enq_deq_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ % get returns a reply value
+ NumReady = 1,
+ {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State2),
+ ok.
+
+enq_enq_deq_deq_settle_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ % get returns a reply value
+ {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State2),
+ {_State4, {dequeue, empty}} =
+ apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State3),
+ ok.
+
+enq_enq_checkout_get_settled_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ % get returns a reply value
+ {_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}),
+ State1),
+ ok.
+
+checkout_get_empty_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State = test_init(test),
+ {_State2, {dequeue, empty}} =
+ apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State),
+ ok.
+
+untracked_enq_deq_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = test_init(test),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo_v0:make_enqueue(undefined, undefined, first),
+ State0),
+ {_State2, {dequeue, {0, {_, first}}, _}, _} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1),
+ ok.
+
+release_cursor_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, _} = enq(2, 2, second, State1),
+ {State3, _} = check(Cid, 3, 10, State2),
+ % no release cursor effect at this point
+ {State4, _} = settle(Cid, 4, 1, State3),
+ {_Final, Effects1} = settle(Cid, 5, 0, State4),
+ % empty queue forwards release cursor all the way
+ ?ASSERT_EFF({release_cursor, 5, _}, Effects1),
+ ok.
+
+checkout_enq_settle_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
+ {State2, Effects0} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, ?FUNCTION_NAME,
+ [{0, {_, first}}]}, _},
+ Effects0),
+ {State3, [_Inactive]} = enq(3, 2, second, State2),
+ {_, _Effects} = settle(Cid, 4, 0, State3),
+ % the release cursor is the smallest raft index that does not
+ % contribute to the state of the application
+ % ?ASSERT_EFF({release_cursor, 2, _}, Effects),
+ ok.
+
+out_of_order_enqueue_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State2, Effects2} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
+ % assert monitor was set up
+ ?ASSERT_EFF({monitor, _, _}, Effects2),
+ % enqueue seq num 3 and 4 before 2
+ {State3, Effects3} = enq(3, 3, third, State2),
+ ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3),
+ {State4, Effects4} = enq(4, 4, fourth, State3),
+ % assert no further deliveries where made
+ ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4),
+ {_State5, Effects5} = enq(5, 2, second, State4),
+ % assert two deliveries were now made
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}},
+ {_, {_, third}},
+ {_, {_, fourth}}]}, _},
+ Effects5),
+ ok.
+
+out_of_order_first_enqueue_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ {State1, _} = check_n(Cid, 5, 5, test_init(test)),
+ {_State2, Effects2} = enq(2, 10, first, State1),
+ ?ASSERT_EFF({monitor, process, _}, Effects2),
+ ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _},
+ Effects2),
+ ok.
+
+duplicate_enqueue_test(_) ->
+ Cid = {<<"duplicate_enqueue_test">>, self()},
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State2, Effects2} = enq(2, 1, first, State1),
+ ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
+ {_State3, Effects3} = enq(3, 1, first, State2),
+ ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
+ ok.
+
+return_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+ {State0, _} = enq(1, 1, msg, test_init(test)),
+ {State1, _} = check_auto(Cid, 2, State0),
+ {State2, _} = check_auto(Cid2, 3, State1),
+ {State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2),
+ ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0,
+ State3#?STATE.consumers),
+ ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1,
+ State3#?STATE.consumers),
+ ok.
+
+return_dequeue_delivery_limit_test(_) ->
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, _} = enq(1, 1, msg, Init),
+
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+
+ {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0),
+ {State2, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId1]),
+ State1),
+
+ {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2),
+ {State4, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid2, [MsgId2]),
+ State3),
+ ?assertMatch(#{num_messages := 0}, rabbit_fifo_v0:overview(State4)),
+ ok.
+
+return_non_existent_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
+ % return non-existent
+ {_State2, _} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [99]), State0),
+ ok.
+
+return_checked_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State0, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, _},
+ {aux, active}]} =
+ apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
+ ok.
+
+return_checked_out_limit_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, [_, _]} = enq(1, 1, first, Init),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _},
+ {aux, active}]} =
+ apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
+ {#?STATE{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} =
+ apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2),
+ ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
+ ok.
+
+return_auto_checked_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State00, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State0, [_]} = enq(2, 2, second, State00),
+ % it first active then inactive as the consumer took on but cannot take
+ % any more
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active},
+ {aux, inactive}
+ ]} = check_auto(Cid, 2, State0),
+ % return should include another delivery
+ {_State2, _, Effects} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
+ Effects),
+ ok.
+
+cancelled_checkout_out_test(_) ->
+ Cid = {<<"cid">>, self()},
+ {State00, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State0, [_]} = enq(2, 2, second, State00),
+ {State1, _} = check_auto(Cid, 2, State0),
+ % cancelled checkout should not return pending messages to queue
+ {State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1),
+ ?assertEqual(1, maps:size(State2#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State2#?STATE.returns)),
+
+ {State3, {dequeue, empty}} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2),
+ %% settle
+ {State4, ok, _} =
+ apply(meta(4), rabbit_fifo_v0:make_settle(Cid, [0]), State3),
+
+ {_State, {dequeue, {_, {_, second}}, _}, _} =
+ apply(meta(5), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State4),
+ ok.
+
+down_with_noproc_consumer_returns_unsettled_test(_) ->
+ Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
+ {State0, [_, _]} = enq(1, 1, second, test_init(test)),
+ {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
+ {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
+ {_State, Effects} = check(Cid, 4, State2),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
+ ok.
+
+down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ Self = self(),
+ Node = node(Pid),
+ {State0, Effects0} = enq(1, 1, second, test_init(test)),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
+ {State1, Effects1} = check_auto(Cid, 2, State0),
+ #consumer{credit = 0} = maps:get(Cid, State1#?STATE.consumers),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
+ % monitor both enqueuer and consumer
+ % because we received a noconnection we now need to monitor the node
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
+ #consumer{credit = 1,
+ checked_out = Ch,
+ status = suspected_down} = maps:get(Cid, State2a#?STATE.consumers),
+ ?assertEqual(#{}, Ch),
+ %% validate consumer has credit
+ {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
+ ?ASSERT_EFF({monitor, node, _}, Effects2),
+ ?assertNoEffect({demonitor, process, _}, Effects2),
+ % when the node comes up we need to retry the process monitors for the
+ % disconnected processes
+ {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
+ #consumer{status = up} = maps:get(Cid, State3#?STATE.consumers),
+ % try to re-monitor the suspect processes
+ ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
+ ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
+ ok.
+
+down_with_noconnection_returns_unack_test(_) ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State0#?STATE.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State1#?STATE.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
+ ?assertEqual(0, maps:size(State2a#?STATE.messages)),
+ ?assertEqual(1, lqueue:len(State2a#?STATE.returns)),
+ ?assertMatch(#consumer{checked_out = Ch,
+ status = suspected_down}
+ when map_size(Ch) == 0,
+ maps:get(Cid, State2a#?STATE.consumers)),
+ ok.
+
+down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
+ State00 = test_init(test),
+ Pid = spawn(fun() -> ok end),
+ {State0, _, Effects0} = apply(meta(1), rabbit_fifo_v0:make_enqueue(Pid, 1, first), State00),
+ ?ASSERT_EFF({monitor, process, _}, Effects0),
+ {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
+ % ensure there are no enqueuers
+ ?assert(0 =:= maps:size(State1#?STATE.enqueuers)),
+ ok.
+
+discarded_message_without_dead_letter_handler_is_removed_test(_) ->
+ Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
+ {State0, [_, _]} = enq(1, 1, first, test_init(test)),
+ {State1, Effects1} = check_n(Cid, 2, 10, State0),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{0, {_, first}}]}, _},
+ Effects1),
+ {_State2, _, Effects2} = apply(meta(1),
+ rabbit_fifo_v0:make_discard(Cid, [0]), State1),
+ ?assertNoEffect({send_msg, _,
+ {delivery, _, [{0, {_, first}}]}, _},
+ Effects2),
+ ok.
+
+discarded_message_with_dead_letter_handler_emits_log_effect_test(_) ->
+ Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
+ State00 = init(#{name => test,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ dead_letter_handler =>
+ {somemod, somefun, [somearg]}}),
+ {State0, [_, _]} = enq(1, 1, first, State00),
+ {State1, Effects1} = check_n(Cid, 2, 10, State0),
+ ?ASSERT_EFF({send_msg, _,
+ {delivery, _, [{0, {_, first}}]}, _},
+ Effects1),
+ {_State2, _, Effects2} = apply(meta(1), rabbit_fifo_v0:make_discard(Cid, [0]), State1),
+ % assert mod call effect with appended reason and message
+ ?ASSERT_EFF({log, _RaftIdxs, _}, Effects2),
+ ok.
+
+tick_test(_) ->
+ Cid = {<<"c">>, self()},
+ Cid2 = {<<"c2">>, self()},
+ {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
+ {S1, _} = enq(2, 2, <<"snd">>, S0),
+ {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
+ {S3, {_, _}} = deq(4, Cid2, unsettled, S2),
+ {S4, _, _} = apply(meta(5), rabbit_fifo_v0:make_return(Cid, [MsgId]), S3),
+
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{},
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3},
+ [_Node]
+ ]}] = rabbit_fifo_v0:tick(1, S4),
+ ok.
+
+
+delivery_query_returns_deliveries_test(_) ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ rabbit_fifo_v0:make_checkout(Cid, {auto, 5, simple_prefetch}, #{}),
+ rabbit_fifo_v0:make_enqueue(self(), 1, one),
+ rabbit_fifo_v0:make_enqueue(self(), 2, two),
+ rabbit_fifo_v0:make_enqueue(self(), 3, tre),
+ rabbit_fifo_v0:make_enqueue(self(), 4, for)
+ ],
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, _Effects} = run_log(test_init(help), Entries),
+ % 3 deliveries are returned
+ [{0, {_, one}}] = rabbit_fifo_v0:get_checked_out(Cid, 0, 0, State),
+ [_, _, _] = rabbit_fifo_v0:get_checked_out(Cid, 1, 3, State),
+ ok.
+
+pending_enqueue_is_enqueued_on_down_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Pid = self(),
+ {State0, _} = enq(1, 2, first, test_init(test)),
+ {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
+ {_State2, {dequeue, {0, {_, first}}, 0}, _} =
+ apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1),
+ ok.
+
+duplicate_delivery_test(_) ->
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ {#?STATE{ra_indexes = RaIdxs,
+ messages = Messages}, _} = enq(2, 1, first, State0),
+ ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
+ ?assertEqual(1, maps:size(Messages)),
+ ok.
+
+state_enter_file_handle_leader_reservation_test(_) ->
+ S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ become_leader_handler => {m, f, [a]}}),
+
+ Resource = {resource, <<"/">>, queue, <<"test">>},
+ Effects = rabbit_fifo_v0:state_enter(leader, S0),
+ ?assertEqual([
+ {mod_call, m, f, [a, the_name]},
+ {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
+ ], Effects),
+ ok.
+
+state_enter_file_handle_other_reservation_test(_) ->
+ S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Effects = rabbit_fifo_v0:state_enter(other, S0),
+ ?assertEqual([
+ {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []}
+ ],
+ Effects),
+ ok.
+
+state_enter_monitors_and_notifications_test(_) ->
+ Oth = spawn(fun () -> ok end),
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ Cid = {<<"adf">>, self()},
+ OthCid = {<<"oth">>, Oth},
+ {State1, _} = check(Cid, 2, State0),
+ {State, _} = check(OthCid, 3, State1),
+ Self = self(),
+ Effects = rabbit_fifo_v0:state_enter(leader, State),
+
+ %% monitor all enqueuers and consumers
+ [{monitor, process, Self},
+ {monitor, process, Oth}] =
+ lists:filter(fun ({monitor, process, _}) -> true;
+ (_) -> false
+ end, Effects),
+ [{send_msg, Self, leader_change, ra_event},
+ {send_msg, Oth, leader_change, ra_event}] =
+ lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
+ (_) -> false
+ end, Effects),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
+ ok.
+
+purge_test(_) ->
+ Cid = {<<"purge_test">>, self()},
+ {State1, _} = enq(1, 1, first, test_init(test)),
+ {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State1),
+ {State3, _} = enq(3, 2, second, State2),
+ % get returns a reply value
+ {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
+ apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
+ ok.
+
+purge_with_checkout_test(_) ->
+ Cid = {<<"purge_test">>, self()},
+ {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
+ {State1, _} = enq(2, 1, <<"first">>, State0),
+ {State2, _} = enq(3, 2, <<"second">>, State1),
+ %% assert message bytes are non zero
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assert(State2#?STATE.msg_bytes_enqueue > 0),
+ {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2),
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assertEqual(0, State3#?STATE.msg_bytes_enqueue),
+ ?assertEqual(1, rabbit_fifo_index:size(State3#?STATE.ra_indexes)),
+ #consumer{checked_out = Checked} = maps:get(Cid, State3#?STATE.consumers),
+ ?assertEqual(1, maps:size(Checked)),
+ ok.
+
+down_noproc_returns_checked_out_in_order_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ %% enqueue 100
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, 100)),
+ ?assertEqual(100, maps:size(S1#?STATE.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
+ ?assertEqual(100, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
+ Returns = lqueue:to_list(S#?STATE.returns),
+ ?assertEqual(100, length(Returns)),
+ ?assertEqual(0, maps:size(S#?STATE.consumers)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+down_noconnection_returns_checked_out_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ NumMsgs = 20,
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, NumMsgs)),
+ ?assertEqual(NumMsgs, maps:size(S1#?STATE.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
+ ?assertEqual(NumMsgs, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2),
+ Returns = lqueue:to_list(S#?STATE.returns),
+ ?assertEqual(NumMsgs, length(Returns)),
+ ?assertMatch(#consumer{checked_out = Ch}
+ when map_size(Ch) == 0,
+ maps:get(Cid, S#?STATE.consumers)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+single_active_consumer_basic_get_test(_) ->
+ Cid = {?FUNCTION_NAME, self()},
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
+ {State1, _} = enq(1, 1, first, State0),
+ {_State, {error, unsupported}} =
+ apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
+ State1),
+ ok.
+
+single_active_consumer_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ meta(1),
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ C3 = {<<"ctag3">>, self()},
+ C4 = {<<"ctag4">>, self()},
+
+ % the first registered consumer is the active one, the others are waiting
+ ?assertEqual(1, map_size(State1#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State1#?STATE.consumers),
+ ?assertEqual(3, length(State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?STATE.waiting_consumers)),
+
+ % cancelling a waiting consumer
+ {State2, _, Effects1} = apply(meta(2),
+ make_checkout(C3, cancel, #{}),
+ State1),
+ % the active consumer should still be in place
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State2#?STATE.consumers),
+ % the cancelled consumer has been removed from waiting consumers
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?STATE.waiting_consumers)),
+ % there are some effects to unregister the consumer
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects1),
+
+ % cancelling the active consumer
+ {State3, _, Effects2} = apply(meta(3),
+ make_checkout(C1, cancel, #{}),
+ State2),
+ % the second registered consumer is now the active one
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
+ ?assertMatch(#{C2 := _}, State3#?STATE.consumers),
+ % the new active consumer is no longer in the waiting list
+ ?assertEqual(1, length(State3#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1,
+ State3#?STATE.waiting_consumers)),
+ %% should have a cancel consumer handler mod_call effect and
+ %% an active new consumer effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
+
+ % cancelling the active consumer
+ {State4, _, Effects3} = apply(meta(4),
+ make_checkout(C2, cancel, #{}),
+ State3),
+ % the last waiting consumer became the active one
+ ?assertEqual(1, map_size(State4#?STATE.consumers)),
+ ?assertMatch(#{C4 := _}, State4#?STATE.consumers),
+ % the waiting consumer list is now empty
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
+ % there are some effects to unregister the consumer and
+ % to update the new active one (metrics)
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects3),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects3),
+
+ % cancelling the last consumer
+ {State5, _, Effects4} = apply(meta(5),
+ make_checkout(C4, cancel, #{}),
+ State4),
+ % no active consumer anymore
+ ?assertEqual(0, map_size(State5#?STATE.consumers)),
+ % still nothing in the waiting list
+ ?assertEqual(0, length(State5#?STATE.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, _}, Effects4),
+
+ ok.
+
+single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ [C1, C2, C3, C4] = Consumers =
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}],
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, ChannelId}, {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, Consumers),
+
+ % the channel of the active consumer goes down
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
+ % there are still waiting consumers
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+ % effects to unregister the consumer and
+ % to update the new active one (metrics) are there
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects),
+
+ % the channel of the active consumer and a waiting consumer goes down
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
+ % no more waiting consumer
+ ?assertEqual(0, length(State3#?STATE.waiting_consumers)),
+ % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
+
+ % the last channel goes down
+ {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
+ % no more consumers
+ ?assertEqual(0, map_size(State4#?STATE.consumers)),
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C4, Effects3),
+
+ ok.
+
+single_active_returns_messages_on_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1],
+ ConsumerIds = [{_, DownPid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+ {State2, _} = enq(4, 1, msg1, State1),
+ % simulate node goes down
+ {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
+ %% assert the consumer is up
+ ?assertMatch([_], lqueue:to_list(State3#?STATE.returns)),
+ ?assertMatch([{_, #consumer{checked_out = Checked}}]
+ when map_size(Checked) == 0,
+ State3#?STATE.waiting_consumers),
+
+ ok.
+
+single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1, n2, node()],
+ ConsumerIds = [C1 = {_, DownPid}, C2, _C3] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1a = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}},
+ State1a#?STATE.consumers),
+
+ {State1, _} = enq(10, 1, msg, State1a),
+
+ % simulate node goes down
+ {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1),
+
+ %% assert a new consumer is in place and it is up
+ ?assertMatch([{C2, #consumer{status = up,
+ checked_out = Ch}}]
+ when map_size(Ch) == 1,
+ maps:to_list(State2#?STATE.consumers)),
+
+ %% the disconnected consumer has been returned to waiting
+ ?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
+ State2#?STATE.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+
+ % simulate node comes back up
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2),
+
+ %% the consumer is still active and the same as before
+ ?assertMatch([{C2, #consumer{status = up}}],
+ maps:to_list(State3#?STATE.consumers)),
+ % the waiting consumers should be un-suspected
+ ?assertEqual(2, length(State3#?STATE.waiting_consumers)),
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status /= suspected_down)
+ end, State3#?STATE.waiting_consumers),
+ ok.
+
+single_active_consumer_all_disconnected_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1, n2],
+ ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}}, State1#?STATE.consumers),
+
+ % simulate node goes down
+ {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1),
+ %% assert the consumer fails over to the consumer on n2
+ ?assertMatch(#{C2 := #consumer{status = up}}, State2#?STATE.consumers),
+ {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2),
+ %% assert these no active consumer after both nodes are maked as down
+ ?assertMatch([], maps:to_list(State3#?STATE.consumers)),
+ %% n2 comes back
+ {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3),
+ %% ensure n2 is the active consumer as this node as been registered
+ %% as up again
+ ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up,
+ credit = 1}}],
+ maps:to_list(State4#?STATE.consumers)),
+ ok.
+
+single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource =>
+ rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = rabbit_fifo_v0:state_enter(leader, State1),
+ %% 2 effects for each consumer process (channel process), 1 effect for the node,
+ %% 1 effect for file handle reservation
+ ?assertEqual(2 * 3 + 1 + 1, length(Effects)).
+
+single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
+ Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => Resource,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = rabbit_fifo_v0:state_enter(eol, State1),
+ %% 1 effect for each consumer process (channel process),
+ %% 1 effect for file handle reservation
+ ?assertEqual(4, length(Effects)).
+
+query_consumers_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => false}),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ Consumers0 = State1#?STATE.consumers,
+ Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
+ Consumers1 = maps:put({<<"ctag2">>, self()},
+ Consumer#consumer{status = suspected_down}, Consumers0),
+ State2 = State1#?STATE{consumers = Consumers1},
+
+ ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)),
+ Consumers2 = rabbit_fifo_v0:query_consumers(State2),
+ ?assertEqual(4, maps:size(Consumers2)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag2">> ->
+ ?assertNot(Active),
+ ?assertEqual(suspected_down, ActivityStatus);
+ _ ->
+ ?assert(Active),
+ ?assertEqual(up, ActivityStatus)
+ end
+ end, [], Consumers2).
+
+query_consumers_when_single_active_consumer_is_on_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ Meta,
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State1)),
+ Consumers = rabbit_fifo_v0:query_consumers(State1),
+ ?assertEqual(4, maps:size(Consumers)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag1">> ->
+ ?assert(Active),
+ ?assertEqual(single_active, ActivityStatus);
+ _ ->
+ ?assertNot(Active),
+ ?assertEqual(waiting, ActivityStatus)
+ end
+ end, [], Consumers).
+
+active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => false}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} =
+ apply(
+ #{index => 1},
+ rabbit_fifo_v0:make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1),
+ % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
+ ?assertEqual(4 + 1, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
+ ?assertEqual(4 + 4, length(Effects3)).
+
+active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ make_checkout({CTag, ChannelId},
+ {once, 1, simple_prefetch}, #{}),
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
+ % one monitor and one consumer status update (deactivated)
+ ?assertEqual(3, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to monitor the consumer PID
+ ?assertEqual(5, length(Effects3)).
+
+single_active_cancelled_with_unacked_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 1, simple_prefetch},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% enqueue 2 messages
+ {State2, _Effects2} = enq(3, 1, msg1, State1),
+ {State3, _Effects3} = enq(4, 2, msg2, State2),
+ %% one should be checked ou to C1
+ %% cancel C1
+ {State4, _, _} = apply(meta(5),
+ make_checkout(C1, cancel, #{}),
+ State3),
+ %% C2 should be the active consumer
+ ?assertMatch(#{C2 := #consumer{status = up,
+ checked_out = #{0 := _}}},
+ State4#?STATE.consumers),
+ %% C1 should be a cancelled consumer
+ ?assertMatch(#{C1 := #consumer{status = cancelled,
+ lifetime = once,
+ checked_out = #{0 := _}}},
+ State4#?STATE.consumers),
+ ?assertMatch([], State4#?STATE.waiting_consumers),
+
+ %% Ack both messages
+ {State5, _Effects5} = settle(C1, 1, 0, State4),
+ %% C1 should now be cancelled
+ {State6, _Effects6} = settle(C2, 2, 0, State5),
+
+ %% C2 should remain
+ ?assertMatch(#{C2 := #consumer{status = up}},
+ State6#?STATE.consumers),
+ %% C1 should be gone
+ ?assertNotMatch(#{C1 := _},
+ State6#?STATE.consumers),
+ ?assertMatch([], State6#?STATE.waiting_consumers),
+ ok.
+
+single_active_with_credited_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 0, credited},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% add some credit
+ C1Cred = rabbit_fifo_v0:make_credit(C1, 5, 0, false),
+ {State2, _, _Effects2} = apply(meta(3), C1Cred, State1),
+ C2Cred = rabbit_fifo_v0:make_credit(C2, 4, 0, false),
+ {State3, _} = apply(meta(4), C2Cred, State2),
+ %% both consumers should have credit
+ ?assertMatch(#{C1 := #consumer{credit = 5}},
+ State3#?STATE.consumers),
+ ?assertMatch([{C2, #consumer{credit = 4}}],
+ State3#?STATE.waiting_consumers),
+ ok.
+
+purge_nodes_test(_) ->
+ Node = purged@node,
+ ThisNode = node(),
+ EnqPid = test_util:fake_pid(Node),
+ EnqPid2 = test_util:fake_pid(node()),
+ ConPid = test_util:fake_pid(Node),
+ Cid = {<<"tag">>, ConPid},
+ % WaitingPid = test_util:fake_pid(Node),
+
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ single_active_consumer_on => false}),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo_v0:make_enqueue(EnqPid, 1, msg1),
+ State0),
+ {State2, _, _} = apply(meta(2),
+ rabbit_fifo_v0:make_enqueue(EnqPid2, 1, msg2),
+ State1),
+ {State3, _} = check(Cid, 3, 1000, State2),
+ {State4, _, _} = apply(meta(4),
+ {down, EnqPid, noconnection},
+ State3),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode, Node]
+ ]}] , rabbit_fifo_v0:tick(1, State4)),
+ %% assert there are both enqueuers and consumers
+ {State, _, _} = apply(meta(5),
+ rabbit_fifo_v0:make_purge_nodes([Node]),
+ State4),
+
+ %% assert there are no enqueuers nor consumers
+ ?assertMatch(#?STATE{enqueuers = Enqs} when map_size(Enqs) == 1, State),
+ ?assertMatch(#?STATE{consumers = Cons} when map_size(Cons) == 0, State),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode]
+ ]}] , rabbit_fifo_v0:tick(1, State)),
+ ok.
+
+meta(Idx) ->
+ #{index => Idx, term => 1,
+ from => {make_ref(), self()}}.
+
+enq(Idx, MsgSeq, Msg, State) ->
+ strip_reply(
+ apply(meta(Idx), rabbit_fifo_v0:make_enqueue(self(), MsgSeq, Msg), State)).
+
+deq(Idx, Cid, Settlement, State0) ->
+ {State, {dequeue, {MsgId, Msg}, _}, _} =
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {dequeue, Settlement}, #{}),
+ State0),
+ {State, {MsgId, Msg}}.
+
+check_n(Cid, Idx, N, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
+ State)).
+
+check(Cid, Idx, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
+ State)).
+
+check_auto(Cid, Idx, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ State)).
+
+check(Cid, Idx, Num, State) ->
+ strip_reply(
+ apply(meta(Idx),
+ rabbit_fifo_v0:make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
+ State)).
+
+settle(Cid, Idx, MsgId, State) ->
+ strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_settle(Cid, [MsgId]), State)).
+
+credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
+ strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_credit(Cid, Credit, DelCnt, Drain),
+ State)).
+
+strip_reply({State, _, Effects}) ->
+ {State, Effects}.
+
+run_log(InitState, Entries) ->
+ lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
+ case apply(meta(Idx), E, Acc0) of
+ {Acc, _, Efx} when is_list(Efx) ->
+ {Acc, Efx0 ++ Efx};
+ {Acc, _, Efx} ->
+ {Acc, Efx0 ++ [Efx]};
+ {Acc, _} ->
+ {Acc, Efx0}
+ end
+ end, {InitState, []}, Entries).
+
+
+%% AUX Tests
+
+aux_test(_) ->
+ _ = ra_machine_ets:start_link(),
+ Aux0 = init_aux(aux_test),
+ MacState = init(#{name => aux_test,
+ queue_resource =>
+ rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ ok = meck:new(ra_log, []),
+ Log = mock_log,
+ meck:expect(ra_log, last_index_term, fun (_) -> {0, 0} end),
+ {no_reply, Aux, mock_log} = handle_aux(leader, cast, active, Aux0,
+ Log, MacState),
+ {no_reply, _Aux, mock_log} = handle_aux(leader, cast, tick, Aux,
+ Log, MacState),
+ [X] = ets:lookup(rabbit_fifo_usage, aux_test),
+ meck:unload(),
+ ?assert(X > 0.0),
+ ok.
+
+%% Utility
+
+init(Conf) -> rabbit_fifo_v0:init(Conf).
+apply(Meta, Entry, State) -> rabbit_fifo_v0:apply(Meta, Entry, State).
+init_aux(Conf) -> rabbit_fifo_v0:init_aux(Conf).
+handle_aux(S, T, C, A, L, M) -> rabbit_fifo_v0:handle_aux(S, T, C, A, L, M).
+make_checkout(C, S, M) -> rabbit_fifo_v0:make_checkout(C, S, M).
diff --git a/test/rabbitmq_queues_cli_integration_SUITE.erl b/test/rabbitmq_queues_cli_integration_SUITE.erl
index a41ee02427..bf5e9ee79e 100644
--- a/test/rabbitmq_queues_cli_integration_SUITE.erl
+++ b/test/rabbitmq_queues_cli_integration_SUITE.erl
@@ -27,8 +27,13 @@ groups() ->
].
init_per_suite(Config) ->
- rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
+ case os:getenv("SECONDARY_UMBRELLA") of
+ false ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config);
+ _ ->
+ {skip, "growing and shrinking cannot be done in mixed mode"}
+ end.
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).