diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/backing_queue_SUITE.erl | 77 | ||||
| -rw-r--r-- | test/channel_operation_timeout_SUITE.erl | 3 | ||||
| -rw-r--r-- | test/confirms_rejects_SUITE.erl | 17 | ||||
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 6 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 21 | ||||
| -rw-r--r-- | test/queue_type_SUITE.erl | 234 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 83 | ||||
| -rw-r--r-- | test/quorum_queue_utils.erl | 9 | ||||
| -rw-r--r-- | test/rabbit_confirms_SUITE.erl | 154 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/rabbit_fifo_int_SUITE.erl | 186 | ||||
| -rw-r--r-- | test/rabbit_ha_test_consumer.erl | 7 | ||||
| -rw-r--r-- | test/rabbit_msg_record_SUITE.erl | 213 | ||||
| -rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 1304 | ||||
| -rw-r--r-- | test/simple_ha_SUITE.erl | 4 | ||||
| -rw-r--r-- | test/unit_log_config_SUITE.erl | 24 |
17 files changed, 2193 insertions, 159 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 2025576a57..ff37e1fb04 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -684,17 +684,17 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) -> QPid = amqqueue:get_pid(Q), Payload = <<0:8388608>>, %% 1MB Count = 30, - publish_and_confirm(Q, Payload, Count), + QTState = publish_and_confirm(Q, Payload, Count), rabbit_amqqueue:set_ram_duration_target(QPid, 0), {ok, Limiter} = rabbit_limiter:start_link(no_id), CountMinusOne = Count - 1, - {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = - rabbit_amqqueue:basic_get(Q, self(), true, Limiter, + {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}, _} = + rabbit_amqqueue:basic_get(Q, true, Limiter, <<"bq_variable_queue_delete_msg_store_files_callback1">>, - #{}), + QTState), {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), %% give the queue a second to receive the close_fds callback msg @@ -713,8 +713,7 @@ bq_queue_recover1(Config) -> {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), QName = amqqueue:get_name(Q), QPid = amqqueue:get_pid(Q), - publish_and_confirm(Q, <<>>, Count), - + QT = publish_and_confirm(Q, <<>>, Count), SupPid = get_queue_sup_pid(Q), true = is_pid(SupPid), exit(SupPid, kill), @@ -724,7 +723,7 @@ bq_queue_recover1(Config) -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(?VHOST), - {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST), + {Recovered, []} = rabbit_amqqueue:recover(?VHOST), rabbit_amqqueue:start(Recovered), {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( @@ -732,9 +731,9 @@ bq_queue_recover1(Config) -> fun (Q1) when ?is_amqqueue(Q1) -> QPid1 = amqqueue:get_pid(Q1), CountMinusOne = Count - 1, - {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = - rabbit_amqqueue:basic_get(Q1, self(), false, Limiter, - <<"bq_queue_recover1">>, #{}), + {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}, _} = + rabbit_amqqueue:basic_get(Q1, false, Limiter, + <<"bq_queue_recover1">>, QT), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1}, VQ2} = @@ -1366,25 +1365,34 @@ variable_queue_init(Q, Recover) -> publish_and_confirm(Q, Payload, Count) -> Seqs = lists:seq(1, Count), - [begin - Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, - Payload), - Delivery = #delivery{mandatory = false, sender = self(), - confirm = true, message = Msg, msg_seq_no = Seq, - flow = noflow}, - _QPids = rabbit_amqqueue:deliver([Q], Delivery) - end || Seq <- Seqs], - wait_for_confirms(gb_sets:from_list(Seqs)). + QTState0 = rabbit_queue_type:new(Q, rabbit_queue_type:init()), + QTState = + lists:foldl( + fun (Seq, Acc0) -> + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, + Payload), + Delivery = #delivery{mandatory = false, sender = self(), + confirm = true, message = Msg, msg_seq_no = Seq, + flow = noflow}, + {ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Delivery, Acc0), + Acc + end, QTState0, Seqs), + wait_for_confirms(gb_sets:from_list(Seqs)), + QTState. wait_for_confirms(Unconfirmed) -> case gb_sets:is_empty(Unconfirmed) of true -> ok; - false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> + false -> receive {'$gen_cast', + {queue_event, _QName, + {confirm, Confirmed, _}}} -> wait_for_confirms( rabbit_misc:gb_sets_difference( Unconfirmed, gb_sets:from_list(Confirmed))) - after ?TIMEOUT -> exit(timeout_waiting_for_confirm) + after ?TIMEOUT -> + flush(), + exit(timeout_waiting_for_confirm) end end. @@ -1436,6 +1444,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> variable_queue_wait_for_shuffling_end( lists:foldl( fun (N, VQN) -> + rabbit_variable_queue:publish( rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), @@ -1526,12 +1535,13 @@ variable_queue_status(VQ) -> variable_queue_wait_for_shuffling_end(VQ) -> case credit_flow:blocked() of false -> VQ; - true -> receive - {bump_credit, Msg} -> - credit_flow:handle_bump_msg(Msg), - variable_queue_wait_for_shuffling_end( - rabbit_variable_queue:resume(VQ)) - end + true -> + receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:resume(VQ)) + end end. msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> @@ -1576,11 +1586,13 @@ variable_queue_with_holes(VQ0) -> fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), %% assertions Status = variable_queue_status(VQ8), + vq_with_holes_assertions(VQ8, proplists:get_value(mode, Status)), Depth = Count + Interval, Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}. vq_with_holes_assertions(VQ, default) -> @@ -1604,3 +1616,12 @@ check_variable_queue_status(VQ0, Props) -> S = variable_queue_status(VQ1), assert_props(S, Props), VQ1. + +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl index f8da35d6ff..15e0188604 100644 --- a/test/channel_operation_timeout_SUITE.erl +++ b/test/channel_operation_timeout_SUITE.erl @@ -71,11 +71,13 @@ notify_down_all(Config) -> RabbitCh = rabbit_ct_client_helpers:open_channel(Config, 0), HareCh = rabbit_ct_client_helpers:open_channel(Config, 1), + ct:pal("one"), %% success set_channel_operation_timeout_config(Config, 1000), configure_bq(Config), QCfg0 = qconfig(RabbitCh, <<"q0">>, <<"ex0">>, true, false), declare(QCfg0), + ct:pal("two"), %% Testing rabbit_amqqueue:notify_down_all via rabbit_channel. %% Consumer count = 0 after correct channel termination and %% notification of queues via delegate:call/3 @@ -83,6 +85,7 @@ notify_down_all(Config) -> rabbit_ct_client_helpers:close_channel(RabbitCh), 0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)), false = is_process_alive(RabbitCh), + ct:pal("three"), %% fail set_channel_operation_timeout_config(Config, 10), diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index aaaeb4a939..a51253885c 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -388,9 +388,12 @@ kill_the_queue(QueueName) -> [begin {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}), Pid = amqqueue:get_pid(Q), + ct:pal("~w killed", [Pid]), + timer:sleep(1), exit(Pid, kill) end - || _ <- lists:seq(1, 11)], + || _ <- lists:seq(1, 50)], + timer:sleep(1), {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}), Pid = amqqueue:get_pid(Q), case is_process_alive(Pid) of @@ -399,7 +402,11 @@ kill_the_queue(QueueName) -> false -> ok end. - - - - +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index 87b5566c57..4ee917aa21 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -1059,9 +1059,11 @@ dead_letter_headers_BCC(Config) -> ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). -%% Three top-level headers are added for the very first dead-lettering event. They are +%% Three top-level headers are added for the very first dead-lettering event. +%% They are %% x-first-death-reason, x-first-death-queue, x-first-death-exchange -%% They have the same values as the reason, queue, and exchange fields of the original +%% They have the same values as the reason, queue, and exchange fields of the +%% original %% dead lettering event. Once added, these headers are never modified. dead_letter_headers_first_death(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 25027c7ef9..c881aef8a1 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -424,8 +424,7 @@ nodes_policy_should_pick_master_from_its_params(Config) -> nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, A), - ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], - [all])), + ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], [all])), %% --> Master: A %% Slaves: [B, C] or [C, B] SSPids = ?awaitMatch(SSPids when is_list(SSPids), @@ -450,7 +449,7 @@ nodes_policy_should_pick_master_from_its_params(Config) -> %% should instead use an existing synchronised mirror as the new master, %% even though that isn't in the policy. ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], - [{nodes, [LastSlave, A]}])), + [{nodes, [LastSlave, A]}])), %% --> Master: B or C (same as previous policy) %% Slaves: [A] @@ -931,6 +930,7 @@ apply_in_parallel(Config, Nodes, Policies) -> Self = self(), [spawn_link(fun() -> [begin + apply_policy(Config, N, Policy) end || Policy <- Policies], Self ! parallel_task_done @@ -969,7 +969,7 @@ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) -> %% Let's wait a bit longer. timer:sleep(1000), wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1); - FinalInfo -> + {ok, FinalInfo} -> %% The last policy is the final state LastPolicy = lists:last(TestedPolicies), case verify_policy(LastPolicy, FinalInfo) of diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index c4d16a5900..0fbf7ec975 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -57,7 +57,8 @@ groups() -> trigger_message_store_compaction]}, {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, {quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, - {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} + {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, + {stream_queue, [parallel], AllTests} ]} ]. @@ -122,13 +123,24 @@ init_per_group(mirrored_queue, Config) -> {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]), rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(stream_queue, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue) of + ok -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"stream">>}]}, + {queue_durable, true}]); + Skip -> + Skip + end; init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> ClusterSize = 3, Config = rabbit_ct_helpers:merge_app_env( Config0, {rabbit, [{channel_tick_interval, 1000}, - {quorum_tick_interval, 1000}]}), + {quorum_tick_interval, 1000}, + {stream_tick_interval, 1000}]}), Config1 = rabbit_ct_helpers:set_config( Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} @@ -514,6 +526,11 @@ basic_cancel(Config) -> publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), CTag = atom_to_binary(?FUNCTION_NAME, utf8), + + %% Let's set consumer prefetch so it works with stream queues + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = false, + prefetch_count = 1})), subscribe(Ch, QName, false, CTag), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> diff --git a/test/queue_type_SUITE.erl b/test/queue_type_SUITE.erl new file mode 100644 index 0000000000..eeeabc3d1e --- /dev/null +++ b/test/queue_type_SUITE.erl @@ -0,0 +1,234 @@ +-module(queue_type_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, classic}, + {group, quorum} + ]. + + +all_tests() -> + [ + smoke + ]. + +groups() -> + [ + {classic, [], all_tests()}, + {quorum, [], all_tests()} + ]. + +init_per_suite(Config0) -> + rabbit_ct_helpers:log_environment(), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{quorum_tick_interval, 1000}]}), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config), + ok. + +init_per_group(Group, Config) -> + ClusterSize = 3, + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}]), + Config1b = rabbit_ct_helpers:set_config(Config1, + [{queue_type, atom_to_binary(Group, utf8)}, + {net_ticktime, 10}]), + Config2 = rabbit_ct_helpers:run_steps(Config1b, + [fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()), + Config3 = + case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of + ok -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_tick_interval, 100]), + %% HACK: the larger cluster sizes benefit for a bit more time + %% after clustering before running the tests. + case Group of + cluster_size_5 -> + timer:sleep(5000), + Config2; + _ -> + Config2 + end; + Skip -> + end_per_group(Group, Config2), + Skip + end, + rabbit_ct_broker_helpers:set_policy( + Config3, 0, + <<"ha-policy">>, <<".*">>, <<"queues">>, + [{<<"ha-mode">>, <<"all">>}]), + Config3. + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env( + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, + [{core_metrics_gc_interval, 100}, + {log, [{file, [{level, debug}]}]}]}), + {ra, [{min_wal_roll_over_interval, 30000}]}). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{queue_name, Q}, + {alt_queue_name, <<Q/binary, "_alt">>} + ]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + catch delete_queues(), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +smoke(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QName = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, <<"msg1">>), + ct:pal("waiting for confirms from ~s", [QName]), + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> fail + after 2500 -> + flush(), + exit(confirm_timeout) + end, + DTag = basic_get(Ch, QName), + + basic_ack(Ch, DTag), + basic_get_empty(Ch, QName), + + %% consume + publish(Ch, QName, <<"msg2">>), + ConsumerTag1 = <<"ctag1">>, + ok = subscribe(Ch, QName, ConsumerTag1), + %% receive and ack + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{}} -> + basic_ack(Ch, DeliveryTag) + after 5000 -> + flush(), + exit(basic_deliver_timeout) + end, + basic_cancel(Ch, ConsumerTag1), + + %% assert empty + basic_get_empty(Ch, QName), + + %% consume and nack + ConsumerTag2 = <<"ctag2">>, + ok = subscribe(Ch, QName, ConsumerTag2), + publish(Ch, QName, <<"msg3">>), + receive + {#'basic.deliver'{delivery_tag = T, + redelivered = false}, + #amqp_msg{}} -> + basic_cancel(Ch, ConsumerTag2), + basic_nack(Ch, T) + after 5000 -> + exit(basic_deliver_timeout) + end, + %% get and ack + basic_ack(Ch, basic_get(Ch, QName)), + ok. + +%% Utility +delete_queues() -> + [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()]. + +declare(Ch, Q, Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + auto_delete = false, + arguments = Args}). + +publish(Ch, Queue, Msg) -> + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = Msg}). + +basic_get(Ch, Queue) -> + {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue, + no_ack = false}), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, Reply), + GetOk#'basic.get_ok'.delivery_tag. + +basic_get_empty(Ch, Queue) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{queue = Queue, + no_ack = false})). + +subscribe(Ch, Queue, CTag) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = false, + consumer_tag = CTag}, + self()), + receive + #'basic.consume_ok'{consumer_tag = CTag} -> + ok + after 5000 -> + exit(basic_consume_timeout) + end. + +basic_ack(Ch, DTag) -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag, + multiple = false}). + +basic_cancel(Ch, CTag) -> + #'basic.cancel_ok'{} = + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}). + +basic_nack(Ch, DTag) -> + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag, + requeue = true, + multiple = false}). + +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index ecb4fdac63..16042b71e8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -383,13 +383,19 @@ start_queue(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Expected = Children + 1, + ?assertMatch(Expected, + length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))), %% Test declare an existing queue ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -405,7 +411,8 @@ start_queue(Config) -> %% Check that the application and process are still up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). + ?assertMatch(Expected, + length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))). start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -463,6 +470,10 @@ quorum_cluster_size_x(Config, Max, Expected) -> stop_queue(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -471,13 +482,15 @@ stop_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Expected = Children + 1, + ?assertMatch(Expected, + length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))), %% Delete the quorum queue ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})), %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) + Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -485,6 +498,10 @@ stop_queue(Config) -> restart_queue(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -496,7 +513,9 @@ restart_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). + Expected = Children + 1, + ?assertMatch(Expected, + length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))). idempotent_recover(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -554,6 +573,10 @@ restart_all_types(Config) -> %% ensure there are no regressions Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ1 = <<"restart_all_types-qq1">>, ?assertEqual({'queue.declare_ok', QQ1, 0, 0}, @@ -575,7 +598,9 @@ restart_all_types(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Expected = length(Children) + 2, + Got = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + ?assertMatch(Expected, Got), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -592,6 +617,10 @@ stop_start_rabbit_app(Config) -> %% classic) to ensure there are no regressions Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ1 = <<"stop_start_rabbit_app-qq">>, ?assertEqual({'queue.declare_ok', QQ1, 0, 0}, @@ -617,7 +646,9 @@ stop_start_rabbit_app(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Expected = Children + 2, + ?assertMatch(Expected, + length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -935,6 +966,10 @@ cleanup_queue_state_on_channel_after_publish(Config) -> %% to verify that the cleanup is propagated through channels [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -955,18 +990,22 @@ cleanup_queue_state_on_channel_after_publish(Config) -> ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, - [ra_server_sup_sup]) + Children == length(rpc:call(Server, supervisor, which_children, + [ra_server_sup_sup])) end), %% Check that all queue states have been cleaned - wait_for_cleanup(Server, NCh1, 0), - wait_for_cleanup(Server, NCh2, 0). + wait_for_cleanup(Server, NCh2, 0), + wait_for_cleanup(Server, NCh1, 0). cleanup_queue_state_on_channel_after_subscribe(Config) -> %% Declare/delete the queue and publish in one channel, while consuming on a %% different one to verify that the cleanup is propagated through channels [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -993,7 +1032,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) -> wait_for_cleanup(Server, NCh2, 1), ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) + Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1596,8 +1635,8 @@ cleanup_data_dir(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), timer:sleep(100), - [{_, UId1}] = rpc:call(Server1, ra_directory, list_registered, []), - [{_, UId2}] = rpc:call(Server2, ra_directory, list_registered, []), + UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [])), + UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [])), DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]), DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]), ?assert(filelib:is_dir(DataDir1)), @@ -1748,6 +1787,11 @@ reconnect_consumer_and_wait_channel_down(Config) -> delete_immediately_by_resource(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), @@ -1756,7 +1800,7 @@ delete_immediately_by_resource(Config) -> %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) + Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -1784,6 +1828,8 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}) + after 5000 -> + exit(basic_deliver_timeout) end, receive @@ -1794,6 +1840,8 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, multiple = false, requeue = true}) + after 5000 -> + exit(basic_deliver_timeout_2) end, receive @@ -1803,8 +1851,13 @@ subscribe_redelivery_count(Config) -> ?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, multiple = false}), + ct:pal("wait_for_messages_ready", []), wait_for_messages_ready(Servers, RaName, 0), + ct:pal("wait_for_messages_pending_ack", []), wait_for_messages_pending_ack(Servers, RaName, 0) + after 5000 -> + flush(500), + exit(basic_deliver_timeout_3) end. subscribe_redelivery_limit(Config) -> diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index 95ddc892f1..caabd617ae 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -28,15 +28,10 @@ wait_for_messages_total(Servers, QName, Total) -> wait_for_messages(Servers, QName, Number, Fun, 0) -> Msgs = dirty_query(Servers, QName, Fun), - Totals = lists:map(fun(M) when is_map(M) -> - maps:size(M); - (_) -> - -1 - end, Msgs), - ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); + ?assertEqual(Msgs, [Number || _ <- lists:seq(1, length(Servers))]); wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), - ct:pal("Got messages ~p", [Msgs]), + ct:pal("Got messages ~p ~p", [QName, Msgs]), %% hack to allow the check to succeed in mixed versions clusters if at %% least one node matches the criteria rather than all nodes for F = case is_mixed_versions() of diff --git a/test/rabbit_confirms_SUITE.erl b/test/rabbit_confirms_SUITE.erl new file mode 100644 index 0000000000..331c3ca7c3 --- /dev/null +++ b/test/rabbit_confirms_SUITE.erl @@ -0,0 +1,154 @@ +-module(rabbit_confirms_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + confirm, + reject, + remove_queue + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +confirm(_Config) -> + XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>), + QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>), + QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>), + U0 = rabbit_confirms:init(), + ?assertEqual(0, rabbit_confirms:size(U0)), + ?assertEqual(undefined, rabbit_confirms:smallest(U0)), + ?assertEqual(true, rabbit_confirms:is_empty(U0)), + + U1 = rabbit_confirms:insert(1, [QName], XName, U0), + ?assertEqual(1, rabbit_confirms:size(U1)), + ?assertEqual(1, rabbit_confirms:smallest(U1)), + ?assertEqual(false, rabbit_confirms:is_empty(U1)), + + {[{1, XName}], U2} = rabbit_confirms:confirm([1], QName, U1), + ?assertEqual(0, rabbit_confirms:size(U2)), + ?assertEqual(undefined, rabbit_confirms:smallest(U2)), + ?assertEqual(true, rabbit_confirms:is_empty(U2)), + + U3 = rabbit_confirms:insert(2, [QName], XName, U1), + ?assertEqual(2, rabbit_confirms:size(U3)), + ?assertEqual(1, rabbit_confirms:smallest(U3)), + ?assertEqual(false, rabbit_confirms:is_empty(U3)), + + {[{1, XName}], U4} = rabbit_confirms:confirm([1], QName, U3), + ?assertEqual(1, rabbit_confirms:size(U4)), + ?assertEqual(2, rabbit_confirms:smallest(U4)), + ?assertEqual(false, rabbit_confirms:is_empty(U4)), + + U5 = rabbit_confirms:insert(2, [QName, QName2], XName, U1), + ?assertEqual(2, rabbit_confirms:size(U5)), + ?assertEqual(1, rabbit_confirms:smallest(U5)), + ?assertEqual(false, rabbit_confirms:is_empty(U5)), + + {[{1, XName}], U6} = rabbit_confirms:confirm([1, 2], QName, U5), + ?assertEqual(2, rabbit_confirms:smallest(U6)), + + {[{2, XName}], U7} = rabbit_confirms:confirm([2], QName2, U6), + ?assertEqual(0, rabbit_confirms:size(U7)), + ?assertEqual(undefined, rabbit_confirms:smallest(U7)), + + + U8 = rabbit_confirms:insert(2, [QName], XName, U1), + {[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8), + ok. + + +reject(_Config) -> + XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>), + QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>), + QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>), + U0 = rabbit_confirms:init(), + ?assertEqual(0, rabbit_confirms:size(U0)), + ?assertEqual(undefined, rabbit_confirms:smallest(U0)), + ?assertEqual(true, rabbit_confirms:is_empty(U0)), + + U1 = rabbit_confirms:insert(1, [QName], XName, U0), + + {ok, {1, XName}, U2} = rabbit_confirms:reject(1, U1), + {error, not_found} = rabbit_confirms:reject(1, U2), + ?assertEqual(0, rabbit_confirms:size(U2)), + ?assertEqual(undefined, rabbit_confirms:smallest(U2)), + + U3 = rabbit_confirms:insert(2, [QName, QName2], XName, U1), + + {ok, {1, XName}, U4} = rabbit_confirms:reject(1, U3), + {error, not_found} = rabbit_confirms:reject(1, U4), + ?assertEqual(1, rabbit_confirms:size(U4)), + ?assertEqual(2, rabbit_confirms:smallest(U4)), + + {ok, {2, XName}, U5} = rabbit_confirms:reject(2, U3), + {error, not_found} = rabbit_confirms:reject(2, U5), + ?assertEqual(1, rabbit_confirms:size(U5)), + ?assertEqual(1, rabbit_confirms:smallest(U5)), + + ok. + +remove_queue(_Config) -> + XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>), + QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>), + QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>), + U0 = rabbit_confirms:init(), + + U1 = rabbit_confirms:insert(1, [QName, QName2], XName, U0), + U2 = rabbit_confirms:insert(2, [QName2], XName, U1), + {[{2, XName}], U3} = rabbit_confirms:remove_queue(QName2, U2), + ?assertEqual(1, rabbit_confirms:size(U3)), + ?assertEqual(1, rabbit_confirms:smallest(U3)), + {[{1, XName}], U4} = rabbit_confirms:remove_queue(QName, U3), + ?assertEqual(0, rabbit_confirms:size(U4)), + ?assertEqual(undefined, rabbit_confirms:smallest(U4)), + + U5 = rabbit_confirms:insert(1, [QName], XName, U0), + U6 = rabbit_confirms:insert(2, [QName], XName, U5), + {[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6), + + ok. + + +%% Utility diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index d19dcb3682..7b90d91bfa 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -674,7 +674,7 @@ single_active_consumer_basic_get_test(_) -> ?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy), ?assertEqual(0, map_size(State0#rabbit_fifo.consumers)), {State1, _} = enq(1, 1, first, State0), - {_State, {error, unsupported}} = + {_State, {error, {unsupported, single_active_consumer}}} = apply(meta(2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State1), ok. diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl index b51975b062..b2ed7160a2 100644 --- a/test/rabbit_fifo_int_SUITE.erl +++ b/test/rabbit_fifo_int_SUITE.erl @@ -86,7 +86,7 @@ basics(Config) -> CustomerTag = UId, ok = start_cluster(ClusterName, [ServerId]), FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0), + {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, #{}, FState0), ra_log_wal:force_roll_over(ra_log_wal), % create segment the segment will trigger a snapshot @@ -99,11 +99,10 @@ basics(Config) -> FState5 = receive {ra_event, From, Evt} -> case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of - {internal, _AcceptedSeqs, _Actions, _FState4} -> - exit(unexpected_internal_event); - {{delivery, C, [{MsgId, _Msg}]}, FState4} -> - {ok, S} = rabbit_fifo_client:settle(C, [MsgId], - FState4), + {ok, FState4, + [{deliver, C, true, + [{_Qname, _QRef, MsgId, _SomBool, _Msg}]}]} -> + {S, _A} = rabbit_fifo_client:settle(C, [MsgId], FState4), S end after 5000 -> @@ -129,10 +128,9 @@ basics(Config) -> receive {ra_event, Frm, E} -> case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of - {internal, _, _, _FState7} -> - exit({unexpected_internal_event, E}); - {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} -> - {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7), + {ok, FState7, [{deliver, Ctag, true, + [{_, _, Mid, _, two}]}]} -> + {_, _} = rabbit_fifo_client:return(Ctag, [Mid], FState7), ok end after 2000 -> @@ -150,8 +148,8 @@ return(Config) -> {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00), {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0), {_, _, F2} = process_ra_events(receive_ra_events(2, 0), F1), - {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), - {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), + {ok, _, {_, _, MsgId, _, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), + _F2 = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), ra:stop_server(ServerId), ok. @@ -165,9 +163,9 @@ rabbit_fifo_returns_correlation(Config) -> receive {ra_event, Frm, E} -> case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of - {internal, [corr1], [], _F2} -> + {ok, _F2, [{settled, _, _}]} -> ok; - {Del, _} -> + Del -> exit({unexpected, Del}) end after 2000 -> @@ -181,23 +179,24 @@ duplicate_delivery(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), Fun = fun Loop(S0) -> receive {ra_event, Frm, E} = Evt -> case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of - {internal, [corr1], [], S1} -> + {ok, S1, [{settled, _, _}]} -> Loop(S1); - {_Del, S1} -> + {ok, S1, _} -> %% repeat event delivery self() ! Evt, %% check that then next received delivery doesn't %% repeat or crash receive {ra_event, F, E1} -> - case rabbit_fifo_client:handle_ra_event(F, E1, S1) of - {internal, [], [], S2} -> + case rabbit_fifo_client:handle_ra_event( + F, E1, S1) of + {ok, S2, _} -> S2 end end @@ -215,7 +214,7 @@ usage(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), {_, _, _} = process_ra_events(receive_ra_events(2, 2), F3), @@ -242,9 +241,9 @@ resends_lost_command(Config) -> meck:unload(ra), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), {_, _, F4} = process_ra_events(receive_ra_events(2, 0), F3), - {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), - {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, _, {_, _, _, _, msg1}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, _, {_, _, _, _, msg2}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, _, {_, _, _, _, msg3}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), ra:stop_server(ServerId), ok. @@ -268,7 +267,7 @@ detects_lost_delivery(Config) -> F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), {_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), % lose first delivery @@ -298,13 +297,13 @@ returns_after_down(Config) -> _Pid = spawn(fun () -> F = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, - undefined, F), + #{}, F), Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, timer:sleep(1000), % message should be available for dequeue - {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), + {ok, _, {_, _, _, _, msg1}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), ra:stop_server(ServerId), ok. @@ -327,9 +326,9 @@ resends_after_lost_applied(Config) -> % send another message {ok, F4} = rabbit_fifo_client:enqueue(msg3, F3), {_, _, F5} = process_ra_events(receive_ra_events(1, 0), F4), - {ok, {{_, {_, msg1}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {{_, {_, msg2}}, _}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), - {ok, {{_, {_, msg3}}, _}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7), + {ok, _, {_, _, _, _, msg1}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, _, {_, _, _, _, msg2}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, _, {_, _, _, _, msg3}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7), ra:stop_server(ServerId), ok. @@ -377,15 +376,16 @@ discard(Config) -> _ = ra:members(ServerId), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), - F3 = discard_next_delivery(F2, 500), - {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), + F3 = discard_next_delivery(F2, 5000), + {empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), receive {dead_letter, Letters} -> [{_, msg1}] = Letters, ok after 500 -> + flush(), exit(dead_letter_timeout) end, ra:stop_server(ServerId), @@ -397,11 +397,11 @@ cancel_checkout(Config) -> ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), + {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F1), {_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), - {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), + {F5, _} = rabbit_fifo_client:return(<<"tag">>, [0], F4), + {ok, _, {_, _, _, _, m1}, F5} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), ok. credit(Config) -> @@ -413,20 +413,20 @@ credit(Config) -> {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), {_, _, F3} = process_ra_events(receive_ra_events(2, 0), F2), %% checkout with 0 prefetch - {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3), + {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, #{}, F3), %% assert no deliveries {_, _, F5} = process_ra_events(receive_ra_events(), F4, [], [], fun (D, _) -> error({unexpected_delivery, D}) end), %% provide some credit - {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), - {[{_, {_, m1}}], [{send_credit_reply, _}], F7} = + F6 = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), + {[{_, _, _, _, m1}], [{send_credit_reply, _}], F7} = process_ra_events(receive_ra_events(1, 1), F6), %% credit and drain - {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), - {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} = + F8 = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), + {[{_, _, _, _, m2}], [{send_credit_reply, _}, {send_drained, _}], F9} = process_ra_events(receive_ra_events(1, 1), F8), flush(), @@ -439,9 +439,8 @@ credit(Config) -> (D, _) -> error({unexpected_delivery, D}) end), %% credit again and receive the last message - {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), - {[{_, {_, m3}}], [{send_credit_reply, _}], _} = - process_ra_events(receive_ra_events(1, 1), F12), + F12 = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), + {[{_, _, _, _, m3}], _, _} = process_ra_events(receive_ra_events(1, 1), F12), ok. untracked_enqueue(Config) -> @@ -452,7 +451,7 @@ untracked_enqueue(Config) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1), timer:sleep(100), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), + {ok, _, {_, _, _, _, msg1}, _F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), ra:stop_server(ServerId), ok. @@ -472,6 +471,7 @@ flow(Config) -> ok. test_queries(Config) -> + % ok = logger:set_primary_config(level, all), ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), @@ -484,20 +484,23 @@ test_queries(Config) -> Self ! ready, receive stop -> ok end end), + receive + ready -> ok + after 5000 -> + exit(ready_timeout) + end, F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - ok = receive ready -> ok after 5000 -> timeout end, - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), - ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader}, - ra:local_query(ServerId, - fun rabbit_fifo:query_messages_ready/1)), - ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader}, - ra:local_query(ServerId, - fun rabbit_fifo:query_messages_checked_out/1)), - ?assertMatch({ok, {_RaIdxTerm, Processes}, _Leader} - when length(Processes) == 2, - ra:local_query(ServerId, - fun rabbit_fifo:query_processes/1)), - P ! stop, + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, #{}, F0), + {ok, {_, Ready}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_ready/1), + ?assertEqual(1, Ready), + {ok, {_, Checked}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_checked_out/1), + ?assertEqual(1, Checked), + {ok, {_, Processes}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_processes/1), + ?assertEqual(2, length(Processes)), + P ! stop, ra:stop_server(ServerId), ok. @@ -511,15 +514,16 @@ dequeue(Config) -> Tag = UId, ok = start_cluster(ClusterName, [ServerId]), F1 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), + {empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b), {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F2_), - {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), + % {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), + {ok, _, {_, _, 0, _, msg1}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3), {_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_), - {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), - {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5), + {ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), + {_F6, _A} = rabbit_fifo_client:settle(Tag, [MsgId], F5), ra:stop_server(ServerId), ok. @@ -534,8 +538,8 @@ conf(ClusterName, UId, ServerId, _, Peers) -> process_ra_event(State, Wait) -> receive {ra_event, From, Evt} -> - {internal, _, _, S} = - rabbit_fifo_client:handle_ra_event(From, Evt, State), + {ok, S, _Actions} = + rabbit_fifo_client:handle_ra_event(From, Evt, State), S after Wait -> exit(ra_event_timeout) @@ -572,10 +576,10 @@ receive_ra_events(Acc) -> end. process_ra_events(Events, State) -> - DeliveryFun = fun ({delivery, Tag, Msgs}, S) -> + DeliveryFun = fun ({deliver, _, Tag, Msgs}, S) -> MsgIds = [element(1, M) || M <- Msgs], - {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S), - S2 + {S0, _} = rabbit_fifo_client:settle(Tag, MsgIds, S), + S0 end, process_ra_events(Events, State, [], [], DeliveryFun). @@ -583,43 +587,41 @@ process_ra_events([], State0, Acc, Actions0, _DeliveryFun) -> {Acc, Actions0, State0}; process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) -> case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, Actions, State} -> - process_ra_events(Events, State, Acc, Actions0 ++ Actions, DeliveryFun); - {{delivery, _Tag, Msgs} = Del, State1} -> - State = DeliveryFun(Del, State1), - process_ra_events(Events, State, Acc ++ Msgs, Actions0, DeliveryFun); + {ok, State1, Actions1} -> + {Msgs, Actions, State} = + lists:foldl( + fun ({deliver, _, _, Msgs} = Del, {M, A, S}) -> + {M ++ Msgs, A, DeliveryFun(Del, S)}; + (Ac, {M, A, S}) -> + {M, A ++ [Ac], S} + end, {Acc, [], State1}, Actions1), + process_ra_events(Events, State, Msgs, Actions0 ++ Actions, DeliveryFun); eol -> eol end. discard_next_delivery(State0, Wait) -> receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, _Actions, State} -> - discard_next_delivery(State, Wait); - {{delivery, Tag, Msgs}, State1} -> - MsgIds = [element(1, M) || M <- Msgs], - {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds, - State1), - State - end + {ra_event, _, {machine, {delivery, _, _}}} = Evt -> + element(3, process_ra_events([Evt], State0, [], [], + fun ({deliver, Tag, _, Msgs}, S) -> + MsgIds = [element(3, M) || M <- Msgs], + {S0, _} = rabbit_fifo_client:discard(Tag, MsgIds, S), + S0 + end)) after Wait -> - State0 + State0 end. return_next_delivery(State0, Wait) -> receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, _, State} -> - return_next_delivery(State, Wait); - {{delivery, Tag, Msgs}, State1} -> - MsgIds = [element(1, M) || M <- Msgs], - {ok, State} = rabbit_fifo_client:return(Tag, MsgIds, - State1), - State - end + {ra_event, _, {machine, {delivery, _, _}}} = Evt -> + element(3, process_ra_events([Evt], State0, [], [], + fun ({deliver, Tag, _, Msgs}, S) -> + MsgIds = [element(3, M) || M <- Msgs], + {S0, _} = rabbit_fifo_client:return(Tag, MsgIds, S), + S0 + end)) after Wait -> State0 end. diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl index 3324a1253c..2467e40028 100644 --- a/test/rabbit_ha_test_consumer.erl +++ b/test/rabbit_ha_test_consumer.erl @@ -51,12 +51,15 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) -> %% counter. if MsgNum + 1 == LowestSeen -> + error_logger:info_msg("recording ~w left ~w", + [MsgNum, MsgsToConsume]), run(TestPid, Channel, Queue, CancelOnFailover, MsgNum, MsgsToConsume - 1); MsgNum >= LowestSeen -> error_logger:info_msg( - "consumer ~p on ~p ignoring redelivered msg ~p~n", - [self(), Channel, MsgNum]), + "consumer ~p on ~p ignoring redelivered msg ~p" + "lowest seen ~w~n", + [self(), Channel, MsgNum, LowestSeen]), true = Redelivered, %% ASSERTION run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume); diff --git a/test/rabbit_msg_record_SUITE.erl b/test/rabbit_msg_record_SUITE.erl new file mode 100644 index 0000000000..a82ba7481d --- /dev/null +++ b/test/rabbit_msg_record_SUITE.erl @@ -0,0 +1,213 @@ +-module(rabbit_msg_record_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + ampq091_roundtrip, + message_id_ulong, + message_id_uuid, + message_id_binary, + message_id_large_binary, + message_id_large_string + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +ampq091_roundtrip(_Config) -> + Props = #'P_basic'{content_type = <<"text/plain">>, + content_encoding = <<"gzip">>, + headers = [{<<"x-stream-offset">>, long, 99}, + {<<"x-string">>, longstr, <<"a string">>}, + {<<"x-bool">>, bool, false}, + {<<"x-unsignedbyte">>, unsignedbyte, 1}, + {<<"x-unsignedshort">>, unsignedshort, 1}, + {<<"x-unsignedint">>, unsignedint, 1}, + {<<"x-signedint">>, signedint, 1}, + {<<"x-timestamp">>, timestamp, 1}, + {<<"x-double">>, double, 1.0}, + {<<"x-float">>, float, 1.0}, + {<<"x-binary">>, binary, <<"data">>} + ], + delivery_mode = 2, + priority = 99, + correlation_id = <<"corr">> , + reply_to = <<"reply-to">>, + expiration = <<"1">>, + message_id = <<"msg-id">>, + timestamp = 99, + type = <<"45">>, + user_id = <<"banana">>, + app_id = <<"rmq">> + % cluster_id = <<"adf">> + }, + Payload = [<<"data">>], + test_amqp091_roundtrip(Props, Payload), + test_amqp091_roundtrip(#'P_basic'{}, Payload), + ok. + +message_id_ulong(_Config) -> + Num = 9876789, + ULong = erlang:integer_to_binary(Num), + P = #'v1_0.properties'{message_id = {ulong, Num}, + correlation_id = {ulong, Num}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = ULong, + correlation_id = ULong, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id-type">>, longstr, <<"ulong">>}, + {<<"x-message-id-type">>, longstr, <<"ulong">>} + ]}, + Props), + ok. + +message_id_uuid(_Config) -> + %% fake a uuid + UUId = erlang:md5(term_to_binary(make_ref())), + TextUUId = rabbit_data_coercion:to_binary(rabbit_guid:to_string(UUId)), + P = #'v1_0.properties'{message_id = {uuid, UUId}, + correlation_id = {uuid, UUId}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = TextUUId, + correlation_id = TextUUId, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id-type">>, longstr, <<"uuid">>}, + {<<"x-message-id-type">>, longstr, <<"uuid">>} + ]}, + Props), + ok. + +message_id_binary(_Config) -> + %% fake a uuid + Orig = <<"asdfasdf">>, + Text = base64:encode(Orig), + P = #'v1_0.properties'{message_id = {binary, Orig}, + correlation_id = {binary, Orig}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = Text, + correlation_id = Text, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id-type">>, longstr, <<"binary">>}, + {<<"x-message-id-type">>, longstr, <<"binary">>} + ]}, + Props), + ok. + +message_id_large_binary(_Config) -> + %% cannot fit in a shortstr + Orig = crypto:strong_rand_bytes(500), + P = #'v1_0.properties'{message_id = {binary, Orig}, + correlation_id = {binary, Orig}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = undefined, + correlation_id = undefined, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id">>, longstr, Orig}, + {<<"x-message-id">>, longstr, Orig} + ]}, + Props), + ok. + +message_id_large_string(_Config) -> + %% cannot fit in a shortstr + Orig = base64:encode(crypto:strong_rand_bytes(500)), + P = #'v1_0.properties'{message_id = {utf8, Orig}, + correlation_id = {utf8, Orig}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = undefined, + correlation_id = undefined, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id">>, longstr, Orig}, + {<<"x-message-id">>, longstr, Orig} + ]}, + Props), + ok. + +%% Utility + +test_amqp091_roundtrip(Props, Payload) -> + MsgRecord0 = rabbit_msg_record:from_amqp091(Props, Payload), + MsgRecord = rabbit_msg_record:init( + iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))), + % meck:unload(), + {PropsOut, PayloadOut} = rabbit_msg_record:to_amqp091(MsgRecord), + ?assertEqual(Props, PropsOut), + ?assertEqual(iolist_to_binary(Payload), + iolist_to_binary(PayloadOut)), + ok. + + diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl new file mode 100644 index 0000000000..67ca8eba8b --- /dev/null +++ b/test/rabbit_stream_queue_SUITE.erl @@ -0,0 +1,1304 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_queue_SUITE). + +-include_lib("proper/include/proper.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +suite() -> + [{timetrap, 5 * 60000}]. + +all() -> + [ + {group, single_node}, + {group, cluster_size_2}, + {group, cluster_size_3}, + {group, unclustered_size_3_1}, + {group, unclustered_size_3_2}, + {group, unclustered_size_3_3}, + {group, cluster_size_3_1} + ]. + +groups() -> + [ + {single_node, [], [restart_single_node] ++ all_tests()}, + {cluster_size_2, [], all_tests()}, + {cluster_size_3, [], all_tests() ++ + [delete_replica, + delete_down_replica, + delete_classic_replica, + delete_quorum_replica, + consume_from_replica, + leader_failover]}, + {unclustered_size_3_1, [], [add_replica]}, + {unclustered_size_3_2, [], [consume_without_local_replica]}, + {unclustered_size_3_3, [], [grow_coordinator_cluster]}, + {cluster_size_3_1, [], [shrink_coordinator_cluster]} + ]. + +all_tests() -> + [ + declare_args, + declare_max_age, + declare_invalid_args, + declare_invalid_properties, + declare_queue, + delete_queue, + publish, + publish_confirm, + recover, + consume_without_qos, + consume, + consume_offset, + basic_get, + consume_with_autoack, + consume_and_nack, + consume_and_ack, + consume_and_reject, + consume_from_last, + consume_from_next, + consume_from_default, + consume_credit, + consume_credit_out_of_order_ack, + consume_credit_multiple_ack, + basic_cancel, + max_length_bytes, + max_age, + invalid_policy, + max_age_policy, + max_segment_size_policy + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config0) -> + rabbit_ct_helpers:log_environment(), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{stream_tick_interval, 1000}, + {log, [{file, [{level, debug}]}]}]}), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(Group, Config) -> + ClusterSize = case Group of + single_node -> 1; + cluster_size_2 -> 2; + cluster_size_3 -> 3; + cluster_size_3_1 -> 3; + unclustered_size_3_1 -> 3; + unclustered_size_3_2 -> 3; + unclustered_size_3_3 -> 3 + end, + Clustered = case Group of + unclustered_size_3_1 -> false; + unclustered_size_3_2 -> false; + unclustered_size_3_3 -> false; + _ -> true + end, + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}, + {rmq_nodes_clustered, Clustered}]), + Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + Ret = rabbit_ct_helpers:run_steps(Config1b, + [fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()), + case Ret of + {skip, _} -> + Ret; + Config2 -> + EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( + Config2, stream_queue), + case EnableFF of + ok -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_tick_interval, 100]), + Config2; + Skip -> + end_per_group(Group, Config2), + Skip + end + end. + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q}]), + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [{core_metrics_gc_interval, 100}]}). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +declare_args(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-length">>, long, 2000}])), + assert_queue_type(Server, Q, rabbit_stream_queue). + +declare_max_age(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), Q, + [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-age">>, longstr, <<"1A">>}])), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-age">>, longstr, <<"1Y">>}])), + assert_queue_type(Server, Q, rabbit_stream_queue). + +declare_invalid_properties(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Q = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = Q, + auto_delete = true, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = Q, + exclusive = true, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = Q, + durable = false, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})). + +declare_invalid_args(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Q = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-expires">>, long, 2000}])), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-message-ttl">>, long, 2000}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-priority">>, long, 2000}])), + + [?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-overflow">>, longstr, XOverflow}])) + || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-mode">>, longstr, <<"lazy">>}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])). + +declare_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + %% Test declare an existing queue + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ?assertMatch([_], rpc:call(Server, supervisor, which_children, + [osiris_server_sup])), + + %% Test declare an existing queue with different arguments + ?assertExit(_, declare(Ch, Q, [])). + +delete_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})). + +add_replica(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + + %% Let's also try the add replica command on other queue types, it should fail + %% We're doing it in the same test for efficiency, otherwise we have to + %% start new rabbitmq clusters every time for a minor testcase + QClassic = <<Q/binary, "_classic">>, + QQuorum = <<Q/binary, "_quorum">>, + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ?assertEqual({'queue.declare_ok', QClassic, 0, 0}, + declare(Ch, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])), + ?assertEqual({'queue.declare_ok', QQuorum, 0, 0}, + declare(Ch, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Not a member of the cluster, what would happen? + ?assertEqual({error, node_not_running}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server1])), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server1])), + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server1])), + + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server1), + timer:sleep(1000), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server1])), + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server1])), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server1])), + %% replicas must be recorded on the state, and if we publish messages then they must + %% be stored on disk + check_leader_and_replicas(Config, Q, Server0, [Server1]), + %% And if we try again? Idempotent + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server1])), + %% Add another node + ok = rabbit_control_helper:command(stop_app, Server2), + ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server2), + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server2])), + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]). + +delete_replica(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]), + %% Not a member of the cluster, what would happen? + ?assertEqual({error, node_not_running}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, 'zen@rabbit'])), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])), + %% check it's gone + check_leader_and_replicas(Config, Q, Server0, [Server2]), + %% And if we try again? Idempotent + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])), + %% Delete the last replica + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server2])), + check_leader_and_replicas(Config, Q, Server0, []). + +grow_coordinator_cluster(Config) -> + [Server0, Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server1), + + rabbit_ct_helpers:await_condition( + fun() -> + case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of + {_, Members, _} -> + Nodes = lists:sort([N || {_, N} <- Members]), + lists:sort([Server0, Server1]) == Nodes; + _ -> + false + end + end, 60000). + +shrink_coordinator_cluster(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ok = rabbit_control_helper:command(stop_app, Server2), + ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []), + + rabbit_ct_helpers:await_condition( + fun() -> + case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of + {_, Members, _} -> + Nodes = lists:sort([N || {_, N} <- Members]), + lists:sort([Server0, Server1]) == Nodes; + _ -> + false + end + end, 60000). + +delete_classic_replica(Config) -> + [Server0, Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"classic">>}])), + %% Not a member of the cluster, what would happen? + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, 'zen@rabbit'])), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])). + +delete_quorum_replica(Config) -> + [Server0, Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + %% Not a member of the cluster, what would happen? + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, 'zen@rabbit'])), + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])). + +delete_down_replica(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ?assertEqual({error, node_not_running}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])), + %% check it isn't gone + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])). + +publish(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + publish(Ch, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]). + +publish_confirm(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]). + +restart_single_node(Config) -> + [Server] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + publish(Ch, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + + rabbit_control_helper:command(stop_app, Server), + rabbit_control_helper:command(start_app, Server), + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + publish(Ch1, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]). + +recover(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + publish(Ch, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + + [rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers], + [rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)], + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + publish(Ch1, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]). + +consume_without_qos(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, + self())). + +consume_without_local_replica(Config) -> + [Server0, Server1 | _] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + %% Add another node to the cluster, but it won't have a replica + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server1), + timer:sleep(1000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + qos(Ch1, 10, false), + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, + self())). + +consume(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + _ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), + ok = amqp_channel:close(Ch1), + ok + after 5000 -> + exit(timeout) + end. + +consume_offset(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 1000)], + amqp_channel:wait_for_confirms(Ch, 5000), + + run_proper( + fun () -> + ?FORALL(Offset, range(0, 999), + begin + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, Offset), + receive_batch(Ch1, Offset, 999), + receive + {_, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}} + when S < Offset -> + exit({unexpected_offset, S}) + after 1000 -> + ok + end, + amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), + true + end) + end, [], 25). + +basic_get(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + amqp_channel:call(Ch, #'basic.get'{queue = Q})). + +consume_with_autoack(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + ?assertExit( + {{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + subscribe(Ch1, Q, true, 0)). + +consume_and_nack(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + %% Nack will throw a not implemented exception. As it is a cast operation, + %% we'll detect the conneciton/channel closure on the next call. + %% Let's try to redeclare and see what happens + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])) + after 10000 -> + exit(timeout) + end. + +basic_cancel(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + rabbit_ct_helpers:await_condition( + fun() -> + 1 == length(rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, + [consumer_created])) + end, 30000), + receive + {#'basic.deliver'{}, _} -> + amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), + ?assertMatch([], rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, [consumer_created])) + after 10000 -> + exit(timeout) + end. + +consume_and_reject(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.reject'{delivery_tag = DeliveryTag, + requeue = true}), + %% Reject will throw a not implemented exception. As it is a cast operation, + %% we'll detect the conneciton/channel closure on the next call. + %% Let's try to redeclare and see what happens + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])) + after 10000 -> + exit(timeout) + end. + +consume_and_ack(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + %% It will succeed as ack is now a credit operation. We should be + %% able to redeclare a queue (gen_server call op) as the channel + %% should still be open and declare is an idempotent operation + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) + after 5000 -> + exit(timeout) + end. + +consume_from_last(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [committed_offset]]), + + %% We'll receive data from the last committed offset, let's check that is not the + %% first offset + CommittedOffset = proplists:get_value(committed_offset, Info), + ?assert(CommittedOffset > 0), + + %% If the offset is not provided, we're subscribing to the tail of the stream + amqp_channel:subscribe( + Ch1, #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, longstr, <<"last">>}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + %% And receive the messages from the last committed offset to the end of the stream + receive_batch(Ch1, CommittedOffset, 99), + + %% Publish a few more + [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + %% Yeah! we got them + receive_batch(Ch1, 100, 199). + +consume_from_next(Config) -> + consume_from_next(Config, [{<<"x-stream-offset">>, longstr, <<"next">>}]). + +consume_from_default(Config) -> + consume_from_next(Config, []). + +consume_from_next(Config, Args) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [committed_offset]]), + + %% We'll receive data from the last committed offset, let's check that is not the + %% first offset + CommittedOffset = proplists:get_value(committed_offset, Info), + ?assert(CommittedOffset > 0), + + %% If the offset is not provided, we're subscribing to the tail of the stream + amqp_channel:subscribe( + Ch1, #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = Args}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + %% Publish a few more + [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + %% Yeah! we got them + receive_batch(Ch1, 100, 199). + +consume_from_replica(Config) -> + [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch1, self()), + [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch1, 5000), + + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + qos(Ch2, 10, false), + + subscribe(Ch2, Q, false, 0), + receive_batch(Ch2, 0, 99). + +consume_credit(Config) -> + %% Because osiris provides one chunk on every read and we don't want to buffer + %% messages in the broker to avoid memory penalties, the credit value won't + %% be strict - we allow it into the negative values. + %% We can test that after receiving a chunk, no more messages are delivered until + %% the credit goes back to a positive value. + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + %% Let's publish a big batch, to ensure we have more than a chunk available + NumMsgs = 100, + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + + %% Let's subscribe with a small credit, easier to test + Credit = 2, + qos(Ch1, Credit, false), + subscribe(Ch1, Q, false, 0), + + %% Receive everything + DeliveryTags = receive_batch(), + + %% We receive at least the given credit as we know there are 100 messages in the queue + ?assert(length(DeliveryTags) >= Credit), + + %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries + {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags), + + [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + || DeliveryTag <- ToAck], + + %% Nothing here, this is good + receive + {#'basic.deliver'{}, _} -> + exit(unexpected_delivery) + after 1000 -> + ok + end, + + %% Let's ack one more, we should receive a new chunk + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending), + multiple = false}), + + %% Yeah, here is the new chunk! + receive + {#'basic.deliver'{}, _} -> + ok + after 5000 -> + exit(timeout) + end. + +consume_credit_out_of_order_ack(Config) -> + %% Like consume_credit but acknowledging the messages out of order. + %% We want to ensure it doesn't behave like multiple, that is if we have + %% credit 2 and received 10 messages, sending the ack for the message id + %% number 10 should only increase credit by 1. + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + %% Let's publish a big batch, to ensure we have more than a chunk available + NumMsgs = 100, + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + + %% Let's subscribe with a small credit, easier to test + Credit = 2, + qos(Ch1, Credit, false), + subscribe(Ch1, Q, false, 0), + + %% ******* This is the difference with consume_credit + %% Receive everything, let's reverse the delivery tags here so we ack out of order + DeliveryTags = lists:reverse(receive_batch()), + + %% We receive at least the given credit as we know there are 100 messages in the queue + ?assert(length(DeliveryTags) >= Credit), + + %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries + {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags), + + [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + || DeliveryTag <- ToAck], + + %% Nothing here, this is good + receive + {#'basic.deliver'{}, _} -> + exit(unexpected_delivery) + after 1000 -> + ok + end, + + %% Let's ack one more, we should receive a new chunk + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending), + multiple = false}), + + %% Yeah, here is the new chunk! + receive + {#'basic.deliver'{}, _} -> + ok + after 5000 -> + exit(timeout) + end. + +consume_credit_multiple_ack(Config) -> + %% Like consume_credit but acknowledging the messages out of order. + %% We want to ensure it doesn't behave like multiple, that is if we have + %% credit 2 and received 10 messages, sending the ack for the message id + %% number 10 should only increase credit by 1. + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + %% Let's publish a big batch, to ensure we have more than a chunk available + NumMsgs = 100, + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + + %% Let's subscribe with a small credit, easier to test + Credit = 2, + qos(Ch1, Credit, false), + subscribe(Ch1, Q, false, 0), + + %% ******* This is the difference with consume_credit + %% Receive everything, let's reverse the delivery tags here so we ack out of order + DeliveryTag = lists:last(receive_batch()), + + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + + %% Yeah, here is the new chunk! + receive + {#'basic.deliver'{}, _} -> + ok + after 5000 -> + exit(timeout) + end. + +max_length_bytes(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-length-bytes">>, long, 500}, + {<<"x-max-segment-size">>, long, 250}])), + + Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + %% We don't yet have reliable metrics, as the committed offset doesn't work + %% as a counter once we start applying retention policies. + %% Let's wait for messages and hope these are less than the number of published ones + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 100, false), + subscribe(Ch1, Q, false, 0), + + ?assert(length(receive_batch()) < 100). + +max_age(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-age">>, longstr, <<"10s">>}, + {<<"x-max-segment-size">>, long, 250}])), + + Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + timer:sleep(10000), + + %% Let's publish again so the new segments will trigger the retention policy + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 200, false), + subscribe(Ch1, Q, false, 0), + ?assertEqual(100, length(receive_batch())). + +leader_failover(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch1, self()), + [publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch1, 5000), + + check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + timer:sleep(30000), + + [Info] = lists:filter( + fun(Props) -> + QName = rabbit_misc:r(<<"/">>, queue, Q), + lists:member({name, QName}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader, members]])), + NewLeader = proplists:get_value(leader, Info), + ?assert(NewLeader =/= Server1), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1). + +invalid_policy(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>, + [{<<"ha-mode">>, <<"all">>}]), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>, + [{<<"message-ttl">>, 5}]), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [policy, operator_policy, + effective_policy_definition]]), + + ?assertEqual('', proplists:get_value(policy, Info)), + ?assertEqual('', proplists:get_value(operator_policy, Info)), + ?assertEqual([], proplists:get_value(effective_policy_definition, Info)), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>). + +max_age_policy(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>, + [{<<"max-age">>, <<"1Y">>}]), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [policy, operator_policy, + effective_policy_definition]]), + + ?assertEqual(<<"age">>, proplists:get_value(policy, Info)), + ?assertEqual('', proplists:get_value(operator_policy, Info)), + ?assertEqual([{<<"max-age">>, <<"1Y">>}], + proplists:get_value(effective_policy_definition, Info)), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>). + +max_segment_size_policy(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"segment">>, <<"max_segment_size.*">>, <<"queues">>, + [{<<"max-segment-size">>, 5000}]), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [policy, operator_policy, + effective_policy_definition]]), + + ?assertEqual(<<"segment">>, proplists:get_value(policy, Info)), + ?assertEqual('', proplists:get_value(operator_policy, Info)), + ?assertEqual([{<<"max-segment-size">>, 5000}], + proplists:get_value(effective_policy_definition, Info)), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>). + +%%---------------------------------------------------------------------------- + +delete_queues() -> + [{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()]. + +declare(Ch, Q) -> + declare(Ch, Q, []). + +declare(Ch, Q, Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + auto_delete = false, + arguments = Args}). +assert_queue_type(Server, Q, Expected) -> + Actual = get_queue_type(Server, Q), + Expected = Actual. + +get_queue_type(Server, Q0) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, Q0), + {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), + amqqueue:get_type(Q1). + +check_leader_and_replicas(Config, Name, Leader, Replicas0) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, Name), + [Info] = lists:filter( + fun(Props) -> + lists:member({name, QNameRes}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader, members]])), + ?assertEqual(Leader, proplists:get_value(leader, Info)), + Replicas = lists:sort(Replicas0), + ?assertEqual(Replicas, lists:sort(proplists:get_value(members, Info))). + +publish(Ch, Queue) -> + publish(Ch, Queue, <<"msg">>). + +publish(Ch, Queue, Msg) -> + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = Msg}). + +subscribe(Ch, Queue, NoAck, Offset) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, long, Offset}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end. + +qos(Ch, Prefetch, Global) -> + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = Global, + prefetch_count = Prefetch})). + +receive_batch(Ch, N, N) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} -> + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + after 5000 -> + exit({missing_offset, N}) + end; +receive_batch(Ch, N, M) -> + receive + {_, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}} + when S < N -> + exit({unexpected_offset, S}); + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} -> + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + receive_batch(Ch, N + 1, M) + after 5000 -> + exit({missing_offset, N}) + end. + +receive_batch() -> + receive_batch([]). + +receive_batch(Acc) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + receive_batch([DeliveryTag | Acc]) + after 5000 -> + lists:reverse(Acc) + end. + +run_proper(Fun, Args, NumTests) -> + ?assertEqual( + true, + proper:counterexample( + erlang:apply(Fun, Args), + [{numtests, NumTests}, + {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines + (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) + end}])). diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index 013e625159..8b2c1d6ebb 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -234,8 +234,10 @@ consume_survives(Config, DeathFun(Config, A), %% verify that the consumer got all msgs, or die - the await_response %% calls throw an exception if anything goes wrong.... - rabbit_ha_test_consumer:await_response(ConsumerPid), + ct:pal("awaiting produce ~w", [ProducerPid]), rabbit_ha_test_producer:await_response(ProducerPid), + ct:pal("awaiting consumer ~w", [ConsumerPid]), + rabbit_ha_test_consumer:await_response(ConsumerPid), ok. confirms_survive(Config, DeathFun) -> diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl index 3610fd1a80..6be403fd3e 100644 --- a/test/unit_log_config_SUITE.erl +++ b/test/unit_log_config_SUITE.erl @@ -126,6 +126,10 @@ sink_rewrite_sinks() -> {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]}, + {rabbit_log_osiris_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,info]}]}]}, {rabbit_log_prelaunch_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]}, @@ -226,6 +230,10 @@ sink_handlers_merged_with_lager_extra_sinks_handlers(_) -> {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]}, + {rabbit_log_osiris_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]}, {rabbit_log_prelaunch_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]}, @@ -317,6 +325,10 @@ level_sinks() -> {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,error]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,error]}]}]}, + {rabbit_log_osiris_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,info]}]}]}, {rabbit_log_prelaunch_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]}, @@ -427,6 +439,10 @@ file_sinks(DefaultLevel) -> {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]}, + {rabbit_log_osiris_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]}, {rabbit_log_prelaunch_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]}, @@ -674,6 +690,10 @@ default_expected_sinks(UpgradeFile) -> {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]}, + {rabbit_log_osiris_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,info]}]}]}, {rabbit_log_prelaunch_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]}, @@ -761,6 +781,10 @@ tty_expected_sinks() -> {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]}, + {rabbit_log_osiris_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,info]}]}]}, {rabbit_log_prelaunch_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,info]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]}, |
