summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/backing_queue_SUITE.erl77
-rw-r--r--test/channel_operation_timeout_SUITE.erl3
-rw-r--r--test/confirms_rejects_SUITE.erl17
-rw-r--r--test/dead_lettering_SUITE.erl6
-rw-r--r--test/dynamic_ha_SUITE.erl8
-rw-r--r--test/queue_parallel_SUITE.erl21
-rw-r--r--test/queue_type_SUITE.erl234
-rw-r--r--test/quorum_queue_SUITE.erl83
-rw-r--r--test/quorum_queue_utils.erl9
-rw-r--r--test/rabbit_confirms_SUITE.erl154
-rw-r--r--test/rabbit_fifo_SUITE.erl2
-rw-r--r--test/rabbit_fifo_int_SUITE.erl186
-rw-r--r--test/rabbit_ha_test_consumer.erl7
-rw-r--r--test/rabbit_msg_record_SUITE.erl213
-rw-r--r--test/rabbit_stream_queue_SUITE.erl1304
-rw-r--r--test/simple_ha_SUITE.erl4
-rw-r--r--test/unit_log_config_SUITE.erl24
17 files changed, 2193 insertions, 159 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 2025576a57..ff37e1fb04 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -684,17 +684,17 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
QPid = amqqueue:get_pid(Q),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- publish_and_confirm(Q, Payload, Count),
+ QTState = publish_and_confirm(Q, Payload, Count),
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
CountMinusOne = Count - 1,
- {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true, Limiter,
+ {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}, _} =
+ rabbit_amqqueue:basic_get(Q, true, Limiter,
<<"bq_variable_queue_delete_msg_store_files_callback1">>,
- #{}),
+ QTState),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg
@@ -713,8 +713,7 @@ bq_queue_recover1(Config) ->
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
QName = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
- publish_and_confirm(Q, <<>>, Count),
-
+ QT = publish_and_confirm(Q, <<>>, Count),
SupPid = get_queue_sup_pid(Q),
true = is_pid(SupPid),
exit(SupPid, kill),
@@ -724,7 +723,7 @@ bq_queue_recover1(Config) ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(?VHOST),
- {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
+ {Recovered, []} = rabbit_amqqueue:recover(?VHOST),
rabbit_amqqueue:start(Recovered),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
@@ -732,9 +731,9 @@ bq_queue_recover1(Config) ->
fun (Q1) when ?is_amqqueue(Q1) ->
QPid1 = amqqueue:get_pid(Q1),
CountMinusOne = Count - 1,
- {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
- <<"bq_queue_recover1">>, #{}),
+ {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}, _} =
+ rabbit_amqqueue:basic_get(Q1, false, Limiter,
+ <<"bq_queue_recover1">>, QT),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
@@ -1366,25 +1365,34 @@ variable_queue_init(Q, Recover) ->
publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
- [begin
- Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2},
- Payload),
- Delivery = #delivery{mandatory = false, sender = self(),
- confirm = true, message = Msg, msg_seq_no = Seq,
- flow = noflow},
- _QPids = rabbit_amqqueue:deliver([Q], Delivery)
- end || Seq <- Seqs],
- wait_for_confirms(gb_sets:from_list(Seqs)).
+ QTState0 = rabbit_queue_type:new(Q, rabbit_queue_type:init()),
+ QTState =
+ lists:foldl(
+ fun (Seq, Acc0) ->
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2},
+ Payload),
+ Delivery = #delivery{mandatory = false, sender = self(),
+ confirm = true, message = Msg, msg_seq_no = Seq,
+ flow = noflow},
+ {ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Delivery, Acc0),
+ Acc
+ end, QTState0, Seqs),
+ wait_for_confirms(gb_sets:from_list(Seqs)),
+ QTState.
wait_for_confirms(Unconfirmed) ->
case gb_sets:is_empty(Unconfirmed) of
true -> ok;
- false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
+ false -> receive {'$gen_cast',
+ {queue_event, _QName,
+ {confirm, Confirmed, _}}} ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)))
- after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
+ after ?TIMEOUT ->
+ flush(),
+ exit(timeout_waiting_for_confirm)
end
end.
@@ -1436,6 +1444,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
variable_queue_wait_for_shuffling_end(
lists:foldl(
fun (N, VQN) ->
+
rabbit_variable_queue:publish(
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
@@ -1526,12 +1535,13 @@ variable_queue_status(VQ) ->
variable_queue_wait_for_shuffling_end(VQ) ->
case credit_flow:blocked() of
false -> VQ;
- true -> receive
- {bump_credit, Msg} ->
- credit_flow:handle_bump_msg(Msg),
- variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:resume(VQ))
- end
+ true ->
+ receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:resume(VQ))
+ end
end.
msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
@@ -1576,11 +1586,13 @@ variable_queue_with_holes(VQ0) ->
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
%% assertions
Status = variable_queue_status(VQ8),
+
vq_with_holes_assertions(VQ8, proplists:get_value(mode, Status)),
Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
+
{Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}.
vq_with_holes_assertions(VQ, default) ->
@@ -1604,3 +1616,12 @@ check_variable_queue_status(VQ0, Props) ->
S = variable_queue_status(VQ1),
assert_props(S, Props),
VQ1.
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index f8da35d6ff..15e0188604 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -71,11 +71,13 @@ notify_down_all(Config) ->
RabbitCh = rabbit_ct_client_helpers:open_channel(Config, 0),
HareCh = rabbit_ct_client_helpers:open_channel(Config, 1),
+ ct:pal("one"),
%% success
set_channel_operation_timeout_config(Config, 1000),
configure_bq(Config),
QCfg0 = qconfig(RabbitCh, <<"q0">>, <<"ex0">>, true, false),
declare(QCfg0),
+ ct:pal("two"),
%% Testing rabbit_amqqueue:notify_down_all via rabbit_channel.
%% Consumer count = 0 after correct channel termination and
%% notification of queues via delegate:call/3
@@ -83,6 +85,7 @@ notify_down_all(Config) ->
rabbit_ct_client_helpers:close_channel(RabbitCh),
0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)),
false = is_process_alive(RabbitCh),
+ ct:pal("three"),
%% fail
set_channel_operation_timeout_config(Config, 10),
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index aaaeb4a939..a51253885c 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -388,9 +388,12 @@ kill_the_queue(QueueName) ->
[begin
{ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
Pid = amqqueue:get_pid(Q),
+ ct:pal("~w killed", [Pid]),
+ timer:sleep(1),
exit(Pid, kill)
end
- || _ <- lists:seq(1, 11)],
+ || _ <- lists:seq(1, 50)],
+ timer:sleep(1),
{ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
Pid = amqqueue:get_pid(Q),
case is_process_alive(Pid) of
@@ -399,7 +402,11 @@ kill_the_queue(QueueName) ->
false -> ok
end.
-
-
-
-
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index 87b5566c57..4ee917aa21 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -1059,9 +1059,11 @@ dead_letter_headers_BCC(Config) ->
?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
-%% Three top-level headers are added for the very first dead-lettering event. They are
+%% Three top-level headers are added for the very first dead-lettering event.
+%% They are
%% x-first-death-reason, x-first-death-queue, x-first-death-exchange
-%% They have the same values as the reason, queue, and exchange fields of the original
+%% They have the same values as the reason, queue, and exchange fields of the
+%% original
%% dead lettering event. Once added, these headers are never modified.
dead_letter_headers_first_death(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 25027c7ef9..c881aef8a1 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -424,8 +424,7 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, A),
- ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
- [all])),
+ ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], [all])),
%% --> Master: A
%% Slaves: [B, C] or [C, B]
SSPids = ?awaitMatch(SSPids when is_list(SSPids),
@@ -450,7 +449,7 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
%% should instead use an existing synchronised mirror as the new master,
%% even though that isn't in the policy.
?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
- [{nodes, [LastSlave, A]}])),
+ [{nodes, [LastSlave, A]}])),
%% --> Master: B or C (same as previous policy)
%% Slaves: [A]
@@ -931,6 +930,7 @@ apply_in_parallel(Config, Nodes, Policies) ->
Self = self(),
[spawn_link(fun() ->
[begin
+
apply_policy(Config, N, Policy)
end || Policy <- Policies],
Self ! parallel_task_done
@@ -969,7 +969,7 @@ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) ->
%% Let's wait a bit longer.
timer:sleep(1000),
wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1);
- FinalInfo ->
+ {ok, FinalInfo} ->
%% The last policy is the final state
LastPolicy = lists:last(TestedPolicies),
case verify_policy(LastPolicy, FinalInfo) of
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index c4d16a5900..0fbf7ec975 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -57,7 +57,8 @@ groups() ->
trigger_message_store_compaction]},
{quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
- {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
+ {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
+ {stream_queue, [parallel], AllTests}
]}
].
@@ -122,13 +123,24 @@ init_per_group(mirrored_queue, Config) ->
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(stream_queue, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"stream">>}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
- {quorum_tick_interval, 1000}]}),
+ {quorum_tick_interval, 1000},
+ {stream_tick_interval, 1000}]}),
Config1 = rabbit_ct_helpers:set_config(
Config, [ {rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@@ -514,6 +526,11 @@ basic_cancel(Config) ->
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
CTag = atom_to_binary(?FUNCTION_NAME, utf8),
+
+ %% Let's set consumer prefetch so it works with stream queues
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = false,
+ prefetch_count = 1})),
subscribe(Ch, QName, false, CTag),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
diff --git a/test/queue_type_SUITE.erl b/test/queue_type_SUITE.erl
new file mode 100644
index 0000000000..eeeabc3d1e
--- /dev/null
+++ b/test/queue_type_SUITE.erl
@@ -0,0 +1,234 @@
+-module(queue_type_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, classic},
+ {group, quorum}
+ ].
+
+
+all_tests() ->
+ [
+ smoke
+ ].
+
+groups() ->
+ [
+ {classic, [], all_tests()},
+ {quorum, [], all_tests()}
+ ].
+
+init_per_suite(Config0) ->
+ rabbit_ct_helpers:log_environment(),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{quorum_tick_interval, 1000}]}),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config),
+ ok.
+
+init_per_group(Group, Config) ->
+ ClusterSize = 3,
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
+ Config1b = rabbit_ct_helpers:set_config(Config1,
+ [{queue_type, atom_to_binary(Group, utf8)},
+ {net_ticktime, 10}]),
+ Config2 = rabbit_ct_helpers:run_steps(Config1b,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ Config3 =
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
+ ok ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_tick_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit more time
+ %% after clustering before running the tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end;
+ Skip ->
+ end_per_group(Group, Config2),
+ Skip
+ end,
+ rabbit_ct_broker_helpers:set_policy(
+ Config3, 0,
+ <<"ha-policy">>, <<".*">>, <<"queues">>,
+ [{<<"ha-mode">>, <<"all">>}]),
+ Config3.
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit,
+ [{core_metrics_gc_interval, 100},
+ {log, [{file, [{level, debug}]}]}]}),
+ {ra, [{min_wal_roll_over_interval, 30000}]}).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{queue_name, Q},
+ {alt_queue_name, <<Q/binary, "_alt">>}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ catch delete_queues(),
+ Config1 = rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+smoke(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QName = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, QName, <<"msg1">>),
+ ct:pal("waiting for confirms from ~s", [QName]),
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} -> fail
+ after 2500 ->
+ flush(),
+ exit(confirm_timeout)
+ end,
+ DTag = basic_get(Ch, QName),
+
+ basic_ack(Ch, DTag),
+ basic_get_empty(Ch, QName),
+
+ %% consume
+ publish(Ch, QName, <<"msg2">>),
+ ConsumerTag1 = <<"ctag1">>,
+ ok = subscribe(Ch, QName, ConsumerTag1),
+ %% receive and ack
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{}} ->
+ basic_ack(Ch, DeliveryTag)
+ after 5000 ->
+ flush(),
+ exit(basic_deliver_timeout)
+ end,
+ basic_cancel(Ch, ConsumerTag1),
+
+ %% assert empty
+ basic_get_empty(Ch, QName),
+
+ %% consume and nack
+ ConsumerTag2 = <<"ctag2">>,
+ ok = subscribe(Ch, QName, ConsumerTag2),
+ publish(Ch, QName, <<"msg3">>),
+ receive
+ {#'basic.deliver'{delivery_tag = T,
+ redelivered = false},
+ #amqp_msg{}} ->
+ basic_cancel(Ch, ConsumerTag2),
+ basic_nack(Ch, T)
+ after 5000 ->
+ exit(basic_deliver_timeout)
+ end,
+ %% get and ack
+ basic_ack(Ch, basic_get(Ch, QName)),
+ ok.
+
+%% Utility
+delete_queues() ->
+ [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+ || Q <- rabbit_amqqueue:list()].
+
+declare(Ch, Q, Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ auto_delete = false,
+ arguments = Args}).
+
+publish(Ch, Queue, Msg) ->
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg}).
+
+basic_get(Ch, Queue) ->
+ {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = false}),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, Reply),
+ GetOk#'basic.get_ok'.delivery_tag.
+
+basic_get_empty(Ch, Queue) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = false})).
+
+subscribe(Ch, Queue, CTag) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = false,
+ consumer_tag = CTag},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} ->
+ ok
+ after 5000 ->
+ exit(basic_consume_timeout)
+ end.
+
+basic_ack(Ch, DTag) ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag,
+ multiple = false}).
+
+basic_cancel(Ch, CTag) ->
+ #'basic.cancel_ok'{} =
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}).
+
+basic_nack(Ch, DTag) ->
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+ requeue = true,
+ multiple = false}).
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index ecb4fdac63..16042b71e8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -383,13 +383,19 @@ start_queue(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
?assertEqual({'queue.declare_ok', LQ, 0, 0},
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = Children + 1,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -405,7 +411,8 @@ start_queue(Config) ->
%% Check that the application and process are still up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -463,6 +470,10 @@ quorum_cluster_size_x(Config, Max, Expected) ->
stop_queue(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -471,13 +482,15 @@ stop_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = Children + 1,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -485,6 +498,10 @@ stop_queue(Config) ->
restart_queue(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -496,7 +513,9 @@ restart_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
+ Expected = Children + 1,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))).
idempotent_recover(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -554,6 +573,10 @@ restart_all_types(Config) ->
%% ensure there are no regressions
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ1 = <<"restart_all_types-qq1">>,
?assertEqual({'queue.declare_ok', QQ1, 0, 0},
@@ -575,7 +598,9 @@ restart_all_types(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = length(Children) + 2,
+ Got = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ ?assertMatch(Expected, Got),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -592,6 +617,10 @@ stop_start_rabbit_app(Config) ->
%% classic) to ensure there are no regressions
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ1 = <<"stop_start_rabbit_app-qq">>,
?assertEqual({'queue.declare_ok', QQ1, 0, 0},
@@ -617,7 +646,9 @@ stop_start_rabbit_app(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = Children + 2,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -935,6 +966,10 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
%% to verify that the cleanup is propagated through channels
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -955,18 +990,22 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children,
- [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children,
+ [ra_server_sup_sup]))
end),
%% Check that all queue states have been cleaned
- wait_for_cleanup(Server, NCh1, 0),
- wait_for_cleanup(Server, NCh2, 0).
+ wait_for_cleanup(Server, NCh2, 0),
+ wait_for_cleanup(Server, NCh1, 0).
cleanup_queue_state_on_channel_after_subscribe(Config) ->
%% Declare/delete the queue and publish in one channel, while consuming on a
%% different one to verify that the cleanup is propagated through channels
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -993,7 +1032,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1596,8 +1635,8 @@ cleanup_data_dir(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
- [{_, UId1}] = rpc:call(Server1, ra_directory, list_registered, []),
- [{_, UId2}] = rpc:call(Server2, ra_directory, list_registered, []),
+ UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [])),
+ UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [])),
DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]),
DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]),
?assert(filelib:is_dir(DataDir1)),
@@ -1748,6 +1787,11 @@ reconnect_consumer_and_wait_channel_down(Config) ->
delete_immediately_by_resource(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
@@ -1756,7 +1800,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -1784,6 +1828,8 @@ subscribe_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true})
+ after 5000 ->
+ exit(basic_deliver_timeout)
end,
receive
@@ -1794,6 +1840,8 @@ subscribe_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
multiple = false,
requeue = true})
+ after 5000 ->
+ exit(basic_deliver_timeout_2)
end,
receive
@@ -1803,8 +1851,13 @@ subscribe_redelivery_count(Config) ->
?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
multiple = false}),
+ ct:pal("wait_for_messages_ready", []),
wait_for_messages_ready(Servers, RaName, 0),
+ ct:pal("wait_for_messages_pending_ack", []),
wait_for_messages_pending_ack(Servers, RaName, 0)
+ after 5000 ->
+ flush(500),
+ exit(basic_deliver_timeout_3)
end.
subscribe_redelivery_limit(Config) ->
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index 95ddc892f1..caabd617ae 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -28,15 +28,10 @@ wait_for_messages_total(Servers, QName, Total) ->
wait_for_messages(Servers, QName, Number, Fun, 0) ->
Msgs = dirty_query(Servers, QName, Fun),
- Totals = lists:map(fun(M) when is_map(M) ->
- maps:size(M);
- (_) ->
- -1
- end, Msgs),
- ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]);
+ ?assertEqual(Msgs, [Number || _ <- lists:seq(1, length(Servers))]);
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
- ct:pal("Got messages ~p", [Msgs]),
+ ct:pal("Got messages ~p ~p", [QName, Msgs]),
%% hack to allow the check to succeed in mixed versions clusters if at
%% least one node matches the criteria rather than all nodes for
F = case is_mixed_versions() of
diff --git a/test/rabbit_confirms_SUITE.erl b/test/rabbit_confirms_SUITE.erl
new file mode 100644
index 0000000000..331c3ca7c3
--- /dev/null
+++ b/test/rabbit_confirms_SUITE.erl
@@ -0,0 +1,154 @@
+-module(rabbit_confirms_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ confirm,
+ reject,
+ remove_queue
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+confirm(_Config) ->
+ XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
+ QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
+ QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
+ U0 = rabbit_confirms:init(),
+ ?assertEqual(0, rabbit_confirms:size(U0)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U0)),
+ ?assertEqual(true, rabbit_confirms:is_empty(U0)),
+
+ U1 = rabbit_confirms:insert(1, [QName], XName, U0),
+ ?assertEqual(1, rabbit_confirms:size(U1)),
+ ?assertEqual(1, rabbit_confirms:smallest(U1)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U1)),
+
+ {[{1, XName}], U2} = rabbit_confirms:confirm([1], QName, U1),
+ ?assertEqual(0, rabbit_confirms:size(U2)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U2)),
+ ?assertEqual(true, rabbit_confirms:is_empty(U2)),
+
+ U3 = rabbit_confirms:insert(2, [QName], XName, U1),
+ ?assertEqual(2, rabbit_confirms:size(U3)),
+ ?assertEqual(1, rabbit_confirms:smallest(U3)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U3)),
+
+ {[{1, XName}], U4} = rabbit_confirms:confirm([1], QName, U3),
+ ?assertEqual(1, rabbit_confirms:size(U4)),
+ ?assertEqual(2, rabbit_confirms:smallest(U4)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U4)),
+
+ U5 = rabbit_confirms:insert(2, [QName, QName2], XName, U1),
+ ?assertEqual(2, rabbit_confirms:size(U5)),
+ ?assertEqual(1, rabbit_confirms:smallest(U5)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U5)),
+
+ {[{1, XName}], U6} = rabbit_confirms:confirm([1, 2], QName, U5),
+ ?assertEqual(2, rabbit_confirms:smallest(U6)),
+
+ {[{2, XName}], U7} = rabbit_confirms:confirm([2], QName2, U6),
+ ?assertEqual(0, rabbit_confirms:size(U7)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U7)),
+
+
+ U8 = rabbit_confirms:insert(2, [QName], XName, U1),
+ {[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8),
+ ok.
+
+
+reject(_Config) ->
+ XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
+ QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
+ QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
+ U0 = rabbit_confirms:init(),
+ ?assertEqual(0, rabbit_confirms:size(U0)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U0)),
+ ?assertEqual(true, rabbit_confirms:is_empty(U0)),
+
+ U1 = rabbit_confirms:insert(1, [QName], XName, U0),
+
+ {ok, {1, XName}, U2} = rabbit_confirms:reject(1, U1),
+ {error, not_found} = rabbit_confirms:reject(1, U2),
+ ?assertEqual(0, rabbit_confirms:size(U2)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U2)),
+
+ U3 = rabbit_confirms:insert(2, [QName, QName2], XName, U1),
+
+ {ok, {1, XName}, U4} = rabbit_confirms:reject(1, U3),
+ {error, not_found} = rabbit_confirms:reject(1, U4),
+ ?assertEqual(1, rabbit_confirms:size(U4)),
+ ?assertEqual(2, rabbit_confirms:smallest(U4)),
+
+ {ok, {2, XName}, U5} = rabbit_confirms:reject(2, U3),
+ {error, not_found} = rabbit_confirms:reject(2, U5),
+ ?assertEqual(1, rabbit_confirms:size(U5)),
+ ?assertEqual(1, rabbit_confirms:smallest(U5)),
+
+ ok.
+
+remove_queue(_Config) ->
+ XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
+ QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
+ QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
+ U0 = rabbit_confirms:init(),
+
+ U1 = rabbit_confirms:insert(1, [QName, QName2], XName, U0),
+ U2 = rabbit_confirms:insert(2, [QName2], XName, U1),
+ {[{2, XName}], U3} = rabbit_confirms:remove_queue(QName2, U2),
+ ?assertEqual(1, rabbit_confirms:size(U3)),
+ ?assertEqual(1, rabbit_confirms:smallest(U3)),
+ {[{1, XName}], U4} = rabbit_confirms:remove_queue(QName, U3),
+ ?assertEqual(0, rabbit_confirms:size(U4)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U4)),
+
+ U5 = rabbit_confirms:insert(1, [QName], XName, U0),
+ U6 = rabbit_confirms:insert(2, [QName], XName, U5),
+ {[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6),
+
+ ok.
+
+
+%% Utility
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index d19dcb3682..7b90d91bfa 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -674,7 +674,7 @@ single_active_consumer_basic_get_test(_) ->
?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy),
?assertEqual(0, map_size(State0#rabbit_fifo.consumers)),
{State1, _} = enq(1, 1, first, State0),
- {_State, {error, unsupported}} =
+ {_State, {error, {unsupported, single_active_consumer}}} =
apply(meta(2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
State1),
ok.
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index b51975b062..b2ed7160a2 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -86,7 +86,7 @@ basics(Config) ->
CustomerTag = UId,
ok = start_cluster(ClusterName, [ServerId]),
FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, #{}, FState0),
ra_log_wal:force_roll_over(ra_log_wal),
% create segment the segment will trigger a snapshot
@@ -99,11 +99,10 @@ basics(Config) ->
FState5 = receive
{ra_event, From, Evt} ->
case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of
- {internal, _AcceptedSeqs, _Actions, _FState4} ->
- exit(unexpected_internal_event);
- {{delivery, C, [{MsgId, _Msg}]}, FState4} ->
- {ok, S} = rabbit_fifo_client:settle(C, [MsgId],
- FState4),
+ {ok, FState4,
+ [{deliver, C, true,
+ [{_Qname, _QRef, MsgId, _SomBool, _Msg}]}]} ->
+ {S, _A} = rabbit_fifo_client:settle(C, [MsgId], FState4),
S
end
after 5000 ->
@@ -129,10 +128,9 @@ basics(Config) ->
receive
{ra_event, Frm, E} ->
case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
- {internal, _, _, _FState7} ->
- exit({unexpected_internal_event, E});
- {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
- {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
+ {ok, FState7, [{deliver, Ctag, true,
+ [{_, _, Mid, _, two}]}]} ->
+ {_, _} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
ok
end
after 2000 ->
@@ -150,8 +148,8 @@ return(Config) ->
{ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
{ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
{_, _, F2} = process_ra_events(receive_ra_events(2, 0), F1),
- {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
- {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
+ {ok, _, {_, _, MsgId, _, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
+ _F2 = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
ra:stop_server(ServerId),
ok.
@@ -165,9 +163,9 @@ rabbit_fifo_returns_correlation(Config) ->
receive
{ra_event, Frm, E} ->
case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of
- {internal, [corr1], [], _F2} ->
+ {ok, _F2, [{settled, _, _}]} ->
ok;
- {Del, _} ->
+ Del ->
exit({unexpected, Del})
end
after 2000 ->
@@ -181,23 +179,24 @@ duplicate_delivery(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
Fun = fun Loop(S0) ->
receive
{ra_event, Frm, E} = Evt ->
case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of
- {internal, [corr1], [], S1} ->
+ {ok, S1, [{settled, _, _}]} ->
Loop(S1);
- {_Del, S1} ->
+ {ok, S1, _} ->
%% repeat event delivery
self() ! Evt,
%% check that then next received delivery doesn't
%% repeat or crash
receive
{ra_event, F, E1} ->
- case rabbit_fifo_client:handle_ra_event(F, E1, S1) of
- {internal, [], [], S2} ->
+ case rabbit_fifo_client:handle_ra_event(
+ F, E1, S1) of
+ {ok, S2, _} ->
S2
end
end
@@ -215,7 +214,7 @@ usage(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
{ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
{_, _, _} = process_ra_events(receive_ra_events(2, 2), F3),
@@ -242,9 +241,9 @@ resends_lost_command(Config) ->
meck:unload(ra),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
{_, _, F4} = process_ra_events(receive_ra_events(2, 0), F3),
- {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
- {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, _, {_, _, _, _, msg1}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, _, {_, _, _, _, msg2}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, _, {_, _, _, _, msg3}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
ra:stop_server(ServerId),
ok.
@@ -268,7 +267,7 @@ detects_lost_delivery(Config) ->
F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
{_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
% lose first delivery
@@ -298,13 +297,13 @@ returns_after_down(Config) ->
_Pid = spawn(fun () ->
F = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
- undefined, F),
+ #{}, F),
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
timer:sleep(1000),
% message should be available for dequeue
- {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
+ {ok, _, {_, _, _, _, msg1}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
ra:stop_server(ServerId),
ok.
@@ -327,9 +326,9 @@ resends_after_lost_applied(Config) ->
% send another message
{ok, F4} = rabbit_fifo_client:enqueue(msg3, F3),
{_, _, F5} = process_ra_events(receive_ra_events(1, 0), F4),
- {ok, {{_, {_, msg1}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {{_, {_, msg2}}, _}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
- {ok, {{_, {_, msg3}}, _}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7),
+ {ok, _, {_, _, _, _, msg1}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, _, {_, _, _, _, msg2}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, _, {_, _, _, _, msg3}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7),
ra:stop_server(ServerId),
ok.
@@ -377,15 +376,16 @@ discard(Config) ->
_ = ra:members(ServerId),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
- F3 = discard_next_delivery(F2, 500),
- {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
+ F3 = discard_next_delivery(F2, 5000),
+ {empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
receive
{dead_letter, Letters} ->
[{_, msg1}] = Letters,
ok
after 500 ->
+ flush(),
exit(dead_letter_timeout)
end,
ra:stop_server(ServerId),
@@ -397,11 +397,11 @@ cancel_checkout(Config) ->
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F1),
{_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
- {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
- {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
+ {F5, _} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
+ {ok, _, {_, _, _, _, m1}, F5} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
ok.
credit(Config) ->
@@ -413,20 +413,20 @@ credit(Config) ->
{ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
{_, _, F3} = process_ra_events(receive_ra_events(2, 0), F2),
%% checkout with 0 prefetch
- {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3),
+ {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, #{}, F3),
%% assert no deliveries
{_, _, F5} = process_ra_events(receive_ra_events(), F4, [], [],
fun
(D, _) -> error({unexpected_delivery, D})
end),
%% provide some credit
- {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
- {[{_, {_, m1}}], [{send_credit_reply, _}], F7} =
+ F6 = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
+ {[{_, _, _, _, m1}], [{send_credit_reply, _}], F7} =
process_ra_events(receive_ra_events(1, 1), F6),
%% credit and drain
- {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
- {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} =
+ F8 = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
+ {[{_, _, _, _, m2}], [{send_credit_reply, _}, {send_drained, _}], F9} =
process_ra_events(receive_ra_events(1, 1), F8),
flush(),
@@ -439,9 +439,8 @@ credit(Config) ->
(D, _) -> error({unexpected_delivery, D})
end),
%% credit again and receive the last message
- {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
- {[{_, {_, m3}}], [{send_credit_reply, _}], _} =
- process_ra_events(receive_ra_events(1, 1), F12),
+ F12 = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
+ {[{_, _, _, _, m3}], _, _} = process_ra_events(receive_ra_events(1, 1), F12),
ok.
untracked_enqueue(Config) ->
@@ -452,7 +451,7 @@ untracked_enqueue(Config) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
timer:sleep(100),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
+ {ok, _, {_, _, _, _, msg1}, _F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
ra:stop_server(ServerId),
ok.
@@ -472,6 +471,7 @@ flow(Config) ->
ok.
test_queries(Config) ->
+ % ok = logger:set_primary_config(level, all),
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
@@ -484,20 +484,23 @@ test_queries(Config) ->
Self ! ready,
receive stop -> ok end
end),
+ receive
+ ready -> ok
+ after 5000 ->
+ exit(ready_timeout)
+ end,
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- ok = receive ready -> ok after 5000 -> timeout end,
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
- ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader},
- ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_ready/1)),
- ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader},
- ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_checked_out/1)),
- ?assertMatch({ok, {_RaIdxTerm, Processes}, _Leader}
- when length(Processes) == 2,
- ra:local_query(ServerId,
- fun rabbit_fifo:query_processes/1)),
- P ! stop,
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, #{}, F0),
+ {ok, {_, Ready}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, Ready),
+ {ok, {_, Checked}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, Checked),
+ {ok, {_, Processes}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_processes/1),
+ ?assertEqual(2, length(Processes)),
+ P ! stop,
ra:stop_server(ServerId),
ok.
@@ -511,15 +514,16 @@ dequeue(Config) ->
Tag = UId,
ok = start_cluster(ClusterName, [ServerId]),
F1 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
+ {empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
{ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), F2_),
- {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ % {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ {ok, _, {_, _, 0, _, msg1}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
{ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
{_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_),
- {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
- {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
+ {ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
+ {_F6, _A} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
ra:stop_server(ServerId),
ok.
@@ -534,8 +538,8 @@ conf(ClusterName, UId, ServerId, _, Peers) ->
process_ra_event(State, Wait) ->
receive
{ra_event, From, Evt} ->
- {internal, _, _, S} =
- rabbit_fifo_client:handle_ra_event(From, Evt, State),
+ {ok, S, _Actions} =
+ rabbit_fifo_client:handle_ra_event(From, Evt, State),
S
after Wait ->
exit(ra_event_timeout)
@@ -572,10 +576,10 @@ receive_ra_events(Acc) ->
end.
process_ra_events(Events, State) ->
- DeliveryFun = fun ({delivery, Tag, Msgs}, S) ->
+ DeliveryFun = fun ({deliver, _, Tag, Msgs}, S) ->
MsgIds = [element(1, M) || M <- Msgs],
- {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S),
- S2
+ {S0, _} = rabbit_fifo_client:settle(Tag, MsgIds, S),
+ S0
end,
process_ra_events(Events, State, [], [], DeliveryFun).
@@ -583,43 +587,41 @@ process_ra_events([], State0, Acc, Actions0, _DeliveryFun) ->
{Acc, Actions0, State0};
process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) ->
case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, Actions, State} ->
- process_ra_events(Events, State, Acc, Actions0 ++ Actions, DeliveryFun);
- {{delivery, _Tag, Msgs} = Del, State1} ->
- State = DeliveryFun(Del, State1),
- process_ra_events(Events, State, Acc ++ Msgs, Actions0, DeliveryFun);
+ {ok, State1, Actions1} ->
+ {Msgs, Actions, State} =
+ lists:foldl(
+ fun ({deliver, _, _, Msgs} = Del, {M, A, S}) ->
+ {M ++ Msgs, A, DeliveryFun(Del, S)};
+ (Ac, {M, A, S}) ->
+ {M, A ++ [Ac], S}
+ end, {Acc, [], State1}, Actions1),
+ process_ra_events(Events, State, Msgs, Actions0 ++ Actions, DeliveryFun);
eol ->
eol
end.
discard_next_delivery(State0, Wait) ->
receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, _Actions, State} ->
- discard_next_delivery(State, Wait);
- {{delivery, Tag, Msgs}, State1} ->
- MsgIds = [element(1, M) || M <- Msgs],
- {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
- State1),
- State
- end
+ {ra_event, _, {machine, {delivery, _, _}}} = Evt ->
+ element(3, process_ra_events([Evt], State0, [], [],
+ fun ({deliver, Tag, _, Msgs}, S) ->
+ MsgIds = [element(3, M) || M <- Msgs],
+ {S0, _} = rabbit_fifo_client:discard(Tag, MsgIds, S),
+ S0
+ end))
after Wait ->
- State0
+ State0
end.
return_next_delivery(State0, Wait) ->
receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, _, State} ->
- return_next_delivery(State, Wait);
- {{delivery, Tag, Msgs}, State1} ->
- MsgIds = [element(1, M) || M <- Msgs],
- {ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
- State1),
- State
- end
+ {ra_event, _, {machine, {delivery, _, _}}} = Evt ->
+ element(3, process_ra_events([Evt], State0, [], [],
+ fun ({deliver, Tag, _, Msgs}, S) ->
+ MsgIds = [element(3, M) || M <- Msgs],
+ {S0, _} = rabbit_fifo_client:return(Tag, MsgIds, S),
+ S0
+ end))
after Wait ->
State0
end.
diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl
index 3324a1253c..2467e40028 100644
--- a/test/rabbit_ha_test_consumer.erl
+++ b/test/rabbit_ha_test_consumer.erl
@@ -51,12 +51,15 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
%% counter.
if
MsgNum + 1 == LowestSeen ->
+ error_logger:info_msg("recording ~w left ~w",
+ [MsgNum, MsgsToConsume]),
run(TestPid, Channel, Queue,
CancelOnFailover, MsgNum, MsgsToConsume - 1);
MsgNum >= LowestSeen ->
error_logger:info_msg(
- "consumer ~p on ~p ignoring redelivered msg ~p~n",
- [self(), Channel, MsgNum]),
+ "consumer ~p on ~p ignoring redelivered msg ~p"
+ "lowest seen ~w~n",
+ [self(), Channel, MsgNum, LowestSeen]),
true = Redelivered, %% ASSERTION
run(TestPid, Channel, Queue,
CancelOnFailover, LowestSeen, MsgsToConsume);
diff --git a/test/rabbit_msg_record_SUITE.erl b/test/rabbit_msg_record_SUITE.erl
new file mode 100644
index 0000000000..a82ba7481d
--- /dev/null
+++ b/test/rabbit_msg_record_SUITE.erl
@@ -0,0 +1,213 @@
+-module(rabbit_msg_record_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp10_common/include/amqp10_framing.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ ampq091_roundtrip,
+ message_id_ulong,
+ message_id_uuid,
+ message_id_binary,
+ message_id_large_binary,
+ message_id_large_string
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+ampq091_roundtrip(_Config) ->
+ Props = #'P_basic'{content_type = <<"text/plain">>,
+ content_encoding = <<"gzip">>,
+ headers = [{<<"x-stream-offset">>, long, 99},
+ {<<"x-string">>, longstr, <<"a string">>},
+ {<<"x-bool">>, bool, false},
+ {<<"x-unsignedbyte">>, unsignedbyte, 1},
+ {<<"x-unsignedshort">>, unsignedshort, 1},
+ {<<"x-unsignedint">>, unsignedint, 1},
+ {<<"x-signedint">>, signedint, 1},
+ {<<"x-timestamp">>, timestamp, 1},
+ {<<"x-double">>, double, 1.0},
+ {<<"x-float">>, float, 1.0},
+ {<<"x-binary">>, binary, <<"data">>}
+ ],
+ delivery_mode = 2,
+ priority = 99,
+ correlation_id = <<"corr">> ,
+ reply_to = <<"reply-to">>,
+ expiration = <<"1">>,
+ message_id = <<"msg-id">>,
+ timestamp = 99,
+ type = <<"45">>,
+ user_id = <<"banana">>,
+ app_id = <<"rmq">>
+ % cluster_id = <<"adf">>
+ },
+ Payload = [<<"data">>],
+ test_amqp091_roundtrip(Props, Payload),
+ test_amqp091_roundtrip(#'P_basic'{}, Payload),
+ ok.
+
+message_id_ulong(_Config) ->
+ Num = 9876789,
+ ULong = erlang:integer_to_binary(Num),
+ P = #'v1_0.properties'{message_id = {ulong, Num},
+ correlation_id = {ulong, Num}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = ULong,
+ correlation_id = ULong,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id-type">>, longstr, <<"ulong">>},
+ {<<"x-message-id-type">>, longstr, <<"ulong">>}
+ ]},
+ Props),
+ ok.
+
+message_id_uuid(_Config) ->
+ %% fake a uuid
+ UUId = erlang:md5(term_to_binary(make_ref())),
+ TextUUId = rabbit_data_coercion:to_binary(rabbit_guid:to_string(UUId)),
+ P = #'v1_0.properties'{message_id = {uuid, UUId},
+ correlation_id = {uuid, UUId}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = TextUUId,
+ correlation_id = TextUUId,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id-type">>, longstr, <<"uuid">>},
+ {<<"x-message-id-type">>, longstr, <<"uuid">>}
+ ]},
+ Props),
+ ok.
+
+message_id_binary(_Config) ->
+ %% fake a uuid
+ Orig = <<"asdfasdf">>,
+ Text = base64:encode(Orig),
+ P = #'v1_0.properties'{message_id = {binary, Orig},
+ correlation_id = {binary, Orig}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = Text,
+ correlation_id = Text,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id-type">>, longstr, <<"binary">>},
+ {<<"x-message-id-type">>, longstr, <<"binary">>}
+ ]},
+ Props),
+ ok.
+
+message_id_large_binary(_Config) ->
+ %% cannot fit in a shortstr
+ Orig = crypto:strong_rand_bytes(500),
+ P = #'v1_0.properties'{message_id = {binary, Orig},
+ correlation_id = {binary, Orig}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = undefined,
+ correlation_id = undefined,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id">>, longstr, Orig},
+ {<<"x-message-id">>, longstr, Orig}
+ ]},
+ Props),
+ ok.
+
+message_id_large_string(_Config) ->
+ %% cannot fit in a shortstr
+ Orig = base64:encode(crypto:strong_rand_bytes(500)),
+ P = #'v1_0.properties'{message_id = {utf8, Orig},
+ correlation_id = {utf8, Orig}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = undefined,
+ correlation_id = undefined,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id">>, longstr, Orig},
+ {<<"x-message-id">>, longstr, Orig}
+ ]},
+ Props),
+ ok.
+
+%% Utility
+
+test_amqp091_roundtrip(Props, Payload) ->
+ MsgRecord0 = rabbit_msg_record:from_amqp091(Props, Payload),
+ MsgRecord = rabbit_msg_record:init(
+ iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))),
+ % meck:unload(),
+ {PropsOut, PayloadOut} = rabbit_msg_record:to_amqp091(MsgRecord),
+ ?assertEqual(Props, PropsOut),
+ ?assertEqual(iolist_to_binary(Payload),
+ iolist_to_binary(PayloadOut)),
+ ok.
+
+
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
new file mode 100644
index 0000000000..67ca8eba8b
--- /dev/null
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -0,0 +1,1304 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_queue_SUITE).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+suite() ->
+ [{timetrap, 5 * 60000}].
+
+all() ->
+ [
+ {group, single_node},
+ {group, cluster_size_2},
+ {group, cluster_size_3},
+ {group, unclustered_size_3_1},
+ {group, unclustered_size_3_2},
+ {group, unclustered_size_3_3},
+ {group, cluster_size_3_1}
+ ].
+
+groups() ->
+ [
+ {single_node, [], [restart_single_node] ++ all_tests()},
+ {cluster_size_2, [], all_tests()},
+ {cluster_size_3, [], all_tests() ++
+ [delete_replica,
+ delete_down_replica,
+ delete_classic_replica,
+ delete_quorum_replica,
+ consume_from_replica,
+ leader_failover]},
+ {unclustered_size_3_1, [], [add_replica]},
+ {unclustered_size_3_2, [], [consume_without_local_replica]},
+ {unclustered_size_3_3, [], [grow_coordinator_cluster]},
+ {cluster_size_3_1, [], [shrink_coordinator_cluster]}
+ ].
+
+all_tests() ->
+ [
+ declare_args,
+ declare_max_age,
+ declare_invalid_args,
+ declare_invalid_properties,
+ declare_queue,
+ delete_queue,
+ publish,
+ publish_confirm,
+ recover,
+ consume_without_qos,
+ consume,
+ consume_offset,
+ basic_get,
+ consume_with_autoack,
+ consume_and_nack,
+ consume_and_ack,
+ consume_and_reject,
+ consume_from_last,
+ consume_from_next,
+ consume_from_default,
+ consume_credit,
+ consume_credit_out_of_order_ack,
+ consume_credit_multiple_ack,
+ basic_cancel,
+ max_length_bytes,
+ max_age,
+ invalid_policy,
+ max_age_policy,
+ max_segment_size_policy
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config0) ->
+ rabbit_ct_helpers:log_environment(),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{stream_tick_interval, 1000},
+ {log, [{file, [{level, debug}]}]}]}),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ ClusterSize = case Group of
+ single_node -> 1;
+ cluster_size_2 -> 2;
+ cluster_size_3 -> 3;
+ cluster_size_3_1 -> 3;
+ unclustered_size_3_1 -> 3;
+ unclustered_size_3_2 -> 3;
+ unclustered_size_3_3 -> 3
+ end,
+ Clustered = case Group of
+ unclustered_size_3_1 -> false;
+ unclustered_size_3_2 -> false;
+ unclustered_size_3_3 -> false;
+ _ -> true
+ end,
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base},
+ {rmq_nodes_clustered, Clustered}]),
+ Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
+ Ret = rabbit_ct_helpers:run_steps(Config1b,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ case Ret of
+ {skip, _} ->
+ Ret;
+ Config2 ->
+ EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
+ Config2, stream_queue),
+ case EnableFF of
+ ok ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_tick_interval, 100]),
+ Config2;
+ Skip ->
+ end_per_group(Group, Config2),
+ Skip
+ end
+ end.
+
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q}]),
+ rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [{core_metrics_gc_interval, 100}]}).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+ Config1 = rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+declare_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-length">>, long, 2000}])),
+ assert_queue_type(Server, Q, rabbit_stream_queue).
+
+declare_max_age(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server), Q,
+ [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-age">>, longstr, <<"1A">>}])),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-age">>, longstr, <<"1Y">>}])),
+ assert_queue_type(Server, Q, rabbit_stream_queue).
+
+declare_invalid_properties(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Q = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = Q,
+ auto_delete = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = Q,
+ exclusive = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = Q,
+ durable = false,
+ arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})).
+
+declare_invalid_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Q = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-expires">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-message-ttl">>, long, 2000}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-priority">>, long, 2000}])),
+
+ [?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-overflow">>, longstr, XOverflow}]))
+ || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])).
+
+declare_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ %% Test declare an existing queue
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children,
+ [osiris_server_sup])),
+
+ %% Test declare an existing queue with different arguments
+ ?assertExit(_, declare(Ch, Q, [])).
+
+delete_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+
+add_replica(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+
+ %% Let's also try the add replica command on other queue types, it should fail
+ %% We're doing it in the same test for efficiency, otherwise we have to
+ %% start new rabbitmq clusters every time for a minor testcase
+ QClassic = <<Q/binary, "_classic">>,
+ QQuorum = <<Q/binary, "_quorum">>,
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ?assertEqual({'queue.declare_ok', QClassic, 0, 0},
+ declare(Ch, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
+ ?assertEqual({'queue.declare_ok', QQuorum, 0, 0},
+ declare(Ch, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server1])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QClassic, Server1])),
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QQuorum, Server1])),
+
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+ timer:sleep(1000),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QClassic, Server1])),
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QQuorum, Server1])),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server1])),
+ %% replicas must be recorded on the state, and if we publish messages then they must
+ %% be stored on disk
+ check_leader_and_replicas(Config, Q, Server0, [Server1]),
+ %% And if we try again? Idempotent
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server1])),
+ %% Add another node
+ ok = rabbit_control_helper:command(stop_app, Server2),
+ ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server2),
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server2])),
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]).
+
+delete_replica(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, 'zen@rabbit'])),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])),
+ %% check it's gone
+ check_leader_and_replicas(Config, Q, Server0, [Server2]),
+ %% And if we try again? Idempotent
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])),
+ %% Delete the last replica
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server2])),
+ check_leader_and_replicas(Config, Q, Server0, []).
+
+grow_coordinator_cluster(Config) ->
+ [Server0, Server1, _Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+
+ rabbit_ct_helpers:await_condition(
+ fun() ->
+ case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of
+ {_, Members, _} ->
+ Nodes = lists:sort([N || {_, N} <- Members]),
+ lists:sort([Server0, Server1]) == Nodes;
+ _ ->
+ false
+ end
+ end, 60000).
+
+shrink_coordinator_cluster(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ok = rabbit_control_helper:command(stop_app, Server2),
+ ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []),
+
+ rabbit_ct_helpers:await_condition(
+ fun() ->
+ case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of
+ {_, Members, _} ->
+ Nodes = lists:sort([N || {_, N} <- Members]),
+ lists:sort([Server0, Server1]) == Nodes;
+ _ ->
+ false
+ end
+ end, 60000).
+
+delete_classic_replica(Config) ->
+ [Server0, Server1, _Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, 'zen@rabbit'])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])).
+
+delete_quorum_replica(Config) ->
+ [Server0, Server1, _Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, 'zen@rabbit'])),
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])).
+
+delete_down_replica(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])),
+ %% check it isn't gone
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])).
+
+publish(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ publish(Ch, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]).
+
+publish_confirm(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]).
+
+restart_single_node(Config) ->
+ [Server] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ publish(Ch, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+
+ rabbit_control_helper:command(stop_app, Server),
+ rabbit_control_helper:command(start_app, Server),
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ publish(Ch1, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]).
+
+recover(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ publish(Ch, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+
+ [rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
+ [rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ publish(Ch1, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]).
+
+consume_without_qos(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>},
+ self())).
+
+consume_without_local_replica(Config) ->
+ [Server0, Server1 | _] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ %% Add another node to the cluster, but it won't have a replica
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+ timer:sleep(1000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ qos(Ch1, 10, false),
+ ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>},
+ self())).
+
+consume(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ _ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ ok = amqp_channel:close(Ch1),
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_offset(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 1000)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ run_proper(
+ fun () ->
+ ?FORALL(Offset, range(0, 999),
+ begin
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, Offset),
+ receive_batch(Ch1, Offset, 999),
+ receive
+ {_,
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
+ when S < Offset ->
+ exit({unexpected_offset, S})
+ after 1000 ->
+ ok
+ end,
+ amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ true
+ end)
+ end, [], 25).
+
+basic_get(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ amqp_channel:call(Ch, #'basic.get'{queue = Q})).
+
+consume_with_autoack(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ ?assertExit(
+ {{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ subscribe(Ch1, Q, true, 0)).
+
+consume_and_nack(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ %% Nack will throw a not implemented exception. As it is a cast operation,
+ %% we'll detect the conneciton/channel closure on the next call.
+ %% Let's try to redeclare and see what happens
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]))
+ after 10000 ->
+ exit(timeout)
+ end.
+
+basic_cancel(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ rabbit_ct_helpers:await_condition(
+ fun() ->
+ 1 == length(rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list,
+ [consumer_created]))
+ end, 30000),
+ receive
+ {#'basic.deliver'{}, _} ->
+ amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ ?assertMatch([], rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, [consumer_created]))
+ after 10000 ->
+ exit(timeout)
+ end.
+
+consume_and_reject(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.reject'{delivery_tag = DeliveryTag,
+ requeue = true}),
+ %% Reject will throw a not implemented exception. As it is a cast operation,
+ %% we'll detect the conneciton/channel closure on the next call.
+ %% Let's try to redeclare and see what happens
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]))
+ after 10000 ->
+ exit(timeout)
+ end.
+
+consume_and_ack(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ %% It will succeed as ack is now a credit operation. We should be
+ %% able to redeclare a queue (gen_server call op) as the channel
+ %% should still be open and declare is an idempotent operation
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_from_last(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [committed_offset]]),
+
+ %% We'll receive data from the last committed offset, let's check that is not the
+ %% first offset
+ CommittedOffset = proplists:get_value(committed_offset, Info),
+ ?assert(CommittedOffset > 0),
+
+ %% If the offset is not provided, we're subscribing to the tail of the stream
+ amqp_channel:subscribe(
+ Ch1, #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, longstr, <<"last">>}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+
+ %% And receive the messages from the last committed offset to the end of the stream
+ receive_batch(Ch1, CommittedOffset, 99),
+
+ %% Publish a few more
+ [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ %% Yeah! we got them
+ receive_batch(Ch1, 100, 199).
+
+consume_from_next(Config) ->
+ consume_from_next(Config, [{<<"x-stream-offset">>, longstr, <<"next">>}]).
+
+consume_from_default(Config) ->
+ consume_from_next(Config, []).
+
+consume_from_next(Config, Args) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [committed_offset]]),
+
+ %% We'll receive data from the last committed offset, let's check that is not the
+ %% first offset
+ CommittedOffset = proplists:get_value(committed_offset, Info),
+ ?assert(CommittedOffset > 0),
+
+ %% If the offset is not provided, we're subscribing to the tail of the stream
+ amqp_channel:subscribe(
+ Ch1, #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = Args},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+
+ %% Publish a few more
+ [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ %% Yeah! we got them
+ receive_batch(Ch1, 100, 199).
+
+consume_from_replica(Config) ->
+ [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch1, self()),
+ [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch1, 5000),
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ qos(Ch2, 10, false),
+
+ subscribe(Ch2, Q, false, 0),
+ receive_batch(Ch2, 0, 99).
+
+consume_credit(Config) ->
+ %% Because osiris provides one chunk on every read and we don't want to buffer
+ %% messages in the broker to avoid memory penalties, the credit value won't
+ %% be strict - we allow it into the negative values.
+ %% We can test that after receiving a chunk, no more messages are delivered until
+ %% the credit goes back to a positive value.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ %% Let's publish a big batch, to ensure we have more than a chunk available
+ NumMsgs = 100,
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% Let's subscribe with a small credit, easier to test
+ Credit = 2,
+ qos(Ch1, Credit, false),
+ subscribe(Ch1, Q, false, 0),
+
+ %% Receive everything
+ DeliveryTags = receive_batch(),
+
+ %% We receive at least the given credit as we know there are 100 messages in the queue
+ ?assert(length(DeliveryTags) >= Credit),
+
+ %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries
+ {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags),
+
+ [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ || DeliveryTag <- ToAck],
+
+ %% Nothing here, this is good
+ receive
+ {#'basic.deliver'{}, _} ->
+ exit(unexpected_delivery)
+ after 1000 ->
+ ok
+ end,
+
+ %% Let's ack one more, we should receive a new chunk
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending),
+ multiple = false}),
+
+ %% Yeah, here is the new chunk!
+ receive
+ {#'basic.deliver'{}, _} ->
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_credit_out_of_order_ack(Config) ->
+ %% Like consume_credit but acknowledging the messages out of order.
+ %% We want to ensure it doesn't behave like multiple, that is if we have
+ %% credit 2 and received 10 messages, sending the ack for the message id
+ %% number 10 should only increase credit by 1.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ %% Let's publish a big batch, to ensure we have more than a chunk available
+ NumMsgs = 100,
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% Let's subscribe with a small credit, easier to test
+ Credit = 2,
+ qos(Ch1, Credit, false),
+ subscribe(Ch1, Q, false, 0),
+
+ %% ******* This is the difference with consume_credit
+ %% Receive everything, let's reverse the delivery tags here so we ack out of order
+ DeliveryTags = lists:reverse(receive_batch()),
+
+ %% We receive at least the given credit as we know there are 100 messages in the queue
+ ?assert(length(DeliveryTags) >= Credit),
+
+ %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries
+ {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags),
+
+ [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ || DeliveryTag <- ToAck],
+
+ %% Nothing here, this is good
+ receive
+ {#'basic.deliver'{}, _} ->
+ exit(unexpected_delivery)
+ after 1000 ->
+ ok
+ end,
+
+ %% Let's ack one more, we should receive a new chunk
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending),
+ multiple = false}),
+
+ %% Yeah, here is the new chunk!
+ receive
+ {#'basic.deliver'{}, _} ->
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_credit_multiple_ack(Config) ->
+ %% Like consume_credit but acknowledging the messages out of order.
+ %% We want to ensure it doesn't behave like multiple, that is if we have
+ %% credit 2 and received 10 messages, sending the ack for the message id
+ %% number 10 should only increase credit by 1.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ %% Let's publish a big batch, to ensure we have more than a chunk available
+ NumMsgs = 100,
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% Let's subscribe with a small credit, easier to test
+ Credit = 2,
+ qos(Ch1, Credit, false),
+ subscribe(Ch1, Q, false, 0),
+
+ %% ******* This is the difference with consume_credit
+ %% Receive everything, let's reverse the delivery tags here so we ack out of order
+ DeliveryTag = lists:last(receive_batch()),
+
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+
+ %% Yeah, here is the new chunk!
+ receive
+ {#'basic.deliver'{}, _} ->
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+max_length_bytes(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-length-bytes">>, long, 500},
+ {<<"x-max-segment-size">>, long, 250}])),
+
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ %% We don't yet have reliable metrics, as the committed offset doesn't work
+ %% as a counter once we start applying retention policies.
+ %% Let's wait for messages and hope these are less than the number of published ones
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 100, false),
+ subscribe(Ch1, Q, false, 0),
+
+ ?assert(length(receive_batch()) < 100).
+
+max_age(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-age">>, longstr, <<"10s">>},
+ {<<"x-max-segment-size">>, long, 250}])),
+
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ timer:sleep(10000),
+
+ %% Let's publish again so the new segments will trigger the retention policy
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 200, false),
+ subscribe(Ch1, Q, false, 0),
+ ?assertEqual(100, length(receive_batch())).
+
+leader_failover(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch1, self()),
+ [publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch1, 5000),
+
+ check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ timer:sleep(30000),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ QName = rabbit_misc:r(<<"/">>, queue, Q),
+ lists:member({name, QName}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ NewLeader = proplists:get_value(leader, Info),
+ ?assert(NewLeader =/= Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
+
+invalid_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"ha-mode">>, <<"all">>}]),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"message-ttl">>, 5}]),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition]]),
+
+ ?assertEqual('', proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([], proplists:get_value(effective_policy_definition, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>).
+
+max_age_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>,
+ [{<<"max-age">>, <<"1Y">>}]),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition]]),
+
+ ?assertEqual(<<"age">>, proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([{<<"max-age">>, <<"1Y">>}],
+ proplists:get_value(effective_policy_definition, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>).
+
+max_segment_size_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"segment">>, <<"max_segment_size.*">>, <<"queues">>,
+ [{<<"max-segment-size">>, 5000}]),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition]]),
+
+ ?assertEqual(<<"segment">>, proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([{<<"max-segment-size">>, 5000}],
+ proplists:get_value(effective_policy_definition, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>).
+
+%%----------------------------------------------------------------------------
+
+delete_queues() ->
+ [{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+ || Q <- rabbit_amqqueue:list()].
+
+declare(Ch, Q) ->
+ declare(Ch, Q, []).
+
+declare(Ch, Q, Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ auto_delete = false,
+ arguments = Args}).
+assert_queue_type(Server, Q, Expected) ->
+ Actual = get_queue_type(Server, Q),
+ Expected = Actual.
+
+get_queue_type(Server, Q0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
+ {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ amqqueue:get_type(Q1).
+
+check_leader_and_replicas(Config, Name, Leader, Replicas0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Name),
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, QNameRes}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ ?assertEqual(Leader, proplists:get_value(leader, Info)),
+ Replicas = lists:sort(Replicas0),
+ ?assertEqual(Replicas, lists:sort(proplists:get_value(members, Info))).
+
+publish(Ch, Queue) ->
+ publish(Ch, Queue, <<"msg">>).
+
+publish(Ch, Queue, Msg) ->
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg}).
+
+subscribe(Ch, Queue, NoAck, Offset) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, long, Offset}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+qos(Ch, Prefetch, Global) ->
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = Global,
+ prefetch_count = Prefetch})).
+
+receive_batch(Ch, N, N) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} ->
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ after 5000 ->
+ exit({missing_offset, N})
+ end;
+receive_batch(Ch, N, M) ->
+ receive
+ {_,
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
+ when S < N ->
+ exit({unexpected_offset, S});
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} ->
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ receive_batch(Ch, N + 1, M)
+ after 5000 ->
+ exit({missing_offset, N})
+ end.
+
+receive_batch() ->
+ receive_batch([]).
+
+receive_batch(Acc) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ receive_batch([DeliveryTag | Acc])
+ after 5000 ->
+ lists:reverse(Acc)
+ end.
+
+run_proper(Fun, Args, NumTests) ->
+ ?assertEqual(
+ true,
+ proper:counterexample(
+ erlang:apply(Fun, Args),
+ [{numtests, NumTests},
+ {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
+ (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
+ end}])).
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index 013e625159..8b2c1d6ebb 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -234,8 +234,10 @@ consume_survives(Config,
DeathFun(Config, A),
%% verify that the consumer got all msgs, or die - the await_response
%% calls throw an exception if anything goes wrong....
- rabbit_ha_test_consumer:await_response(ConsumerPid),
+ ct:pal("awaiting produce ~w", [ProducerPid]),
rabbit_ha_test_producer:await_response(ProducerPid),
+ ct:pal("awaiting consumer ~w", [ConsumerPid]),
+ rabbit_ha_test_consumer:await_response(ConsumerPid),
ok.
confirms_survive(Config, DeathFun) ->
diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl
index 3610fd1a80..6be403fd3e 100644
--- a/test/unit_log_config_SUITE.erl
+++ b/test/unit_log_config_SUITE.erl
@@ -126,6 +126,10 @@ sink_rewrite_sinks() ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -226,6 +230,10 @@ sink_handlers_merged_with_lager_extra_sinks_handlers(_) ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
@@ -317,6 +325,10 @@ level_sinks() ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,error]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,error]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -427,6 +439,10 @@ file_sinks(DefaultLevel) ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
@@ -674,6 +690,10 @@ default_expected_sinks(UpgradeFile) ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -761,6 +781,10 @@ tty_expected_sinks() ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},