| author | kjnilsson <knilsson@pivotal.io> | 2020-09-29 11:43:24 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-09-30 14:52:53 +0100 |
| commit | f20fa273e99e6dcd925f8cd5988b6385b7fc0b6a (patch) | |
| tree | 18bea003bb781196698622bb20767477267d7f7b /test | |
| parent | bdb6f9b508dd1aaad5e618d9a620adb7972e618f (diff) | |
| download | rabbitmq-server-git-f20fa273e99e6dcd925f8cd5988b6385b7fc0b6a.tar.gz | |
Stream Queue
This is an aggregated commit of all changes related to the
initial implementation of queue types and, on top of that, the stream
queue type. The various commit messages have been included below,
mostly unedited.
Make rabbit_amqqueue:not_found_or_absent_dirty/1 visible
For use in the stream plugin.
Use bigger retention policy on max-age test
Set coordinator timeout to 30s
Handle coordinator unavailable error
Handle operator policies as maps when checking if they are applicable
Add is_policy_applicable/2 to classic queues
Ignore restart commands if the stream has been deleted
After termination, some of the monitors could still be up and trigger
writer/replica restarts.
Policy support on stream queues
Remove subscription events on stream coordinator
Ensure old leaders are removed from monitors
Introduce delay when retrying a failed phase
Note that this ensures the monitor is set up; there was a bug where no
monitor was actually started when retrying the same phase.
Restart replicas after leader election instead of relying on old monitors
Use timer for stream coordinator retries
Fix stream stats for members/online
Multiple fixes for replica monitoring and restart
Ensure pending commands are appended at the end and re-run
Ensure phase is reset with the state
Remove duplicates from replica list
Restart current phase on state_enter
Remove unused import
Ensure rabbit is running when checking for stream quorum
Restart replicas
Add a close/1 function to queue types
So that we get a chance to clean up resources if needed.
Stream queues close their osiris logs at this point.
fix compiler errors
stream-queue: take retention into account
When calculating ready messages metrics.
Add osiris to the list of rabbit deps
Retry restart of replicas
Do not restart replicas or leaders after receiving a delete cluster command
Add more logging to the stream coordinator
Monitor subscribed processes on the stream coordinator
Memory breakdown for stream queues
Update quorum queue event formatter
rabbit_msg_record fixes
Refactor channel confirms
Remove the old unconfirmed_messages module that was designed to handle
multiple-queue fan-in logic, including all HA mirrors etc. Replaced with a
simpler rabbit_confirms module that handles the fan-out and leaves any
queue-specific logic (such as confirms from mirrors) to the queue type
implementation. Also this module has a dedicated test module. Which is
nice.
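The fan-out tracking described above can be sketched as a toy Python model (illustrative only, not the actual rabbit_confirms Erlang module): a publish sequence number counts as confirmed only once every queue it was routed to has confirmed it.

```python
class Confirms:
    """Toy model of per-channel confirm tracking with queue fan-out."""

    def __init__(self):
        self.pending = {}  # seq_no -> set of queue names still outstanding

    def insert(self, seq_no, queue_names):
        # A message routed to several queues stays unconfirmed until all confirm.
        self.pending[seq_no] = set(queue_names)

    def confirm(self, seq_no, queue_name):
        # Returns True exactly when seq_no becomes fully confirmed.
        outstanding = self.pending.get(seq_no)
        if outstanding is None:
            return False
        outstanding.discard(queue_name)
        if not outstanding:
            del self.pending[seq_no]
            return True
        return False

c = Confirms()
c.insert(1, ["q1", "q2"])
assert c.confirm(1, "q1") is False  # still waiting on q2
assert c.confirm(1, "q2") is True   # now fully confirmed
```

Queue-specific details (e.g. waiting for HA mirrors before confirming) live behind the queue type implementation, not in this shared tracker.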
Backward compatibility with 3.8.x events
Supports mixed version cluster upgrades
Match specification when stream queue already exists
Max age retention for stream queues
Stop all replicas before starting leader election
stream: disallow global qos
remove IS_CLASSIC|QUORUM macros
Ensure only classic queues are notified on channel down
This also removes the delivering_queues map in the channel state, as it
should not be needed for this and just causes additional unnecessary
accounting.
Polish AMQP 1.0/0.9.1 properties conversion
Support byte in application properties; handle the 1-bit representation
for booleans.
Use binary in header for long AMQP 1.0 ID
Fix AMQP 1.0 to 0.9.1 conversion
Fix test due to incorrect type
Convert timestamp application properties to/from seconds
AMQP 1.0 uses milliseconds for timestamp and AMQP 0.9.1 uses seconds, so
conversion needed.
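The unit mismatch is a plain scale factor; a small sketch of the round trip (illustrative Python, not the converter's actual code):

```python
def amqp10_to_091_timestamp(ms: int) -> int:
    # AMQP 1.0 carries timestamps as milliseconds since the Unix epoch;
    # AMQP 0.9.1 uses whole seconds, so truncate on the way in.
    return ms // 1000

def amqp091_to_10_timestamp(s: int) -> int:
    # Going the other way, scale up; sub-second precision is lost
    # in a full round trip.
    return s * 1000

assert amqp10_to_091_timestamp(1601371404123) == 1601371404
assert amqp091_to_10_timestamp(1601371404) == 1601371404000
```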
Dialyzer fixes
Handle all message-id types
AMQP 1.0 is more liberal in its allowed types of message-id and
correlation-id. This adds headers to describe the type of the data in
the message_id / correlation_id properties, and also handles the case
where the data cannot fit, again by using headers.
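The scheme can be modeled as follows (a Python sketch with made-up header names; the real headers used by the conversion code may differ): when an AMQP 1.0 message-id is not a plain string/binary, record its type in a header so the original value can be reconstructed.

```python
import uuid

def store_message_id(mid):
    """Map an AMQP 1.0 message-id onto a string property plus a type
    header describing how to restore it. Header names are hypothetical,
    for illustration only."""
    headers = {}
    if isinstance(mid, uuid.UUID):
        headers["x-message-id-type"] = "uuid"
        value = str(mid)
    elif isinstance(mid, int):
        headers["x-message-id-type"] = "ulong"
        value = str(mid)
    else:
        # Already a string/binary: fits the target property as-is.
        value = mid
    return value, headers

def restore_message_id(value, headers):
    kind = headers.get("x-message-id-type")
    if kind == "uuid":
        return uuid.UUID(value)
    if kind == "ulong":
        return int(value)
    return value

v, h = store_message_id(42)
assert restore_message_id(v, h) == 42
```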
Resize stream coordinator cluster when broker configuration changes
convert timestamps to and from seconds
user_id should be a binary
message annotations keys need to be symbols
stream-queue: default exchange and routing key
As these won't be present for data written using the rabbitmq-stream
plugin.
Add exchange, routing key as message annotations
To the AMQP 1.0 formatted data to enable roundtrip.
Add osiris logging module config
And update logging config test suite.
Restart election when start of new leader fails
The node might have just gone down, so we need to try another one.
Only the aux state keeps track of the phase now, as it might change if the leader election fails
Stream coordinator refactor - all state is kept on the ra machine
Ensure any ra cluster that is not a quorum queue is not cleaned up
Fixes to recovery and monitoring
Add AMQP 1.0 common to dependencies
Add rabbit_msg_record module
To handle conversions into internal stream storage format.
Use rabbitmq-common stream-queue branch
Use SSH for osiris dependency
Stream coordinator: delete replica
Stream coordinator: add replica
Stream coordinator: leader failover
Stream coordinator: declare and delete
Test consuming from a random offset
Previous offsets should not be delivered to consumers
Consume from stream replicas and multiple test fixes
Use max-length-bytes and add new max-segment-size
Use SSH for osiris dependency
Basic cancel for stream queues
Publish stream queues and settle/reject/requeue refactor
Consume from stream queues
Fix recovery
Publish stream messages
Add/delete stream replicas
Use safe queue names
Set retention policy for stream queues
Required by the ctl command
[#171207092]
Stream queue delete queue
fix missing callback impl
Stream queue declare
Queue type abstraction
And use the implementing module as the value of the amqqueue record
`type` field. This will allow for easy dispatch to the queue type
implementation.
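Storing the implementing module itself in the record's `type` field makes dispatch a direct indirect call with no type-to-module lookup; an illustrative Python analogue of the Erlang behaviour-module pattern (class and field names invented here):

```python
class ClassicQueue:
    @staticmethod
    def deliver(name, msg):
        return f"classic delivery of {msg!r} to {name}"

class StreamQueue:
    @staticmethod
    def deliver(name, msg):
        return f"appended {msg!r} to stream log of {name}"

# The record's `type` field holds the implementing module/class itself,
# so callers dispatch straight through it.
queue = {"name": "orders", "type": StreamQueue}

def deliver(q, msg):
    return q["type"].deliver(q["name"], msg)

assert "stream log" in deliver(queue, b"m1")
```

Adding a new queue type then only requires a new module implementing the same callbacks; none of the dispatching call sites change.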
Queue type abstraction
Move queue declare into rabbit_queue_type
Move queue delete into queue type implementation
Queue type: dequeue/basic_get
Move info inside queue type abstraction
Move policy change into queue type interface
Add purge to queue type
Add recovery to the queue type interface
Rename amqqueue quorum_nodes field
To a more generic and extensible opaque queue-type-specific map.
Fix tests and handle classic API response
Fix HA queue confirm bug
All mirrors need to be present as queue names. This introduces context
linking, allowing additional queue refs to be linked to a single "master"
queue ref containing the actual queue context.
Fix issue with events of deleted queues
Also update queue type smoke test to use a cluster by default.
correct default value of amqqueue getter
Move classic queues further inside queue type interface
Dialyzer fixes
Diffstat (limited to 'test')
| -rw-r--r-- | test/backing_queue_SUITE.erl | 77 |
| -rw-r--r-- | test/channel_operation_timeout_SUITE.erl | 3 |
| -rw-r--r-- | test/confirms_rejects_SUITE.erl | 17 |
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 6 |
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 8 |
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 21 |
| -rw-r--r-- | test/queue_type_SUITE.erl | 234 |
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 83 |
| -rw-r--r-- | test/quorum_queue_utils.erl | 9 |
| -rw-r--r-- | test/rabbit_confirms_SUITE.erl | 154 |
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 2 |
| -rw-r--r-- | test/rabbit_fifo_int_SUITE.erl | 186 |
| -rw-r--r-- | test/rabbit_ha_test_consumer.erl | 7 |
| -rw-r--r-- | test/rabbit_msg_record_SUITE.erl | 213 |
| -rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 1304 |
| -rw-r--r-- | test/simple_ha_SUITE.erl | 4 |
| -rw-r--r-- | test/unit_log_config_SUITE.erl | 24 |
17 files changed, 2193 insertions, 159 deletions
```diff
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 2025576a57..ff37e1fb04 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -684,17 +684,17 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
     QPid = amqqueue:get_pid(Q),
     Payload = <<0:8388608>>, %% 1MB
     Count = 30,
-    publish_and_confirm(Q, Payload, Count),
+    QTState = publish_and_confirm(Q, Payload, Count),
 
     rabbit_amqqueue:set_ram_duration_target(QPid, 0),
 
     {ok, Limiter} = rabbit_limiter:start_link(no_id),
 
     CountMinusOne = Count - 1,
-    {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
-        rabbit_amqqueue:basic_get(Q, self(), true, Limiter,
+    {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}, _} =
+        rabbit_amqqueue:basic_get(Q, true, Limiter,
                                   <<"bq_variable_queue_delete_msg_store_files_callback1">>,
-                                  #{}),
+                                  QTState),
     {ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
     %% give the queue a second to receive the close_fds callback msg
@@ -713,8 +713,7 @@ bq_queue_recover1(Config) ->
     {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
     QName = amqqueue:get_name(Q),
     QPid = amqqueue:get_pid(Q),
-    publish_and_confirm(Q, <<>>, Count),
-
+    QT = publish_and_confirm(Q, <<>>, Count),
     SupPid = get_queue_sup_pid(Q),
     true = is_pid(SupPid),
     exit(SupPid, kill),
@@ -724,7 +723,7 @@ bq_queue_recover1(Config) ->
     after 10000 -> exit(timeout_waiting_for_queue_death)
     end,
     rabbit_amqqueue:stop(?VHOST),
-    {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
+    {Recovered, []} = rabbit_amqqueue:recover(?VHOST),
     rabbit_amqqueue:start(Recovered),
     {ok, Limiter} = rabbit_limiter:start_link(no_id),
     rabbit_amqqueue:with_or_die(
@@ -732,9 +731,9 @@ bq_queue_recover1(Config) ->
       fun (Q1) when ?is_amqqueue(Q1) ->
              QPid1 = amqqueue:get_pid(Q1),
              CountMinusOne = Count - 1,
-             {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
-                 rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
-                                           <<"bq_queue_recover1">>, #{}),
+             {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}, _} =
+                 rabbit_amqqueue:basic_get(Q1, false, Limiter,
+                                           <<"bq_queue_recover1">>, QT),
              exit(QPid1, shutdown),
              VQ1 = variable_queue_init(Q, true),
              {{_Msg1, true, _AckTag1}, VQ2} =
@@ -1366,25 +1365,34 @@ variable_queue_init(Q, Recover) ->
 
 publish_and_confirm(Q, Payload, Count) ->
     Seqs = lists:seq(1, Count),
-    [begin
-         Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
-                                    <<>>, #'P_basic'{delivery_mode = 2},
-                                    Payload),
-         Delivery = #delivery{mandatory = false, sender = self(),
-                              confirm = true, message = Msg, msg_seq_no = Seq,
-                              flow = noflow},
-         _QPids = rabbit_amqqueue:deliver([Q], Delivery)
-     end || Seq <- Seqs],
-    wait_for_confirms(gb_sets:from_list(Seqs)).
+    QTState0 = rabbit_queue_type:new(Q, rabbit_queue_type:init()),
+    QTState =
+        lists:foldl(
+          fun (Seq, Acc0) ->
+                  Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+                                             <<>>, #'P_basic'{delivery_mode = 2},
+                                             Payload),
+                  Delivery = #delivery{mandatory = false, sender = self(),
+                                       confirm = true, message = Msg, msg_seq_no = Seq,
+                                       flow = noflow},
+                  {ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Delivery, Acc0),
+                  Acc
+          end, QTState0, Seqs),
+    wait_for_confirms(gb_sets:from_list(Seqs)),
+    QTState.
 
 wait_for_confirms(Unconfirmed) ->
     case gb_sets:is_empty(Unconfirmed) of
         true  -> ok;
-        false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
+        false -> receive {'$gen_cast',
+                          {queue_event, _QName,
+                           {confirm, Confirmed, _}}} ->
                          wait_for_confirms(
                            rabbit_misc:gb_sets_difference(
                              Unconfirmed, gb_sets:from_list(Confirmed)))
-                 after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
+                 after ?TIMEOUT ->
+                           flush(),
+                           exit(timeout_waiting_for_confirm)
                  end
     end.
@@ -1436,6 +1444,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
     variable_queue_wait_for_shuffling_end(
       lists:foldl(
         fun (N, VQN) ->
+
                 rabbit_variable_queue:publish(
                   rabbit_basic:message(
                     rabbit_misc:r(<<>>, exchange, <<>>),
@@ -1526,12 +1535,13 @@ variable_queue_status(VQ) ->
 variable_queue_wait_for_shuffling_end(VQ) ->
     case credit_flow:blocked() of
         false -> VQ;
-        true  -> receive
-                     {bump_credit, Msg} ->
-                         credit_flow:handle_bump_msg(Msg),
-                         variable_queue_wait_for_shuffling_end(
-                           rabbit_variable_queue:resume(VQ))
-                 end
+        true  ->
+            receive
+                {bump_credit, Msg} ->
+                    credit_flow:handle_bump_msg(Msg),
+                    variable_queue_wait_for_shuffling_end(
+                      rabbit_variable_queue:resume(VQ))
+            end
     end.
 
 msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
@@ -1576,11 +1586,13 @@ variable_queue_with_holes(VQ0) ->
                fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
     %% assertions
     Status = variable_queue_status(VQ8),
+    vq_with_holes_assertions(VQ8, proplists:get_value(mode, Status)),
     Depth = Count + Interval,
     Depth = rabbit_variable_queue:depth(VQ8),
     Len = Depth - length(Subset3),
     Len = rabbit_variable_queue:len(VQ8),
+
     {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}.
 
 vq_with_holes_assertions(VQ, default) ->
@@ -1604,3 +1616,12 @@ check_variable_queue_status(VQ0, Props) ->
     S = variable_queue_status(VQ1),
     assert_props(S, Props),
     VQ1.
+
+flush() ->
+    receive
+        Any ->
+            ct:pal("flush ~p", [Any]),
+            flush()
+    after 0 ->
+            ok
+    end.
```
```diff
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index f8da35d6ff..15e0188604 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -71,11 +71,13 @@ notify_down_all(Config) ->
     RabbitCh = rabbit_ct_client_helpers:open_channel(Config, 0),
     HareCh = rabbit_ct_client_helpers:open_channel(Config, 1),
 
+    ct:pal("one"),
     %% success
     set_channel_operation_timeout_config(Config, 1000),
     configure_bq(Config),
     QCfg0 = qconfig(RabbitCh, <<"q0">>, <<"ex0">>, true, false),
     declare(QCfg0),
+    ct:pal("two"),
     %% Testing rabbit_amqqueue:notify_down_all via rabbit_channel.
     %% Consumer count = 0 after correct channel termination and
     %% notification of queues via delegate:call/3
@@ -83,6 +85,7 @@ notify_down_all(Config) ->
     rabbit_ct_client_helpers:close_channel(RabbitCh),
     0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)),
     false = is_process_alive(RabbitCh),
+    ct:pal("three"),
     %% fail
     set_channel_operation_timeout_config(Config, 10),
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index aaaeb4a939..a51253885c 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -388,9 +388,12 @@ kill_the_queue(QueueName) ->
     [begin
          {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
          Pid = amqqueue:get_pid(Q),
+         ct:pal("~w killed", [Pid]),
+         timer:sleep(1),
          exit(Pid, kill)
      end
-     || _ <- lists:seq(1, 11)],
+     || _ <- lists:seq(1, 50)],
+    timer:sleep(1),
     {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
     Pid = amqqueue:get_pid(Q),
     case is_process_alive(Pid) of
@@ -399,7 +402,11 @@ kill_the_queue(QueueName) ->
         false -> ok
     end.
 
-
-
-
-
+flush() ->
+    receive
+        Any ->
+            ct:pal("flush ~p", [Any]),
+            flush()
+    after 0 ->
+            ok
+    end.
```
```diff
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index 87b5566c57..4ee917aa21 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -1059,9 +1059,11 @@ dead_letter_headers_BCC(Config) ->
     ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
 
-%% Three top-level headers are added for the very first dead-lettering event. They are
+%% Three top-level headers are added for the very first dead-lettering event.
+%% They are
 %% x-first-death-reason, x-first-death-queue, x-first-death-exchange
-%% They have the same values as the reason, queue, and exchange fields of the original
+%% They have the same values as the reason, queue, and exchange fields of the
+%% original
 %% dead lettering event. Once added, these headers are never modified.
 dead_letter_headers_first_death(Config) ->
     {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 25027c7ef9..c881aef8a1 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -424,8 +424,7 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
                                               nodename),
     Ch = rabbit_ct_client_helpers:open_channel(Config, A),
 
-    ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
-        [all])),
+    ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], [all])),
     %% --> Master: A
     %%     Slaves: [B, C] or [C, B]
     SSPids = ?awaitMatch(SSPids when is_list(SSPids),
@@ -450,7 +449,7 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
     %% should instead use an existing synchronised mirror as the new master,
     %% even though that isn't in the policy.
     ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
-        [{nodes, [LastSlave, A]}])),
+                                                      [{nodes, [LastSlave, A]}])),
     %% --> Master: B or C (same as previous policy)
     %%     Slaves: [A]
@@ -931,6 +930,7 @@ apply_in_parallel(Config, Nodes, Policies) ->
     Self = self(),
     [spawn_link(fun() ->
                     [begin
+
                          apply_policy(Config, N, Policy)
                      end || Policy <- Policies],
                     Self ! parallel_task_done
@@ -969,7 +969,7 @@ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) ->
             %% Let's wait a bit longer.
             timer:sleep(1000),
             wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1);
-        FinalInfo ->
+        {ok, FinalInfo} ->
             %% The last policy is the final state
             LastPolicy = lists:last(TestedPolicies),
             case verify_policy(LastPolicy, FinalInfo) of
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index c4d16a5900..0fbf7ec975 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -57,7 +57,8 @@ groups() ->
                                     trigger_message_store_compaction]},
       {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
       {quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
-      {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
+      {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
+      {stream_queue, [parallel], AllTests}
      ]}
     ].
```
```diff
@@ -122,13 +123,24 @@ init_per_group(mirrored_queue, Config) ->
                 {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
                 {queue_durable, true}]),
     rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(stream_queue, Config) ->
+    case rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue) of
+        ok ->
+            rabbit_ct_helpers:set_config(
+              Config,
+              [{queue_args, [{<<"x-queue-type">>, longstr, <<"stream">>}]},
+               {queue_durable, true}]);
+        Skip ->
+            Skip
+    end;
 init_per_group(Group, Config0) ->
     case lists:member({group, Group}, all()) of
         true ->
             ClusterSize = 3,
             Config = rabbit_ct_helpers:merge_app_env(
                        Config0, {rabbit, [{channel_tick_interval, 1000},
-                                          {quorum_tick_interval, 1000}]}),
+                                          {quorum_tick_interval, 1000},
+                                          {stream_tick_interval, 1000}]}),
             Config1 = rabbit_ct_helpers:set_config(
                         Config, [ {rmq_nodename_suffix, Group},
                                   {rmq_nodes_count, ClusterSize}
@@ -514,6 +526,11 @@ basic_cancel(Config) ->
     publish(Ch, QName, [<<"msg1">>]),
     wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
     CTag = atom_to_binary(?FUNCTION_NAME, utf8),
+
+    %% Let's set consumer prefetch so it works with stream queues
+    ?assertMatch(#'basic.qos_ok'{},
+                 amqp_channel:call(Ch, #'basic.qos'{global = false,
+                                                    prefetch_count = 1})),
     subscribe(Ch, QName, false, CTag),
     receive
         {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
diff --git a/test/queue_type_SUITE.erl b/test/queue_type_SUITE.erl
new file mode 100644
index 0000000000..eeeabc3d1e
--- /dev/null
+++ b/test/queue_type_SUITE.erl
@@ -0,0 +1,234 @@
+-module(queue_type_SUITE).
+
+-compile(export_all).
+
+-export([
+         ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+    [
+     {group, classic},
+     {group, quorum}
+    ].
+
+all_tests() ->
+    [
+     smoke
+    ].
+
+groups() ->
+    [
+     {classic, [], all_tests()},
+     {quorum, [], all_tests()}
+    ].
+
+init_per_suite(Config0) ->
+    rabbit_ct_helpers:log_environment(),
+    Config = rabbit_ct_helpers:merge_app_env(
+               Config0, {rabbit, [{quorum_tick_interval, 1000}]}),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config),
+    ok.
+
+init_per_group(Group, Config) ->
+    ClusterSize = 3,
+    Config1 = rabbit_ct_helpers:set_config(Config,
+                                           [{rmq_nodes_count, ClusterSize},
+                                            {rmq_nodename_suffix, Group},
+                                            {tcp_ports_base}]),
+    Config1b = rabbit_ct_helpers:set_config(Config1,
+                                            [{queue_type, atom_to_binary(Group, utf8)},
+                                             {net_ticktime, 10}]),
+    Config2 = rabbit_ct_helpers:run_steps(Config1b,
+                                          [fun merge_app_env/1 ] ++
+                                          rabbit_ct_broker_helpers:setup_steps()),
+    Config3 =
+        case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
+            ok ->
+                ok = rabbit_ct_broker_helpers:rpc(
+                       Config2, 0, application, set_env,
+                       [rabbit, channel_tick_interval, 100]),
+                %% HACK: the larger cluster sizes benefit for a bit more time
+                %% after clustering before running the tests.
+                case Group of
+                    cluster_size_5 ->
+                        timer:sleep(5000),
+                        Config2;
+                    _ ->
+                        Config2
+                end;
+            Skip ->
+                end_per_group(Group, Config2),
+                Skip
+        end,
+    rabbit_ct_broker_helpers:set_policy(
+      Config3, 0,
+      <<"ha-policy">>, <<".*">>, <<"queues">>,
+      [{<<"ha-mode">>, <<"all">>}]),
+    Config3.
+
+merge_app_env(Config) ->
+    rabbit_ct_helpers:merge_app_env(
+      rabbit_ct_helpers:merge_app_env(Config,
+                                      {rabbit,
+                                       [{core_metrics_gc_interval, 100},
+                                        {log, [{file, [{level, debug}]}]}]}),
+      {ra, [{min_wal_roll_over_interval, 30000}]}).
+
+end_per_group(_Group, Config) ->
+    rabbit_ct_helpers:run_steps(Config,
+                                rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+    Q = rabbit_data_coercion:to_binary(Testcase),
+    Config2 = rabbit_ct_helpers:set_config(Config1,
+                                           [{queue_name, Q},
+                                            {alt_queue_name, <<Q/binary, "_alt">>}
+                                           ]),
+    rabbit_ct_helpers:run_steps(Config2,
+                                rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+    catch delete_queues(),
+    Config1 = rabbit_ct_helpers:run_steps(
+                Config,
+                rabbit_ct_client_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+smoke(Config) ->
+    Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    QName = ?config(queue_name, Config),
+    ?assertEqual({'queue.declare_ok', QName, 0, 0},
+                 declare(Ch, QName, [{<<"x-queue-type">>, longstr,
+                                      ?config(queue_type, Config)}])),
+    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch, self()),
+    publish(Ch, QName, <<"msg1">>),
+    ct:pal("waiting for confirms from ~s", [QName]),
+    ok = receive
+             #'basic.ack'{} -> ok;
+             #'basic.nack'{} -> fail
+         after 2500 ->
+                   flush(),
+                   exit(confirm_timeout)
+         end,
+    DTag = basic_get(Ch, QName),
+
+    basic_ack(Ch, DTag),
+    basic_get_empty(Ch, QName),
+
+    %% consume
+    publish(Ch, QName, <<"msg2">>),
+    ConsumerTag1 = <<"ctag1">>,
+    ok = subscribe(Ch, QName, ConsumerTag1),
+    %% receive and ack
+    receive
+        {#'basic.deliver'{delivery_tag = DeliveryTag,
+                          redelivered = false},
+         #amqp_msg{}} ->
+            basic_ack(Ch, DeliveryTag)
+    after 5000 ->
+              flush(),
+              exit(basic_deliver_timeout)
+    end,
+    basic_cancel(Ch, ConsumerTag1),
+
+    %% assert empty
+    basic_get_empty(Ch, QName),
+
+    %% consume and nack
+    ConsumerTag2 = <<"ctag2">>,
+    ok = subscribe(Ch, QName, ConsumerTag2),
+    publish(Ch, QName, <<"msg3">>),
+    receive
+        {#'basic.deliver'{delivery_tag = T,
+                          redelivered = false},
+         #amqp_msg{}} ->
+            basic_cancel(Ch, ConsumerTag2),
+            basic_nack(Ch, T)
+    after 5000 ->
+              exit(basic_deliver_timeout)
+    end,
+    %% get and ack
+    basic_ack(Ch, basic_get(Ch, QName)),
+    ok.
+
+%% Utility
+delete_queues() ->
+    [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+     || Q <- rabbit_amqqueue:list()].
+
+declare(Ch, Q, Args) ->
+    amqp_channel:call(Ch, #'queue.declare'{queue       = Q,
+                                           durable     = true,
+                                           auto_delete = false,
+                                           arguments   = Args}).
+
+publish(Ch, Queue, Msg) ->
+    ok = amqp_channel:cast(Ch,
+                           #'basic.publish'{routing_key = Queue},
+                           #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+                                     payload = Msg}).
+
+basic_get(Ch, Queue) ->
+    {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+                                                            no_ack = false}),
+    ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, Reply),
+    GetOk#'basic.get_ok'.delivery_tag.
+
+basic_get_empty(Ch, Queue) ->
+    ?assertMatch(#'basic.get_empty'{},
+                 amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+                                                    no_ack = false})).
+
+subscribe(Ch, Queue, CTag) ->
+    amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+                                                no_ack = false,
+                                                consumer_tag = CTag},
+                           self()),
+    receive
+        #'basic.consume_ok'{consumer_tag = CTag} ->
+            ok
+    after 5000 ->
+              exit(basic_consume_timeout)
+    end.
+
+basic_ack(Ch, DTag) ->
+    amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag,
+                                       multiple = false}).
+
+basic_cancel(Ch, CTag) ->
+    #'basic.cancel_ok'{} =
+        amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}).
+
+basic_nack(Ch, DTag) ->
+    amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+                                        requeue = true,
+                                        multiple = false}).
+
+flush() ->
+    receive
+        Any ->
+            ct:pal("flush ~p", [Any]),
+            flush()
+    after 0 ->
+            ok
+    end.
```
```diff
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index ecb4fdac63..16042b71e8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -383,13 +383,19 @@ start_queue(Config) ->
     Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
     LQ = ?config(queue_name, Config),
+    %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+    %% are not affected by any other ra cluster that could be added in the future
+    Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
     ?assertEqual({'queue.declare_ok', LQ, 0, 0},
                  declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
 
     %% Check that the application and one ra node are up
     ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
                                            rpc:call(Server, application, which_applications, []))),
-    ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+    Expected = Children + 1,
+    ?assertMatch(Expected,
+                 length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
 
     %% Test declare an existing queue
     ?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -405,7 +411,8 @@ start_queue(Config) ->
     %% Check that the application and process are still up
     ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
                                            rpc:call(Server, application, which_applications, []))),
-    ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
+    ?assertMatch(Expected,
+                 length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))).
 
 start_queue_concurrent(Config) ->
     Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -463,6 +470,10 @@ quorum_cluster_size_x(Config, Max, Expected) ->
 stop_queue(Config) ->
     Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
 
+    %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+    %% are not affected by any other ra cluster that could be added in the future
+    Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
     Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
     LQ = ?config(queue_name, Config),
     ?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -471,13 +482,15 @@ stop_queue(Config) ->
     %% Check that the application and one ra node are up
     ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
                                            rpc:call(Server, application, which_applications, []))),
-    ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+    Expected = Children + 1,
+    ?assertMatch(Expected,
+                 length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
     %% Delete the quorum queue
     ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
     %% Check that the application and process are down
     wait_until(fun() ->
-                       [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+                       Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
               end),
     ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
                                            rpc:call(Server, application, which_applications, []))).
@@ -485,6 +498,10 @@ stop_queue(Config) ->
 restart_queue(Config) ->
     Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
 
+    %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+    %% are not affected by any other ra cluster that could be added in the future
+    Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
     Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
     LQ = ?config(queue_name, Config),
     ?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -496,7 +513,9 @@ restart_queue(Config) ->
     %% Check that the application and one ra node are up
     ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
                                            rpc:call(Server, application, which_applications, []))),
-    ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
+    Expected = Children + 1,
+    ?assertMatch(Expected,
+                 length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))).
 
 idempotent_recover(Config) ->
     Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -554,6 +573,10 @@ restart_all_types(Config) ->
     %% ensure there are no regressions
     Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
 
+    %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+    %% are not affected by any other ra cluster that could be added in the future
+    Children = rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]),
+
     Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
     QQ1 = <<"restart_all_types-qq1">>,
     ?assertEqual({'queue.declare_ok', QQ1, 0, 0},
@@ -575,7 +598,9 @@ restart_all_types(Config) ->
     %% Check that the application and two ra nodes are up
     ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
                                            rpc:call(Server, application, which_applications, []))),
-    ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+    Expected = length(Children) + 2,
+    Got = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+    ?assertMatch(Expected, Got),
     %% Check the classic queues restarted correctly
     Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
     {#'basic.get_ok'{}, #amqp_msg{}} =
@@ -592,6 +617,10 @@ stop_start_rabbit_app(Config) ->
     %% classic) to ensure there are no regressions
     Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
 
+    %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+    %% are not affected by any other ra cluster that could be added in the future
+    Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
     Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
     QQ1 = <<"stop_start_rabbit_app-qq">>,
     ?assertEqual({'queue.declare_ok', QQ1, 0, 0},
@@ -617,7 +646,9 @@ stop_start_rabbit_app(Config) ->
     %% Check that the application and two ra nodes are up
     ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
                                            rpc:call(Server, application, which_applications, []))),
-    ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+    Expected = Children + 2,
+    ?assertMatch(Expected,
+                 length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
     %% Check the classic queues restarted correctly
     Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
     {#'basic.get_ok'{}, #amqp_msg{}} =
@@ -935,6 +966,10 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
     %% to verify that the cleanup is propagated through channels
     [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 
+    %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+    %% are not affected by any other ra cluster that could be added in the future
+    Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
     Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
     Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
     QQ = ?config(queue_name, Config),
@@ -955,18 +990,22 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
     ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
     wait_until(fun() ->
-                       [] == rpc:call(Server, supervisor, which_children,
-                                      [ra_server_sup_sup])
+                       Children == length(rpc:call(Server, supervisor, which_children,
+                                                   [ra_server_sup_sup]))
               end),
     %% Check that all queue states have been cleaned
-    wait_for_cleanup(Server, NCh1, 0),
-    wait_for_cleanup(Server, NCh2, 0).
+    wait_for_cleanup(Server, NCh2, 0),
+    wait_for_cleanup(Server, NCh1, 0).
 
 cleanup_queue_state_on_channel_after_subscribe(Config) ->
     %% Declare/delete the queue and publish in one channel, while consuming on a
     %% different one to verify that the cleanup is propagated through channels
     [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
 
+    %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+    %% are not affected by any other ra cluster that could be added in the future
+    Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
     Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
     Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
     QQ = ?config(queue_name, Config),
@@ -993,7 +1032,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
     wait_for_cleanup(Server, NCh2, 1),
     ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
     wait_until(fun() ->
-                       [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+                       Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
               end),
     %% Check that all queue states have been cleaned
     wait_for_cleanup(Server, NCh1, 0),
@@ -1596,8 +1635,8 @@ cleanup_data_dir(Config) ->
                  declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
     timer:sleep(100),
 
-    [{_, UId1}] = rpc:call(Server1, ra_directory, list_registered, []),
-    [{_, UId2}] = rpc:call(Server2, ra_directory, list_registered, []),
+    UId1
```
= proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [])), + UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [])), DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]), DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]), ?assert(filelib:is_dir(DataDir1)), @@ -1748,6 +1787,11 @@ reconnect_consumer_and_wait_channel_down(Config) -> delete_immediately_by_resource(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + %% The stream coordinator is also a ra process, we need to ensure the quorum tests + %% are not affected by any other ra cluster that could be added in the future + Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), @@ -1756,7 +1800,7 @@ delete_immediately_by_resource(Config) -> %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) + Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). 
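Several quorum tests above stop asserting an exact `ra_server_sup_sup` child list and instead record a baseline count before declaring queues, then assert a delta — so an unrelated ra cluster on the same supervisor (such as the stream coordinator) cannot break the assertion. The pattern, modeled as a small Python sketch (the node structure here is hypothetical, standing in for the `rpc:call(..., supervisor, which_children, [ra_server_sup_sup])` call):

```python
def count_children(node):
    """Stand-in for length(rpc:call(Node, supervisor, which_children, [ra_server_sup_sup]))."""
    return len(node["ra_children"])

def assert_delta(node, baseline, expected_new):
    # Assert only the *delta* against the recorded baseline, so ra servers
    # started by other features (e.g. the stream coordinator) don't fail us.
    assert count_children(node) == baseline + expected_new

# Record the baseline before declaring queues, check the delta after.
node = {"ra_children": ["stream_coordinator"]}
baseline = count_children(node)
node["ra_children"].append("quorum_queue_1")   # queue.declare starts one ra server
assert_delta(node, baseline, 1)
```

The same baseline is reused for teardown checks: after deleting the queue, the child count should return to `baseline`, not to zero.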
@@ -1784,6 +1828,8 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}) + after 5000 -> + exit(basic_deliver_timeout) end, receive @@ -1794,6 +1840,8 @@ subscribe_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, multiple = false, requeue = true}) + after 5000 -> + exit(basic_deliver_timeout_2) end, receive @@ -1803,8 +1851,13 @@ subscribe_redelivery_count(Config) -> ?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, multiple = false}), + ct:pal("wait_for_messages_ready", []), wait_for_messages_ready(Servers, RaName, 0), + ct:pal("wait_for_messages_pending_ack", []), wait_for_messages_pending_ack(Servers, RaName, 0) + after 5000 -> + flush(500), + exit(basic_deliver_timeout_3) end. subscribe_redelivery_limit(Config) -> diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index 95ddc892f1..caabd617ae 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -28,15 +28,10 @@ wait_for_messages_total(Servers, QName, Total) -> wait_for_messages(Servers, QName, Number, Fun, 0) -> Msgs = dirty_query(Servers, QName, Fun), - Totals = lists:map(fun(M) when is_map(M) -> - maps:size(M); - (_) -> - -1 - end, Msgs), - ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); + ?assertEqual(Msgs, [Number || _ <- lists:seq(1, length(Servers))]); wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), - ct:pal("Got messages ~p", [Msgs]), + ct:pal("Got messages ~p ~p", [QName, Msgs]), %% hack to allow the check to succeed in mixed versions clusters if at %% least one node matches the criteria rather than all nodes for F = case is_mixed_versions() of diff --git a/test/rabbit_confirms_SUITE.erl b/test/rabbit_confirms_SUITE.erl new file mode 100644 index 0000000000..331c3ca7c3 --- /dev/null 
+++ b/test/rabbit_confirms_SUITE.erl @@ -0,0 +1,154 @@ +-module(rabbit_confirms_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + confirm, + reject, + remove_queue + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +confirm(_Config) -> + XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>), + QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>), + QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>), + U0 = rabbit_confirms:init(), + ?assertEqual(0, rabbit_confirms:size(U0)), + ?assertEqual(undefined, rabbit_confirms:smallest(U0)), + ?assertEqual(true, rabbit_confirms:is_empty(U0)), + + U1 = rabbit_confirms:insert(1, [QName], XName, U0), + ?assertEqual(1, rabbit_confirms:size(U1)), + ?assertEqual(1, rabbit_confirms:smallest(U1)), + ?assertEqual(false, rabbit_confirms:is_empty(U1)), + + {[{1, XName}], U2} = rabbit_confirms:confirm([1], QName, U1), + ?assertEqual(0, rabbit_confirms:size(U2)), + ?assertEqual(undefined, rabbit_confirms:smallest(U2)), + ?assertEqual(true, rabbit_confirms:is_empty(U2)), + + U3 = rabbit_confirms:insert(2, [QName], XName, U1), + ?assertEqual(2, rabbit_confirms:size(U3)), + ?assertEqual(1, rabbit_confirms:smallest(U3)), + ?assertEqual(false, rabbit_confirms:is_empty(U3)), + + {[{1, XName}], U4} = 
rabbit_confirms:confirm([1], QName, U3), + ?assertEqual(1, rabbit_confirms:size(U4)), + ?assertEqual(2, rabbit_confirms:smallest(U4)), + ?assertEqual(false, rabbit_confirms:is_empty(U4)), + + U5 = rabbit_confirms:insert(2, [QName, QName2], XName, U1), + ?assertEqual(2, rabbit_confirms:size(U5)), + ?assertEqual(1, rabbit_confirms:smallest(U5)), + ?assertEqual(false, rabbit_confirms:is_empty(U5)), + + {[{1, XName}], U6} = rabbit_confirms:confirm([1, 2], QName, U5), + ?assertEqual(2, rabbit_confirms:smallest(U6)), + + {[{2, XName}], U7} = rabbit_confirms:confirm([2], QName2, U6), + ?assertEqual(0, rabbit_confirms:size(U7)), + ?assertEqual(undefined, rabbit_confirms:smallest(U7)), + + + U8 = rabbit_confirms:insert(2, [QName], XName, U1), + {[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8), + ok. + + +reject(_Config) -> + XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>), + QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>), + QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>), + U0 = rabbit_confirms:init(), + ?assertEqual(0, rabbit_confirms:size(U0)), + ?assertEqual(undefined, rabbit_confirms:smallest(U0)), + ?assertEqual(true, rabbit_confirms:is_empty(U0)), + + U1 = rabbit_confirms:insert(1, [QName], XName, U0), + + {ok, {1, XName}, U2} = rabbit_confirms:reject(1, U1), + {error, not_found} = rabbit_confirms:reject(1, U2), + ?assertEqual(0, rabbit_confirms:size(U2)), + ?assertEqual(undefined, rabbit_confirms:smallest(U2)), + + U3 = rabbit_confirms:insert(2, [QName, QName2], XName, U1), + + {ok, {1, XName}, U4} = rabbit_confirms:reject(1, U3), + {error, not_found} = rabbit_confirms:reject(1, U4), + ?assertEqual(1, rabbit_confirms:size(U4)), + ?assertEqual(2, rabbit_confirms:smallest(U4)), + + {ok, {2, XName}, U5} = rabbit_confirms:reject(2, U3), + {error, not_found} = rabbit_confirms:reject(2, U5), + ?assertEqual(1, rabbit_confirms:size(U5)), + ?assertEqual(1, rabbit_confirms:smallest(U5)), + + ok. 
+ +remove_queue(_Config) -> + XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>), + QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>), + QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>), + U0 = rabbit_confirms:init(), + + U1 = rabbit_confirms:insert(1, [QName, QName2], XName, U0), + U2 = rabbit_confirms:insert(2, [QName2], XName, U1), + {[{2, XName}], U3} = rabbit_confirms:remove_queue(QName2, U2), + ?assertEqual(1, rabbit_confirms:size(U3)), + ?assertEqual(1, rabbit_confirms:smallest(U3)), + {[{1, XName}], U4} = rabbit_confirms:remove_queue(QName, U3), + ?assertEqual(0, rabbit_confirms:size(U4)), + ?assertEqual(undefined, rabbit_confirms:smallest(U4)), + + U5 = rabbit_confirms:insert(1, [QName], XName, U0), + U6 = rabbit_confirms:insert(2, [QName], XName, U5), + {[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6), + + ok. + + +%% Utility diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index d19dcb3682..7b90d91bfa 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -674,7 +674,7 @@ single_active_consumer_basic_get_test(_) -> ?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy), ?assertEqual(0, map_size(State0#rabbit_fifo.consumers)), {State1, _} = enq(1, 1, first, State0), - {_State, {error, unsupported}} = + {_State, {error, {unsupported, single_active_consumer}}} = apply(meta(2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State1), ok. 
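The new `rabbit_confirms_SUITE` pins down the tracker's semantics: a sequence number is confirmed only once every queue it was routed to has confirmed it, `smallest/1` reports the lowest still-pending sequence number, and removing a queue releases any entries that were only waiting on it. A minimal Python model of those semantics (a sketch of the behaviour the tests assert, not the Erlang API) behaves the same way:

```python
class Confirms:
    """Toy model of rabbit_confirms: seq_no -> (exchange, queues still awaiting confirm)."""

    def __init__(self):
        self.pending = {}

    def insert(self, seq_no, queues, exchange):
        self.pending[seq_no] = (exchange, set(queues))

    def confirm(self, seq_nos, queue):
        confirmed = []
        for s in seq_nos:
            if s in self.pending:
                exchange, queues = self.pending[s]
                queues.discard(queue)
                if not queues:              # every routed queue has confirmed
                    del self.pending[s]
                    confirmed.append((s, exchange))
        return confirmed

    def smallest(self):
        return min(self.pending) if self.pending else None

    def size(self):
        return len(self.pending)

u = Confirms()
u.insert(1, ["Q"], "X")
u.insert(2, ["Q", "Q2"], "X")
assert u.confirm([1, 2], "Q") == [(1, "X")]   # seq 2 still awaits Q2
assert u.smallest() == 2
assert u.confirm([2], "Q2") == [(2, "X")]
assert u.size() == 0
```

This mirrors the `confirm/3` cases in the suite: confirming `[1, 2]` from one queue releases only seq 1 when seq 2 was routed to two queues.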
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl index b51975b062..b2ed7160a2 100644 --- a/test/rabbit_fifo_int_SUITE.erl +++ b/test/rabbit_fifo_int_SUITE.erl @@ -86,7 +86,7 @@ basics(Config) -> CustomerTag = UId, ok = start_cluster(ClusterName, [ServerId]), FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0), + {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, #{}, FState0), ra_log_wal:force_roll_over(ra_log_wal), % create segment the segment will trigger a snapshot @@ -99,11 +99,10 @@ basics(Config) -> FState5 = receive {ra_event, From, Evt} -> case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of - {internal, _AcceptedSeqs, _Actions, _FState4} -> - exit(unexpected_internal_event); - {{delivery, C, [{MsgId, _Msg}]}, FState4} -> - {ok, S} = rabbit_fifo_client:settle(C, [MsgId], - FState4), + {ok, FState4, + [{deliver, C, true, + [{_Qname, _QRef, MsgId, _SomBool, _Msg}]}]} -> + {S, _A} = rabbit_fifo_client:settle(C, [MsgId], FState4), S end after 5000 -> @@ -129,10 +128,9 @@ basics(Config) -> receive {ra_event, Frm, E} -> case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of - {internal, _, _, _FState7} -> - exit({unexpected_internal_event, E}); - {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} -> - {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7), + {ok, FState7, [{deliver, Ctag, true, + [{_, _, Mid, _, two}]}]} -> + {_, _} = rabbit_fifo_client:return(Ctag, [Mid], FState7), ok end after 2000 -> @@ -150,8 +148,8 @@ return(Config) -> {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00), {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0), {_, _, F2} = process_ra_events(receive_ra_events(2, 0), F1), - {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), - {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), + {ok, _, {_, _, MsgId, _, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, 
unsettled, F2), + _F2 = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), ra:stop_server(ServerId), ok. @@ -165,9 +163,9 @@ rabbit_fifo_returns_correlation(Config) -> receive {ra_event, Frm, E} -> case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of - {internal, [corr1], [], _F2} -> + {ok, _F2, [{settled, _, _}]} -> ok; - {Del, _} -> + Del -> exit({unexpected, Del}) end after 2000 -> @@ -181,23 +179,24 @@ duplicate_delivery(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), Fun = fun Loop(S0) -> receive {ra_event, Frm, E} = Evt -> case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of - {internal, [corr1], [], S1} -> + {ok, S1, [{settled, _, _}]} -> Loop(S1); - {_Del, S1} -> + {ok, S1, _} -> %% repeat event delivery self() ! 
Evt, %% check that then next received delivery doesn't %% repeat or crash receive {ra_event, F, E1} -> - case rabbit_fifo_client:handle_ra_event(F, E1, S1) of - {internal, [], [], S2} -> + case rabbit_fifo_client:handle_ra_event( + F, E1, S1) of + {ok, S2, _} -> S2 end end @@ -215,7 +214,7 @@ usage(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), {_, _, _} = process_ra_events(receive_ra_events(2, 2), F3), @@ -242,9 +241,9 @@ resends_lost_command(Config) -> meck:unload(ra), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), {_, _, F4} = process_ra_events(receive_ra_events(2, 0), F3), - {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), - {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, _, {_, _, _, _, msg1}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, _, {_, _, _, _, msg2}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, _, {_, _, _, _, msg3}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), ra:stop_server(ServerId), ok. 
@@ -268,7 +267,7 @@ detects_lost_delivery(Config) -> F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), {_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), % lose first delivery @@ -298,13 +297,13 @@ returns_after_down(Config) -> _Pid = spawn(fun () -> F = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, - undefined, F), + #{}, F), Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, timer:sleep(1000), % message should be available for dequeue - {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), + {ok, _, {_, _, _, _, msg1}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), ra:stop_server(ServerId), ok. @@ -327,9 +326,9 @@ resends_after_lost_applied(Config) -> % send another message {ok, F4} = rabbit_fifo_client:enqueue(msg3, F3), {_, _, F5} = process_ra_events(receive_ra_events(1, 0), F4), - {ok, {{_, {_, msg1}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {{_, {_, msg2}}, _}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), - {ok, {{_, {_, msg3}}, _}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7), + {ok, _, {_, _, _, _, msg1}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, _, {_, _, _, _, msg2}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, _, {_, _, _, _, msg3}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7), ra:stop_server(ServerId), ok. 
@@ -377,15 +376,16 @@ discard(Config) -> _ = ra:members(ServerId), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), - F3 = discard_next_delivery(F2, 500), - {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), + F3 = discard_next_delivery(F2, 5000), + {empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), receive {dead_letter, Letters} -> [{_, msg1}] = Letters, ok after 500 -> + flush(), exit(dead_letter_timeout) end, ra:stop_server(ServerId), @@ -397,11 +397,11 @@ cancel_checkout(Config) -> ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), + {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F1), {_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), - {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), + {F5, _} = rabbit_fifo_client:return(<<"tag">>, [0], F4), + {ok, _, {_, _, _, _, m1}, F5} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), ok. 
credit(Config) -> @@ -413,20 +413,20 @@ credit(Config) -> {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), {_, _, F3} = process_ra_events(receive_ra_events(2, 0), F2), %% checkout with 0 prefetch - {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3), + {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, #{}, F3), %% assert no deliveries {_, _, F5} = process_ra_events(receive_ra_events(), F4, [], [], fun (D, _) -> error({unexpected_delivery, D}) end), %% provide some credit - {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), - {[{_, {_, m1}}], [{send_credit_reply, _}], F7} = + F6 = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), + {[{_, _, _, _, m1}], [{send_credit_reply, _}], F7} = process_ra_events(receive_ra_events(1, 1), F6), %% credit and drain - {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), - {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} = + F8 = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), + {[{_, _, _, _, m2}], [{send_credit_reply, _}, {send_drained, _}], F9} = process_ra_events(receive_ra_events(1, 1), F8), flush(), @@ -439,9 +439,8 @@ credit(Config) -> (D, _) -> error({unexpected_delivery, D}) end), %% credit again and receive the last message - {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), - {[{_, {_, m3}}], [{send_credit_reply, _}], _} = - process_ra_events(receive_ra_events(1, 1), F12), + F12 = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), + {[{_, _, _, _, m3}], _, _} = process_ra_events(receive_ra_events(1, 1), F12), ok. untracked_enqueue(Config) -> @@ -452,7 +451,7 @@ untracked_enqueue(Config) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1), timer:sleep(100), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), + {ok, _, {_, _, _, _, msg1}, _F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), ra:stop_server(ServerId), ok. 
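Alongside the API change (`credit/4` now returns the state directly rather than `{ok, State}`), the `credit` test exercises the credited-consumer flow: a checkout with zero prefetch delivers nothing, deliveries never exceed the granted credit, and a `drain` grant consumes whatever credit is left over once the queue is empty. That flow can be sketched in Python (a toy model of the semantics, not the `rabbit_fifo_client` API):

```python
class CreditedConsumer:
    """Toy model of a credited checkout: deliveries are bounded by granted credit."""

    def __init__(self):
        self.credit = 0

    def grant(self, n, queue, drain=False):
        self.credit += n
        delivered = []
        while self.credit > 0 and queue:
            delivered.append(queue.pop(0))
            self.credit -= 1
        if drain:
            self.credit = 0      # leftover credit is consumed ("drained")
        return delivered

q = ["m1", "m2"]
c = CreditedConsumer()
assert c.grant(0, q) == []             # zero prefetch: no deliveries
assert c.grant(1, q) == ["m1"]         # one credit, one delivery
assert c.grant(4, q, drain=True) == ["m2"]
assert c.credit == 0                   # drain wiped the unused credit
```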
@@ -472,6 +471,7 @@ flow(Config) -> ok. test_queries(Config) -> + % ok = logger:set_primary_config(level, all), ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), @@ -484,20 +484,23 @@ test_queries(Config) -> Self ! ready, receive stop -> ok end end), + receive + ready -> ok + after 5000 -> + exit(ready_timeout) + end, F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - ok = receive ready -> ok after 5000 -> timeout end, - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), - ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader}, - ra:local_query(ServerId, - fun rabbit_fifo:query_messages_ready/1)), - ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader}, - ra:local_query(ServerId, - fun rabbit_fifo:query_messages_checked_out/1)), - ?assertMatch({ok, {_RaIdxTerm, Processes}, _Leader} - when length(Processes) == 2, - ra:local_query(ServerId, - fun rabbit_fifo:query_processes/1)), - P ! stop, + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, #{}, F0), + {ok, {_, Ready}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_ready/1), + ?assertEqual(1, Ready), + {ok, {_, Checked}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_checked_out/1), + ?assertEqual(1, Checked), + {ok, {_, Processes}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_processes/1), + ?assertEqual(2, length(Processes)), + P ! stop, ra:stop_server(ServerId), ok. 
@@ -511,15 +514,16 @@ dequeue(Config) -> Tag = UId, ok = start_cluster(ClusterName, [ServerId]), F1 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), + {empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b), {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F2_), - {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), + % {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), + {ok, _, {_, _, 0, _, msg1}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3), {_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_), - {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), - {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5), + {ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), + {_F6, _A} = rabbit_fifo_client:settle(Tag, [MsgId], F5), ra:stop_server(ServerId), ok. @@ -534,8 +538,8 @@ conf(ClusterName, UId, ServerId, _, Peers) -> process_ra_event(State, Wait) -> receive {ra_event, From, Evt} -> - {internal, _, _, S} = - rabbit_fifo_client:handle_ra_event(From, Evt, State), + {ok, S, _Actions} = + rabbit_fifo_client:handle_ra_event(From, Evt, State), S after Wait -> exit(ra_event_timeout) @@ -572,10 +576,10 @@ receive_ra_events(Acc) -> end. process_ra_events(Events, State) -> - DeliveryFun = fun ({delivery, Tag, Msgs}, S) -> + DeliveryFun = fun ({deliver, _, Tag, Msgs}, S) -> MsgIds = [element(1, M) || M <- Msgs], - {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S), - S2 + {S0, _} = rabbit_fifo_client:settle(Tag, MsgIds, S), + S0 end, process_ra_events(Events, State, [], [], DeliveryFun). 
@@ -583,43 +587,41 @@ process_ra_events([], State0, Acc, Actions0, _DeliveryFun) -> {Acc, Actions0, State0}; process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) -> case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, Actions, State} -> - process_ra_events(Events, State, Acc, Actions0 ++ Actions, DeliveryFun); - {{delivery, _Tag, Msgs} = Del, State1} -> - State = DeliveryFun(Del, State1), - process_ra_events(Events, State, Acc ++ Msgs, Actions0, DeliveryFun); + {ok, State1, Actions1} -> + {Msgs, Actions, State} = + lists:foldl( + fun ({deliver, _, _, Msgs} = Del, {M, A, S}) -> + {M ++ Msgs, A, DeliveryFun(Del, S)}; + (Ac, {M, A, S}) -> + {M, A ++ [Ac], S} + end, {Acc, [], State1}, Actions1), + process_ra_events(Events, State, Msgs, Actions0 ++ Actions, DeliveryFun); eol -> eol end. discard_next_delivery(State0, Wait) -> receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, _Actions, State} -> - discard_next_delivery(State, Wait); - {{delivery, Tag, Msgs}, State1} -> - MsgIds = [element(1, M) || M <- Msgs], - {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds, - State1), - State - end + {ra_event, _, {machine, {delivery, _, _}}} = Evt -> + element(3, process_ra_events([Evt], State0, [], [], + fun ({deliver, Tag, _, Msgs}, S) -> + MsgIds = [element(3, M) || M <- Msgs], + {S0, _} = rabbit_fifo_client:discard(Tag, MsgIds, S), + S0 + end)) after Wait -> - State0 + State0 end. 
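The rewritten helper reflects the new `handle_ra_event/3` contract: instead of distinct `{internal, ...}` and `{{delivery, ...}, State}` returns, every event yields `{ok, State, Actions}`, and the helper folds over the action list, applying deliveries and accumulating everything else. The fold, modeled in Python (tuple shapes are illustrative):

```python
def process_actions(actions, state, deliver_fun):
    """Fold over handle_ra_event actions: apply deliveries, collect the rest."""
    msgs, others = [], []
    for action in actions:
        if action[0] == "deliver":
            _, _ctag, _ack_required, batch = action
            msgs.extend(batch)
            state = deliver_fun(action, state)   # e.g. settle/return/discard the ids
        else:
            others.append(action)
    return msgs, others, state

actions = [("deliver", "ctag", True, [(1, "m1")]),
           ("settled", "qref", [1])]
msgs, others, _state = process_actions(actions, {}, lambda _a, s: s)
assert msgs == [(1, "m1")]
assert others == [("settled", "qref", [1])]
```

`discard_next_delivery/2` and `return_next_delivery/2` above reuse exactly this fold, varying only the delivery function.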
return_next_delivery(State0, Wait) -> receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, _, State} -> - return_next_delivery(State, Wait); - {{delivery, Tag, Msgs}, State1} -> - MsgIds = [element(1, M) || M <- Msgs], - {ok, State} = rabbit_fifo_client:return(Tag, MsgIds, - State1), - State - end + {ra_event, _, {machine, {delivery, _, _}}} = Evt -> + element(3, process_ra_events([Evt], State0, [], [], + fun ({deliver, Tag, _, Msgs}, S) -> + MsgIds = [element(3, M) || M <- Msgs], + {S0, _} = rabbit_fifo_client:return(Tag, MsgIds, S), + S0 + end)) after Wait -> State0 end. diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl index 3324a1253c..2467e40028 100644 --- a/test/rabbit_ha_test_consumer.erl +++ b/test/rabbit_ha_test_consumer.erl @@ -51,12 +51,15 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) -> %% counter. if MsgNum + 1 == LowestSeen -> + error_logger:info_msg("recording ~w left ~w", + [MsgNum, MsgsToConsume]), run(TestPid, Channel, Queue, CancelOnFailover, MsgNum, MsgsToConsume - 1); MsgNum >= LowestSeen -> error_logger:info_msg( - "consumer ~p on ~p ignoring redelivered msg ~p~n", - [self(), Channel, MsgNum]), + "consumer ~p on ~p ignoring redelivered msg ~p " + "lowest seen ~w~n", + [self(), Channel, MsgNum, LowestSeen]), true = Redelivered, %% ASSERTION run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume); diff --git a/test/rabbit_msg_record_SUITE.erl b/test/rabbit_msg_record_SUITE.erl new file mode 100644 index 0000000000..a82ba7481d --- /dev/null +++ b/test/rabbit_msg_record_SUITE.erl @@ -0,0 +1,213 @@ +-module(rabbit_msg_record_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl").
+ +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + amqp091_roundtrip, + message_id_ulong, + message_id_uuid, + message_id_binary, + message_id_large_binary, + message_id_large_string + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +amqp091_roundtrip(_Config) -> + Props = #'P_basic'{content_type = <<"text/plain">>, + content_encoding = <<"gzip">>, + headers = [{<<"x-stream-offset">>, long, 99}, + {<<"x-string">>, longstr, <<"a string">>}, + {<<"x-bool">>, bool, false}, + {<<"x-unsignedbyte">>, unsignedbyte, 1}, + {<<"x-unsignedshort">>, unsignedshort, 1}, + {<<"x-unsignedint">>, unsignedint, 1}, + {<<"x-signedint">>, signedint, 1}, + {<<"x-timestamp">>, timestamp, 1}, + {<<"x-double">>, double, 1.0}, + {<<"x-float">>, float, 1.0}, + {<<"x-binary">>, binary, <<"data">>} + ], + delivery_mode = 2, + priority = 99, + correlation_id = <<"corr">>, + reply_to = <<"reply-to">>, + expiration = <<"1">>, + message_id = <<"msg-id">>, + timestamp = 99, + type = <<"45">>, + user_id = <<"banana">>, + app_id = <<"rmq">> + % cluster_id = <<"adf">> + }, + Payload = [<<"data">>], + test_amqp091_roundtrip(Props, Payload), + test_amqp091_roundtrip(#'P_basic'{}, Payload), + ok.
+ +message_id_ulong(_Config) -> + Num = 9876789, + ULong = erlang:integer_to_binary(Num), + P = #'v1_0.properties'{message_id = {ulong, Num}, + correlation_id = {ulong, Num}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = ULong, + correlation_id = ULong, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id-type">>, longstr, <<"ulong">>}, + {<<"x-message-id-type">>, longstr, <<"ulong">>} + ]}, + Props), + ok. + +message_id_uuid(_Config) -> + %% fake a uuid + UUId = erlang:md5(term_to_binary(make_ref())), + TextUUId = rabbit_data_coercion:to_binary(rabbit_guid:to_string(UUId)), + P = #'v1_0.properties'{message_id = {uuid, UUId}, + correlation_id = {uuid, UUId}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = TextUUId, + correlation_id = TextUUId, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id-type">>, longstr, <<"uuid">>}, + {<<"x-message-id-type">>, longstr, <<"uuid">>} + ]}, + Props), + ok. 
+ +message_id_binary(_Config) -> + %% fake a uuid + Orig = <<"asdfasdf">>, + Text = base64:encode(Orig), + P = #'v1_0.properties'{message_id = {binary, Orig}, + correlation_id = {binary, Orig}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = Text, + correlation_id = Text, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id-type">>, longstr, <<"binary">>}, + {<<"x-message-id-type">>, longstr, <<"binary">>} + ]}, + Props), + ok. + +message_id_large_binary(_Config) -> + %% cannot fit in a shortstr + Orig = crypto:strong_rand_bytes(500), + P = #'v1_0.properties'{message_id = {binary, Orig}, + correlation_id = {binary, Orig}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = undefined, + correlation_id = undefined, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id">>, longstr, Orig}, + {<<"x-message-id">>, longstr, Orig} + ]}, + Props), + ok. + +message_id_large_string(_Config) -> + %% cannot fit in a shortstr + Orig = base64:encode(crypto:strong_rand_bytes(500)), + P = #'v1_0.properties'{message_id = {utf8, Orig}, + correlation_id = {utf8, Orig}}, + D = #'v1_0.data'{content = <<"data">>}, + Bin = [amqp10_framing:encode_bin(P), + amqp10_framing:encode_bin(D)], + R = rabbit_msg_record:init(iolist_to_binary(Bin)), + {Props, _} = rabbit_msg_record:to_amqp091(R), + ?assertMatch(#'P_basic'{message_id = undefined, + correlation_id = undefined, + headers = + [ + %% ordering shouldn't matter + {<<"x-correlation-id">>, longstr, Orig}, + {<<"x-message-id">>, longstr, Orig} + ]}, + Props), + ok. 
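Taken together, the `message_id_*` cases assert a mapping rule for AMQP 1.0 message/correlation ids landing in AMQP 0.9.1 properties: the id is rendered as text (ulongs as decimal, binaries base64-encoded) plus an `x-…-id-type` header, but anything that cannot fit a 255-byte shortstr is instead moved wholesale into an `x-message-id`/`x-correlation-id` header and the property left unset. A hedged Python sketch of that rule (the uuid case, which goes through a guid-to-string helper, is omitted; names here are illustrative, not the `rabbit_msg_record` API):

```python
import base64

SHORTSTR_MAX = 255  # AMQP 0.9.1 shortstr limit

def map_message_id(kind, value):
    """Return (amqp091_message_id, extra_headers) for an AMQP 1.0 message-id."""
    if kind == "ulong":
        candidate, id_type = str(value), "ulong"
    elif kind == "binary":
        candidate, id_type = base64.b64encode(value).decode(), "binary"
    else:  # "utf8": already text, no type header needed
        candidate, id_type = value, None
    if len(candidate) <= SHORTSTR_MAX:
        headers = [("x-message-id-type", id_type)] if id_type else []
        return candidate, headers
    # too big for a shortstr: drop the property, keep the original in a header
    return None, [("x-message-id", value)]

assert map_message_id("ulong", 9876789) == ("9876789", [("x-message-id-type", "ulong")])
big = "A" * 500
assert map_message_id("utf8", big) == (None, [("x-message-id", big)])
```

The 500-byte cases in the suite hit the fallback branch: a 500-byte binary base64-encodes to well over 255 characters, so both it and the 500-character string end up in headers only.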
+ +%% Utility + +test_amqp091_roundtrip(Props, Payload) -> + MsgRecord0 = rabbit_msg_record:from_amqp091(Props, Payload), + MsgRecord = rabbit_msg_record:init( + iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))), + % meck:unload(), + {PropsOut, PayloadOut} = rabbit_msg_record:to_amqp091(MsgRecord), + ?assertEqual(Props, PropsOut), + ?assertEqual(iolist_to_binary(Payload), + iolist_to_binary(PayloadOut)), + ok. + + diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl new file mode 100644 index 0000000000..67ca8eba8b --- /dev/null +++ b/test/rabbit_stream_queue_SUITE.erl @@ -0,0 +1,1304 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_queue_SUITE). + +-include_lib("proper/include/proper.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +suite() -> + [{timetrap, 5 * 60000}]. + +all() -> + [ + {group, single_node}, + {group, cluster_size_2}, + {group, cluster_size_3}, + {group, unclustered_size_3_1}, + {group, unclustered_size_3_2}, + {group, unclustered_size_3_3}, + {group, cluster_size_3_1} + ]. 
+ +groups() -> + [ + {single_node, [], [restart_single_node] ++ all_tests()}, + {cluster_size_2, [], all_tests()}, + {cluster_size_3, [], all_tests() ++ + [delete_replica, + delete_down_replica, + delete_classic_replica, + delete_quorum_replica, + consume_from_replica, + leader_failover]}, + {unclustered_size_3_1, [], [add_replica]}, + {unclustered_size_3_2, [], [consume_without_local_replica]}, + {unclustered_size_3_3, [], [grow_coordinator_cluster]}, + {cluster_size_3_1, [], [shrink_coordinator_cluster]} + ]. + +all_tests() -> + [ + declare_args, + declare_max_age, + declare_invalid_args, + declare_invalid_properties, + declare_queue, + delete_queue, + publish, + publish_confirm, + recover, + consume_without_qos, + consume, + consume_offset, + basic_get, + consume_with_autoack, + consume_and_nack, + consume_and_ack, + consume_and_reject, + consume_from_last, + consume_from_next, + consume_from_default, + consume_credit, + consume_credit_out_of_order_ack, + consume_credit_multiple_ack, + basic_cancel, + max_length_bytes, + max_age, + invalid_policy, + max_age_policy, + max_segment_size_policy + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config0) -> + rabbit_ct_helpers:log_environment(), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{stream_tick_interval, 1000}, + {log, [{file, [{level, debug}]}]}]}), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). 
+ +init_per_group(Group, Config) -> + ClusterSize = case Group of + single_node -> 1; + cluster_size_2 -> 2; + cluster_size_3 -> 3; + cluster_size_3_1 -> 3; + unclustered_size_3_1 -> 3; + unclustered_size_3_2 -> 3; + unclustered_size_3_3 -> 3 + end, + Clustered = case Group of + unclustered_size_3_1 -> false; + unclustered_size_3_2 -> false; + unclustered_size_3_3 -> false; + _ -> true + end, + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}, + {rmq_nodes_clustered, Clustered}]), + Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + Ret = rabbit_ct_helpers:run_steps(Config1b, + [fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()), + case Ret of + {skip, _} -> + Ret; + Config2 -> + EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( + Config2, stream_queue), + case EnableFF of + ok -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_tick_interval, 100]), + Config2; + Skip -> + end_per_group(Group, Config2), + Skip + end + end. + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q}]), + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [{core_metrics_gc_interval, 100}]}). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). 
+ +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +declare_args(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-length">>, long, 2000}])), + assert_queue_type(Server, Q, rabbit_stream_queue). + +declare_max_age(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), Q, + [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-age">>, longstr, <<"1A">>}])), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-age">>, longstr, <<"1Y">>}])), + assert_queue_type(Server, Q, rabbit_stream_queue). 
+ +declare_invalid_properties(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Q = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = Q, + auto_delete = true, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = Q, + exclusive = true, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = Q, + durable = false, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})). + +declare_invalid_args(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Q = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-expires">>, long, 2000}])), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-message-ttl">>, long, 2000}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-priority">>, long, 2000}])), + + [?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-overflow">>, 
longstr, XOverflow}])) + || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-mode">>, longstr, <<"lazy">>}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])). + +declare_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + %% Test declare an existing queue + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ?assertMatch([_], rpc:call(Server, supervisor, which_children, + [osiris_server_sup])), + + %% Test declare an existing queue with different arguments + ?assertExit(_, declare(Ch, Q, [])). + +delete_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})). 
+ +add_replica(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + + %% Let's also try the add replica command on other queue types, it should fail + %% We're doing it in the same test for efficiency, otherwise we have to + %% start new rabbitmq clusters every time for a minor testcase + QClassic = <<Q/binary, "_classic">>, + QQuorum = <<Q/binary, "_quorum">>, + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ?assertEqual({'queue.declare_ok', QClassic, 0, 0}, + declare(Ch, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])), + ?assertEqual({'queue.declare_ok', QQuorum, 0, 0}, + declare(Ch, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Not a member of the cluster, what would happen? + ?assertEqual({error, node_not_running}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server1])), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server1])), + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server1])), + + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server1), + timer:sleep(1000), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server1])), + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server1])), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server1])), + %% replicas must be recorded on the state, and if we 
publish messages then they must + %% be stored on disk + check_leader_and_replicas(Config, Q, Server0, [Server1]), + %% And if we try again? Idempotent + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server1])), + %% Add another node + ok = rabbit_control_helper:command(stop_app, Server2), + ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server2), + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server2])), + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]). + +delete_replica(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]), + %% Not a member of the cluster, what would happen? + ?assertEqual({error, node_not_running}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, 'zen@rabbit'])), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])), + %% check it's gone + check_leader_and_replicas(Config, Q, Server0, [Server2]), + %% And if we try again? Idempotent + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])), + %% Delete the last replica + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server2])), + check_leader_and_replicas(Config, Q, Server0, []). 
+ +grow_coordinator_cluster(Config) -> + [Server0, Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server1), + + rabbit_ct_helpers:await_condition( + fun() -> + case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of + {_, Members, _} -> + Nodes = lists:sort([N || {_, N} <- Members]), + lists:sort([Server0, Server1]) == Nodes; + _ -> + false + end + end, 60000). + +shrink_coordinator_cluster(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ok = rabbit_control_helper:command(stop_app, Server2), + ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []), + + rabbit_ct_helpers:await_condition( + fun() -> + case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of + {_, Members, _} -> + Nodes = lists:sort([N || {_, N} <- Members]), + lists:sort([Server0, Server1]) == Nodes; + _ -> + false + end + end, 60000). 
+ +delete_classic_replica(Config) -> + [Server0, Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"classic">>}])), + %% Not a member of the cluster, what would happen? + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, 'zen@rabbit'])), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])). + +delete_quorum_replica(Config) -> + [Server0, Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + %% Not a member of the cluster, what would happen? + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, 'zen@rabbit'])), + ?assertEqual({error, quorum_queue_not_supported}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])). 
+ +delete_down_replica(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ?assertEqual({error, node_not_running}, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])), + %% check it isn't gone + check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]), + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ?assertEqual(ok, + rpc:call(Server0, rabbit_stream_queue, delete_replica, + [<<"/">>, Q, Server1])). + +publish(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + publish(Ch, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]). + +publish_confirm(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]). 
+ +restart_single_node(Config) -> + [Server] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + publish(Ch, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + + rabbit_control_helper:command(stop_app, Server), + rabbit_control_helper:command(start_app, Server), + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + publish(Ch1, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]). + +recover(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + publish(Ch, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + + [rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers], + [rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)], + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + publish(Ch1, Q), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]). 
+ +consume_without_qos(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, + self())). + +consume_without_local_replica(Config) -> + [Server0, Server1 | _] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + %% Add another node to the cluster, but it won't have a replica + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server1), + timer:sleep(1000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + qos(Ch1, 10, false), + ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, + self())). 
+ +consume(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + _ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), + ok = amqp_channel:close(Ch1), + ok + after 5000 -> + exit(timeout) + end. + +consume_offset(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, + [publish(Ch, Q, Payload) || _ <- lists:seq(1, 1000)], + amqp_channel:wait_for_confirms(Ch, 5000), + + run_proper( + fun () -> + ?FORALL(Offset, range(0, 999), + begin + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, Offset), + receive_batch(Ch1, Offset, 999), + receive + {_, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}} + when S < Offset -> + exit({unexpected_offset, S}) + after 1000 -> + ok + end, + amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), + 
true + end) + end, [], 25). + +basic_get(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + amqp_channel:call(Ch, #'basic.get'{queue = Q})). + +consume_with_autoack(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + ?assertExit( + {{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + subscribe(Ch1, Q, true, 0)). + +consume_and_nack(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + %% Nack will throw a not implemented exception. As it is a cast operation, + %% we'll detect the conneciton/channel closure on the next call. 
+ %% Let's try to redeclare and see what happens + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])) + after 10000 -> + exit(timeout) + end. + +basic_cancel(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + rabbit_ct_helpers:await_condition( + fun() -> + 1 == length(rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, + [consumer_created])) + end, 30000), + receive + {#'basic.deliver'{}, _} -> + amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), + ?assertMatch([], rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, [consumer_created])) + after 10000 -> + exit(timeout) + end. 
+ +consume_and_reject(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.reject'{delivery_tag = DeliveryTag, + requeue = true}), + %% Reject will throw a not implemented exception. As it is a cast operation, + %% we'll detect the connection/channel closure on the next call. + %% Let's try to redeclare and see what happens + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])) + after 10000 -> + exit(timeout) + end. 
+ +consume_and_ack(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, Q), + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + subscribe(Ch1, Q, false, 0), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + %% It will succeed as ack is now a credit operation. We should be + %% able to redeclare a queue (gen_server call op) as the channel + %% should still be open and declare is an idempotent operation + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) + after 5000 -> + exit(timeout) + end. 
+ +consume_from_last(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [committed_offset]]), + + %% We'll receive data from the last committed offset, let's check that it is + %% not the first offset + CommittedOffset = proplists:get_value(committed_offset, Info), + ?assert(CommittedOffset > 0), + + %% Subscribing with offset 'last' starts from the last committed chunk + amqp_channel:subscribe( + Ch1, #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = [{<<"x-stream-offset">>, longstr, <<"last">>}]}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + %% And receive the messages from the last committed offset to the end of the stream + receive_batch(Ch1, CommittedOffset, 99), + + %% Publish a few more + [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + %% Yeah! we got them + receive_batch(Ch1, 100, 199). + +consume_from_next(Config) -> + consume_from_next(Config, [{<<"x-stream-offset">>, longstr, <<"next">>}]). + +consume_from_default(Config) -> + consume_from_next(Config, []). 
+ +consume_from_next(Config, Args) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch1, 10, false), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [committed_offset]]), + + %% Check the committed offset is not the first offset, i.e. the published + %% messages have been committed + CommittedOffset = proplists:get_value(committed_offset, Info), + ?assert(CommittedOffset > 0), + + %% If the offset is not provided, we're subscribing to the tail of the stream + amqp_channel:subscribe( + Ch1, #'basic.consume'{queue = Q, + no_ack = false, + consumer_tag = <<"ctag">>, + arguments = Args}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end, + + %% Publish a few more + [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], + amqp_channel:wait_for_confirms(Ch, 5000), + + %% Yeah! we got them + receive_batch(Ch1, 100, 199). 
+
+consume_from_replica(Config) ->
+    [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+    Q = ?config(queue_name, Config),
+
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+    #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch1, self()),
+    [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+    amqp_channel:wait_for_confirms(Ch1, 5000),
+
+    Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+    qos(Ch2, 10, false),
+
+    subscribe(Ch2, Q, false, 0),
+    receive_batch(Ch2, 0, 99).
+
+consume_credit(Config) ->
+    %% Because osiris provides a whole chunk on every read and we don't want
+    %% to buffer messages in the broker (to avoid memory penalties), the
+    %% credit value isn't strict - we allow it to go negative.
+    %% We can test that after receiving a chunk, no more messages are
+    %% delivered until the credit goes back to a positive value.
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch, self()),
+    %% Let's publish a big batch, to ensure we have more than a chunk available
+    NumMsgs = 100,
+    [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+    amqp_channel:wait_for_confirms(Ch, 5000),
+
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+    %% Let's subscribe with a small credit, easier to test
+    Credit = 2,
+    qos(Ch1, Credit, false),
+    subscribe(Ch1, Q, false, 0),
+
+    %% Receive everything
+    DeliveryTags = receive_batch(),
+
+    %% We receive at least the given credit as we know there are 100 messages in the queue
+    ?assert(length(DeliveryTags) >= Credit),
+
+    %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries
+    {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags),
+
+    [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+                                              multiple = false})
+     || DeliveryTag <- ToAck],
+
+    %% Nothing here, this is good
+    receive
+        {#'basic.deliver'{}, _} ->
+            exit(unexpected_delivery)
+    after 1000 ->
+            ok
+    end,
+
+    %% Let's ack one more, we should receive a new chunk
+    ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending),
+                                             multiple = false}),
+
+    %% Yeah, here is the new chunk!
+    receive
+        {#'basic.deliver'{}, _} ->
+            ok
+    after 5000 ->
+            exit(timeout)
+    end.
+
+consume_credit_out_of_order_ack(Config) ->
+    %% Like consume_credit but acknowledging the messages out of order.
+    %% We want to ensure it doesn't behave like 'multiple', that is if we have
+    %% credit 2 and received 10 messages, sending the ack for message
+    %% number 10 should only increase credit by 1.
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch, self()),
+    %% Let's publish a big batch, to ensure we have more than a chunk available
+    NumMsgs = 100,
+    [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+    amqp_channel:wait_for_confirms(Ch, 5000),
+
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+    %% Let's subscribe with a small credit, easier to test
+    Credit = 2,
+    qos(Ch1, Credit, false),
+    subscribe(Ch1, Q, false, 0),
+
+    %% ******* This is the difference with consume_credit:
+    %% receive everything, then reverse the delivery tags so we ack out of order
+    DeliveryTags = lists:reverse(receive_batch()),
+
+    %% We receive at least the given credit as we know there are 100 messages in the queue
+    ?assert(length(DeliveryTags) >= Credit),
+
+    %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries
+    {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags),
+
+    [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+                                              multiple = false})
+     || DeliveryTag <- ToAck],
+
+    %% Nothing here, this is good
+    receive
+        {#'basic.deliver'{}, _} ->
+            exit(unexpected_delivery)
+    after 1000 ->
+            ok
+    end,
+
+    %% Let's ack one more, we should receive a new chunk
+    ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending),
+                                             multiple = false}),
+
+    %% Yeah, here is the new chunk!
+    receive
+        {#'basic.deliver'{}, _} ->
+            ok
+    after 5000 ->
+            exit(timeout)
+    end.
+
+consume_credit_multiple_ack(Config) ->
+    %% Like consume_credit but acknowledging with 'multiple = true'.
+    %% Acking the last delivery tag should replenish the credit for all
+    %% outstanding deliveries at once and trigger a new chunk.
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch, self()),
+    %% Let's publish a big batch, to ensure we have more than a chunk available
+    NumMsgs = 100,
+    [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+    amqp_channel:wait_for_confirms(Ch, 5000),
+
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+    %% Let's subscribe with a small credit, easier to test
+    Credit = 2,
+    qos(Ch1, Credit, false),
+    subscribe(Ch1, Q, false, 0),
+
+    %% ******* This is the difference with consume_credit:
+    %% ack only the last delivery tag, with 'multiple = true'
+    DeliveryTag = lists:last(receive_batch()),
+
+    ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+                                             multiple = true}),
+
+    %% Yeah, here is the new chunk!
+    receive
+        {#'basic.deliver'{}, _} ->
+            ok
+    after 5000 ->
+            exit(timeout)
+    end.
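The three credit tests above all rely on the same non-strict accounting: a whole osiris chunk is delivered at once, credit is decremented per message (and may go negative), each single ack returns exactly one credit regardless of delivery-tag order, and deliveries resume only once credit is positive again. A minimal model of that bookkeeping, as a hypothetical illustration rather than broker code:

```erlang
%% Hypothetical model of the non-strict credit accounting exercised by the
%% consume_credit* tests; the real logic lives in the broker.
deliver_chunk(Credit, ChunkSize) when Credit > 0 ->
    %% the whole chunk is sent in one go, so credit may go negative
    Credit - ChunkSize;
deliver_chunk(Credit, _ChunkSize) ->
    %% non-positive credit: nothing is delivered
    Credit.

ack(Credit) ->
    %% each single ack returns exactly one credit, regardless of the
    %% delivery tag order (no 'multiple' semantics)
    Credit + 1.
```

For example, with credit 2 and a 10-message chunk the credit drops to -8; the first eight acks only bring it back to 0, and a ninth ack makes it positive and allows a new chunk, which is the shape `consume_credit` asserts.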
+
+max_length_bytes(Config) ->
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+                                 {<<"x-max-length-bytes">>, long, 500},
+                                 {<<"x-max-segment-size">>, long, 250}])),
+
+    Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+
+    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch, self()),
+    [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+    amqp_channel:wait_for_confirms(Ch, 5000),
+
+    %% We don't yet have reliable metrics, as the committed offset doesn't work
+    %% as a counter once we start applying retention policies.
+    %% Let's wait for messages and check that fewer than the published number
+    %% are delivered
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+    qos(Ch1, 100, false),
+    subscribe(Ch1, Q, false, 0),
+
+    ?assert(length(receive_batch()) < 100).
+
+max_age(Config) ->
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+                                 {<<"x-max-age">>, longstr, <<"10s">>},
+                                 {<<"x-max-segment-size">>, long, 250}])),
+
+    Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+
+    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch, self()),
+    [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+    amqp_channel:wait_for_confirms(Ch, 5000),
+
+    timer:sleep(10000),
+
+    %% Let's publish again so the new segments will trigger the retention policy
+    [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+    amqp_channel:wait_for_confirms(Ch, 5000),
+
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+    qos(Ch1, 200, false),
+    subscribe(Ch1, Q, false, 0),
+    ?assertEqual(100, length(receive_batch())).
+
+leader_failover(Config) ->
+    [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+    Q = ?config(queue_name, Config),
+
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+    #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(Ch1, self()),
+    [publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)],
+    amqp_channel:wait_for_confirms(Ch1, 5000),
+
+    check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
+
+    ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+    timer:sleep(30000),
+
+    [Info] = lists:filter(
+               fun(Props) ->
+                       QName = rabbit_misc:r(<<"/">>, queue, Q),
+                       lists:member({name, QName}, Props)
+               end,
+               rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue,
+                                            info_all, [<<"/">>, [name, leader, members]])),
+    NewLeader = proplists:get_value(leader, Info),
+    ?assert(NewLeader =/= Server1),
+    ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
+
+invalid_policy(Config) ->
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+    ok = rabbit_ct_broker_helpers:set_policy(
+           Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>,
+           [{<<"ha-mode">>, <<"all">>}]),
+    ok = rabbit_ct_broker_helpers:set_policy(
+           Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>,
+           [{<<"message-ttl">>, 5}]),
+
+    [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+                                          info_all, [<<"/">>, [policy, operator_policy,
+                                                               effective_policy_definition]]),
+
+    ?assertEqual('', proplists:get_value(policy, Info)),
+    ?assertEqual('', proplists:get_value(operator_policy, Info)),
+    ?assertEqual([], proplists:get_value(effective_policy_definition, Info)),
+    ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>),
+    ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>).
+
+max_age_policy(Config) ->
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+    ok = rabbit_ct_broker_helpers:set_policy(
+           Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>,
+           [{<<"max-age">>, <<"1Y">>}]),
+
+    [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+                                          info_all, [<<"/">>, [policy, operator_policy,
+                                                               effective_policy_definition]]),
+
+    ?assertEqual(<<"age">>, proplists:get_value(policy, Info)),
+    ?assertEqual('', proplists:get_value(operator_policy, Info)),
+    ?assertEqual([{<<"max-age">>, <<"1Y">>}],
+                 proplists:get_value(effective_policy_definition, Info)),
+    ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>).
+
+max_segment_size_policy(Config) ->
+    [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+    Q = ?config(queue_name, Config),
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+    ok = rabbit_ct_broker_helpers:set_policy(
+           Config, 0, <<"segment">>, <<"max_segment_size.*">>, <<"queues">>,
+           [{<<"max-segment-size">>, 5000}]),
+
+    [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+                                          info_all, [<<"/">>, [policy, operator_policy,
+                                                               effective_policy_definition]]),
+
+    ?assertEqual(<<"segment">>, proplists:get_value(policy, Info)),
+    ?assertEqual('', proplists:get_value(operator_policy, Info)),
+    ?assertEqual([{<<"max-segment-size">>, 5000}],
+                 proplists:get_value(effective_policy_definition, Info)),
+    ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>).
+
+%%----------------------------------------------------------------------------
+
+delete_queues() ->
+    [{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+     || Q <- rabbit_amqqueue:list()].
+
+declare(Ch, Q) ->
+    declare(Ch, Q, []).
+
+declare(Ch, Q, Args) ->
+    amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+                                           durable = true,
+                                           auto_delete = false,
+                                           arguments = Args}).
+
+assert_queue_type(Server, Q, Expected) ->
+    Actual = get_queue_type(Server, Q),
+    Expected = Actual.
+
+get_queue_type(Server, Q0) ->
+    QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
+    {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+    amqqueue:get_type(Q1).
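The `declare/2,3` helper wraps `#'queue.declare'{}` with `durable = true`, which the stream tests rely on. A typical call, shown here as a usage sketch (the queue name and argument values are illustrative, not taken from the suite):

```erlang
%% Usage sketch for the declare/3 helper defined above: declare a durable
%% stream queue with retention arguments (values are illustrative only).
declare_stream_example(Ch) ->
    Q = <<"stream-example">>,  %% hypothetical queue name
    {'queue.declare_ok', Q, 0, 0} =
        declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
                        {<<"x-max-length-bytes">>, long, 5000},
                        {<<"x-max-age">>, longstr, <<"10s">>}]).
```

The match against `{'queue.declare_ok', Q, 0, 0}` works because `#'queue.declare_ok'{}` is a record, i.e. a tagged tuple of queue name, message count and consumer count, mirroring the assertions in the tests above.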
+
+check_leader_and_replicas(Config, Name, Leader, Replicas0) ->
+    QNameRes = rabbit_misc:r(<<"/">>, queue, Name),
+    [Info] = lists:filter(
+               fun(Props) ->
+                       lists:member({name, QNameRes}, Props)
+               end,
+               rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+                                            info_all, [<<"/">>, [name, leader, members]])),
+    ?assertEqual(Leader, proplists:get_value(leader, Info)),
+    Replicas = lists:sort(Replicas0),
+    ?assertEqual(Replicas, lists:sort(proplists:get_value(members, Info))).
+
+publish(Ch, Queue) ->
+    publish(Ch, Queue, <<"msg">>).
+
+publish(Ch, Queue, Msg) ->
+    ok = amqp_channel:cast(Ch,
+                           #'basic.publish'{routing_key = Queue},
+                           #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+                                     payload = Msg}).
+
+subscribe(Ch, Queue, NoAck, Offset) ->
+    amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+                                                no_ack = NoAck,
+                                                consumer_tag = <<"ctag">>,
+                                                arguments = [{<<"x-stream-offset">>, long, Offset}]},
+                           self()),
+    receive
+        #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+            ok
+    end.
+
+qos(Ch, Prefetch, Global) ->
+    ?assertMatch(#'basic.qos_ok'{},
+                 amqp_channel:call(Ch, #'basic.qos'{global = Global,
+                                                    prefetch_count = Prefetch})).
+
+receive_batch(Ch, N, N) ->
+    receive
+        {#'basic.deliver'{delivery_tag = DeliveryTag},
+         #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} ->
+            ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+                                                    multiple = false})
+    after 5000 ->
+            exit({missing_offset, N})
+    end;
+receive_batch(Ch, N, M) ->
+    receive
+        {_,
+         #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
+          when S < N ->
+            exit({unexpected_offset, S});
+        {#'basic.deliver'{delivery_tag = DeliveryTag},
+         #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} ->
+            ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+                                                    multiple = false}),
+            receive_batch(Ch, N + 1, M)
+    after 5000 ->
+            exit({missing_offset, N})
+    end.
+
+receive_batch() ->
+    receive_batch([]).
+
+receive_batch(Acc) ->
+    receive
+        {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+            receive_batch([DeliveryTag | Acc])
+    after 5000 ->
+            lists:reverse(Acc)
+    end.
+
+run_proper(Fun, Args, NumTests) ->
+    ?assertEqual(
+       true,
+       proper:counterexample(
+         erlang:apply(Fun, Args),
+         [{numtests, NumTests},
+          {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
+                         (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
+                      end}])).
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index 013e625159..8b2c1d6ebb 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -234,8 +234,10 @@ consume_survives(Config,
     DeathFun(Config, A),
     %% verify that the consumer got all msgs, or die - the await_response
     %% calls throw an exception if anything goes wrong....
-    rabbit_ha_test_consumer:await_response(ConsumerPid),
+    ct:pal("awaiting producer ~w", [ProducerPid]),
     rabbit_ha_test_producer:await_response(ProducerPid),
+    ct:pal("awaiting consumer ~w", [ConsumerPid]),
+    rabbit_ha_test_consumer:await_response(ConsumerPid),
     ok.
 confirms_survive(Config, DeathFun) ->
diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl
index 3610fd1a80..6be403fd3e 100644
--- a/test/unit_log_config_SUITE.erl
+++ b/test/unit_log_config_SUITE.erl
@@ -126,6 +126,10 @@ sink_rewrite_sinks() ->
     {rabbit_log_mirroring_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+    {rabbit_log_osiris_lager_event,
+     [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+      {rabbit_handlers,
+       [{lager_forwarder_backend,[lager_event,info]}]}]},
     {rabbit_log_prelaunch_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -226,6 +230,10 @@ sink_handlers_merged_with_lager_extra_sinks_handlers(_) ->
     {rabbit_log_mirroring_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
+    {rabbit_log_osiris_lager_event,
+     [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
+      {rabbit_handlers,
+       [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
     {rabbit_log_prelaunch_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
@@ -317,6 +325,10 @@ level_sinks() ->
     {rabbit_log_mirroring_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,error]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,error]}]}]},
+    {rabbit_log_osiris_lager_event,
+     [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+      {rabbit_handlers,
+       [{lager_forwarder_backend,[lager_event,info]}]}]},
     {rabbit_log_prelaunch_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -427,6 +439,10 @@ file_sinks(DefaultLevel) ->
     {rabbit_log_mirroring_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
+    {rabbit_log_osiris_lager_event,
+     [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
+      {rabbit_handlers,
+       [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
     {rabbit_log_prelaunch_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
@@ -674,6 +690,10 @@ default_expected_sinks(UpgradeFile) ->
     {rabbit_log_mirroring_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+    {rabbit_log_osiris_lager_event,
+     [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+      {rabbit_handlers,
+       [{lager_forwarder_backend,[lager_event,info]}]}]},
     {rabbit_log_prelaunch_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -761,6 +781,10 @@ tty_expected_sinks() ->
     {rabbit_log_mirroring_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+    {rabbit_log_osiris_lager_event,
+     [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+      {rabbit_handlers,
+       [{lager_forwarder_backend,[lager_event,info]}]}]},
     {rabbit_log_prelaunch_lager_event,
      [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
       {rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
