summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-09-29 11:43:24 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-30 14:52:53 +0100
commitf20fa273e99e6dcd925f8cd5988b6385b7fc0b6a (patch)
tree18bea003bb781196698622bb20767477267d7f7b /test
parentbdb6f9b508dd1aaad5e618d9a620adb7972e618f (diff)
downloadrabbitmq-server-git-f20fa273e99e6dcd925f8cd5988b6385b7fc0b6a.tar.gz
Stream Queue
This is an aggregated commit of all changes related to the initial implementation of queue types and on top of that the stream queue type. The varios commit messages have simply been included mostly un-edited below. Make rabbit_amqqueue:not_found_or_absent_dirty/1 visible For use in the stream plugin. Use bigger retention policy on max-age test Set coordinator timeout to 30s Handle coordinator unavailable error Handle operator policies as maps when checking if is applicable Add is_policy_applicable/2 to classic queues Ignore restart commands if the stream has been deleted It could happen that after termination some of the monitors are still up and trigger writer/replica restarts Policy support on stream queues Remove subscription events on stream coordinator Ensure old leaders are removed from monitors Introduce delay when retrying a failed phase Note that this ensures monitor is setup, there was a bug where no monitor was really started when re-trying the same phase Restart replicas after leader election instead of relying on old monitors Use timer for stream coordinator retries Fix stream stats for members/online Multiple fixes for replica monitoring and restart Ensure pending commands are appended at the end and re-run Ensure phase is reset with the state Remove duplicates from replica list Restart current phase on state_enter Remove unused import Ensure rabbit is running when checking for stream quorum Restart replicas Add a close/1 function to queue types So that we can get a chance of cleaning up resources if needed. Stream queues close their osiris logs at this point. fix compiler errors stream-queue: take retention into account When calculating ready messages metrics. Add osiris to the list of rabbit deps Retry restart of replicas Do not restart replicas or leaders after receiving a delete cluster command Add more logging to the stream coordinator Monitor subscribed processes on the stream coordinator Memory breakdown for stream queues Update quorum queue event formatter rabbit_msg_record fixes Refactor channel confirms Remove old unconfirmed_messages module that was designed to handle multiple queue fan in logic including all ha mirrors etc. Replaced with simpler rabbit_confirms module that handles the fan out and leaves any queue specific logic (such as confirms from mirrors) to the queue type implemention. Also this module has a dedicated test module. Which is nice. Backward compatibility with 3.8.x events Supports mixed version cluster upgrades Match specification when stream queue already exists Max age retention for stream queues Stop all replicas before starting leader election stream: disallow global qos remove IS_CLASSIC|QUORUM macros Ensure only classic queues are notified on channel down This also removes the delivering_queues map in the channel state as it should not be needed for this and just cause additional unecessary accounting. Polish AMQP 1.0/0.9.1 properties conversion Support byte in application properties, handle 1-bit representation for booleans. Use binary in header for long AMQP 1.0 ID Fix AMQP 1.0 to 0.9.1 conversion Fix test due to incorrect type Convert timestamp application properties to/from seconds AMQP 1.0 uses milliseconds for timestamp and AMQP 0.9.1 uses seconds, so conversion needed. Dialyzer fixes Handle all message-id types AMQP 1.0 is more liberal in it's allowed types of message-id and correlation-id - this adds headers to describe the type of the data in the message_id / correlation_id properties and also handles the case where the data cannot fit by again using headers. Resize stream coordinator cluster when broker configuration changes convert timestamp to and fro seconds user_id should be a binary message annotations keys need to be symbols stream-queue: default exchange and routing key As these won't be present for data written using the rabbitmq-stream plugin. Add exchange, routing key as message annotations To the AMQP 1.0 formatted data to enable roundtrip. Add osiris logging module config And update logging config test suite. Restart election when start of new leader fails The node might have just gone down so we need to try another one Only aux keeps track of phase now, as it might change if the leader election fails Stream coordinator refactor - all state is kept on the ra machine Ensure any ra cluster not a qq is not cleaned up Fixes to recovery and monitoring Add AMQP 1.0 common to dependencies Add rabbit_msg_record module To handle conversions into internal stream storage format. Use rabbitmq-common stream-queue branch Use SSH for osiris dependency Stream coordinator: delete replica Stream coordinator: add replica Stream coordinator: leader failover Stream coordinator: declare and delete Test consuming from a random offset Previous offsets should not be delivered to consumers Consume from stream replicas and multiple test fixes Use max-length-bytes and add new max-segment-size Use SSH for osiris dependency Basic cancel for stream queues Publish stream queues and settle/reject/requeue refactor Consume from stream queues Fix recovery Publish stream messages Add/delete stream replicas Use safe queue names Set retention policy for stream queues Required by the ctl command [#171207092] Stream queue delete queue fix missing callback impl Stream queue declare Queue type abstraction And use the implementing module as the value of the amqqueue record `type` field. This will allow for easy dispatch to the queue type implementation. Queue type abstraction Move queue declare into rabbit_queue_type Move queue delete into queue type implementation Queue type: dequeue/basic_get Move info inside queue type abstraction Move policy change into queue type interface Add purge to queue type Add recovery to the queue type interface Rename amqqueue quorum_nodes field To a more generic an extensible opaque queue type specific map. Fix tests and handle classic API response Fix HA queue confirm bug All mirrors need to be present as queue names. This introduces context linking allowing additional queue refs to be linked to a single "master" queue ref contining the actual queue context. Fix issue with events of deleted queues Also update queue type smoke test to use a cluster by default. correct default value of amqqueue getter Move classic queues further inside queue type interface why [TrackerId] Dialyzer fixes
Diffstat (limited to 'test')
-rw-r--r--test/backing_queue_SUITE.erl77
-rw-r--r--test/channel_operation_timeout_SUITE.erl3
-rw-r--r--test/confirms_rejects_SUITE.erl17
-rw-r--r--test/dead_lettering_SUITE.erl6
-rw-r--r--test/dynamic_ha_SUITE.erl8
-rw-r--r--test/queue_parallel_SUITE.erl21
-rw-r--r--test/queue_type_SUITE.erl234
-rw-r--r--test/quorum_queue_SUITE.erl83
-rw-r--r--test/quorum_queue_utils.erl9
-rw-r--r--test/rabbit_confirms_SUITE.erl154
-rw-r--r--test/rabbit_fifo_SUITE.erl2
-rw-r--r--test/rabbit_fifo_int_SUITE.erl186
-rw-r--r--test/rabbit_ha_test_consumer.erl7
-rw-r--r--test/rabbit_msg_record_SUITE.erl213
-rw-r--r--test/rabbit_stream_queue_SUITE.erl1304
-rw-r--r--test/simple_ha_SUITE.erl4
-rw-r--r--test/unit_log_config_SUITE.erl24
17 files changed, 2193 insertions, 159 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 2025576a57..ff37e1fb04 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -684,17 +684,17 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
QPid = amqqueue:get_pid(Q),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- publish_and_confirm(Q, Payload, Count),
+ QTState = publish_and_confirm(Q, Payload, Count),
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
CountMinusOne = Count - 1,
- {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true, Limiter,
+ {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}, _} =
+ rabbit_amqqueue:basic_get(Q, true, Limiter,
<<"bq_variable_queue_delete_msg_store_files_callback1">>,
- #{}),
+ QTState),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg
@@ -713,8 +713,7 @@ bq_queue_recover1(Config) ->
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
QName = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
- publish_and_confirm(Q, <<>>, Count),
-
+ QT = publish_and_confirm(Q, <<>>, Count),
SupPid = get_queue_sup_pid(Q),
true = is_pid(SupPid),
exit(SupPid, kill),
@@ -724,7 +723,7 @@ bq_queue_recover1(Config) ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(?VHOST),
- {Recovered, [], []} = rabbit_amqqueue:recover(?VHOST),
+ {Recovered, []} = rabbit_amqqueue:recover(?VHOST),
rabbit_amqqueue:start(Recovered),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
@@ -732,9 +731,9 @@ bq_queue_recover1(Config) ->
fun (Q1) when ?is_amqqueue(Q1) ->
QPid1 = amqqueue:get_pid(Q1),
CountMinusOne = Count - 1,
- {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
- <<"bq_queue_recover1">>, #{}),
+ {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}, _} =
+ rabbit_amqqueue:basic_get(Q1, false, Limiter,
+ <<"bq_queue_recover1">>, QT),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
@@ -1366,25 +1365,34 @@ variable_queue_init(Q, Recover) ->
publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
- [begin
- Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2},
- Payload),
- Delivery = #delivery{mandatory = false, sender = self(),
- confirm = true, message = Msg, msg_seq_no = Seq,
- flow = noflow},
- _QPids = rabbit_amqqueue:deliver([Q], Delivery)
- end || Seq <- Seqs],
- wait_for_confirms(gb_sets:from_list(Seqs)).
+ QTState0 = rabbit_queue_type:new(Q, rabbit_queue_type:init()),
+ QTState =
+ lists:foldl(
+ fun (Seq, Acc0) ->
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2},
+ Payload),
+ Delivery = #delivery{mandatory = false, sender = self(),
+ confirm = true, message = Msg, msg_seq_no = Seq,
+ flow = noflow},
+ {ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Delivery, Acc0),
+ Acc
+ end, QTState0, Seqs),
+ wait_for_confirms(gb_sets:from_list(Seqs)),
+ QTState.
wait_for_confirms(Unconfirmed) ->
case gb_sets:is_empty(Unconfirmed) of
true -> ok;
- false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
+ false -> receive {'$gen_cast',
+ {queue_event, _QName,
+ {confirm, Confirmed, _}}} ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)))
- after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
+ after ?TIMEOUT ->
+ flush(),
+ exit(timeout_waiting_for_confirm)
end
end.
@@ -1436,6 +1444,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
variable_queue_wait_for_shuffling_end(
lists:foldl(
fun (N, VQN) ->
+
rabbit_variable_queue:publish(
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
@@ -1526,12 +1535,13 @@ variable_queue_status(VQ) ->
variable_queue_wait_for_shuffling_end(VQ) ->
case credit_flow:blocked() of
false -> VQ;
- true -> receive
- {bump_credit, Msg} ->
- credit_flow:handle_bump_msg(Msg),
- variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:resume(VQ))
- end
+ true ->
+ receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:resume(VQ))
+ end
end.
msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
@@ -1576,11 +1586,13 @@ variable_queue_with_holes(VQ0) ->
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
%% assertions
Status = variable_queue_status(VQ8),
+
vq_with_holes_assertions(VQ8, proplists:get_value(mode, Status)),
Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
+
{Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}.
vq_with_holes_assertions(VQ, default) ->
@@ -1604,3 +1616,12 @@ check_variable_queue_status(VQ0, Props) ->
S = variable_queue_status(VQ1),
assert_props(S, Props),
VQ1.
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index f8da35d6ff..15e0188604 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -71,11 +71,13 @@ notify_down_all(Config) ->
RabbitCh = rabbit_ct_client_helpers:open_channel(Config, 0),
HareCh = rabbit_ct_client_helpers:open_channel(Config, 1),
+ ct:pal("one"),
%% success
set_channel_operation_timeout_config(Config, 1000),
configure_bq(Config),
QCfg0 = qconfig(RabbitCh, <<"q0">>, <<"ex0">>, true, false),
declare(QCfg0),
+ ct:pal("two"),
%% Testing rabbit_amqqueue:notify_down_all via rabbit_channel.
%% Consumer count = 0 after correct channel termination and
%% notification of queues via delegate:call/3
@@ -83,6 +85,7 @@ notify_down_all(Config) ->
rabbit_ct_client_helpers:close_channel(RabbitCh),
0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)),
false = is_process_alive(RabbitCh),
+ ct:pal("three"),
%% fail
set_channel_operation_timeout_config(Config, 10),
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index aaaeb4a939..a51253885c 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -388,9 +388,12 @@ kill_the_queue(QueueName) ->
[begin
{ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
Pid = amqqueue:get_pid(Q),
+ ct:pal("~w killed", [Pid]),
+ timer:sleep(1),
exit(Pid, kill)
end
- || _ <- lists:seq(1, 11)],
+ || _ <- lists:seq(1, 50)],
+ timer:sleep(1),
{ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
Pid = amqqueue:get_pid(Q),
case is_process_alive(Pid) of
@@ -399,7 +402,11 @@ kill_the_queue(QueueName) ->
false -> ok
end.
-
-
-
-
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index 87b5566c57..4ee917aa21 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -1059,9 +1059,11 @@ dead_letter_headers_BCC(Config) ->
?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
-%% Three top-level headers are added for the very first dead-lettering event. They are
+%% Three top-level headers are added for the very first dead-lettering event.
+%% They are
%% x-first-death-reason, x-first-death-queue, x-first-death-exchange
-%% They have the same values as the reason, queue, and exchange fields of the original
+%% They have the same values as the reason, queue, and exchange fields of the
+%% original
%% dead lettering event. Once added, these headers are never modified.
dead_letter_headers_first_death(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 25027c7ef9..c881aef8a1 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -424,8 +424,7 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, A),
- ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
- [all])),
+ ?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A], [all])),
%% --> Master: A
%% Slaves: [B, C] or [C, B]
SSPids = ?awaitMatch(SSPids when is_list(SSPids),
@@ -450,7 +449,7 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
%% should instead use an existing synchronised mirror as the new master,
%% even though that isn't in the policy.
?assertEqual(true, apply_policy_to_declared_queue(Config, Ch, [A],
- [{nodes, [LastSlave, A]}])),
+ [{nodes, [LastSlave, A]}])),
%% --> Master: B or C (same as previous policy)
%% Slaves: [A]
@@ -931,6 +930,7 @@ apply_in_parallel(Config, Nodes, Policies) ->
Self = self(),
[spawn_link(fun() ->
[begin
+
apply_policy(Config, N, Policy)
end || Policy <- Policies],
Self ! parallel_task_done
@@ -969,7 +969,7 @@ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) ->
%% Let's wait a bit longer.
timer:sleep(1000),
wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1);
- FinalInfo ->
+ {ok, FinalInfo} ->
%% The last policy is the final state
LastPolicy = lists:last(TestedPolicies),
case verify_policy(LastPolicy, FinalInfo) of
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index c4d16a5900..0fbf7ec975 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -57,7 +57,8 @@ groups() ->
trigger_message_store_compaction]},
{quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
- {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
+ {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
+ {stream_queue, [parallel], AllTests}
]}
].
@@ -122,13 +123,24 @@ init_per_group(mirrored_queue, Config) ->
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(stream_queue, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"stream">>}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
- {quorum_tick_interval, 1000}]}),
+ {quorum_tick_interval, 1000},
+ {stream_tick_interval, 1000}]}),
Config1 = rabbit_ct_helpers:set_config(
Config, [ {rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@@ -514,6 +526,11 @@ basic_cancel(Config) ->
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
CTag = atom_to_binary(?FUNCTION_NAME, utf8),
+
+ %% Let's set consumer prefetch so it works with stream queues
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = false,
+ prefetch_count = 1})),
subscribe(Ch, QName, false, CTag),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
diff --git a/test/queue_type_SUITE.erl b/test/queue_type_SUITE.erl
new file mode 100644
index 0000000000..eeeabc3d1e
--- /dev/null
+++ b/test/queue_type_SUITE.erl
@@ -0,0 +1,234 @@
+-module(queue_type_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, classic},
+ {group, quorum}
+ ].
+
+
+all_tests() ->
+ [
+ smoke
+ ].
+
+groups() ->
+ [
+ {classic, [], all_tests()},
+ {quorum, [], all_tests()}
+ ].
+
+init_per_suite(Config0) ->
+ rabbit_ct_helpers:log_environment(),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{quorum_tick_interval, 1000}]}),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config),
+ ok.
+
+init_per_group(Group, Config) ->
+ ClusterSize = 3,
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
+ Config1b = rabbit_ct_helpers:set_config(Config1,
+ [{queue_type, atom_to_binary(Group, utf8)},
+ {net_ticktime, 10}]),
+ Config2 = rabbit_ct_helpers:run_steps(Config1b,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ Config3 =
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
+ ok ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_tick_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit more time
+ %% after clustering before running the tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end;
+ Skip ->
+ end_per_group(Group, Config2),
+ Skip
+ end,
+ rabbit_ct_broker_helpers:set_policy(
+ Config3, 0,
+ <<"ha-policy">>, <<".*">>, <<"queues">>,
+ [{<<"ha-mode">>, <<"all">>}]),
+ Config3.
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit,
+ [{core_metrics_gc_interval, 100},
+ {log, [{file, [{level, debug}]}]}]}),
+ {ra, [{min_wal_roll_over_interval, 30000}]}).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{queue_name, Q},
+ {alt_queue_name, <<Q/binary, "_alt">>}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ catch delete_queues(),
+ Config1 = rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+smoke(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QName = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, QName, <<"msg1">>),
+ ct:pal("waiting for confirms from ~s", [QName]),
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} -> fail
+ after 2500 ->
+ flush(),
+ exit(confirm_timeout)
+ end,
+ DTag = basic_get(Ch, QName),
+
+ basic_ack(Ch, DTag),
+ basic_get_empty(Ch, QName),
+
+ %% consume
+ publish(Ch, QName, <<"msg2">>),
+ ConsumerTag1 = <<"ctag1">>,
+ ok = subscribe(Ch, QName, ConsumerTag1),
+ %% receive and ack
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{}} ->
+ basic_ack(Ch, DeliveryTag)
+ after 5000 ->
+ flush(),
+ exit(basic_deliver_timeout)
+ end,
+ basic_cancel(Ch, ConsumerTag1),
+
+ %% assert empty
+ basic_get_empty(Ch, QName),
+
+ %% consume and nack
+ ConsumerTag2 = <<"ctag2">>,
+ ok = subscribe(Ch, QName, ConsumerTag2),
+ publish(Ch, QName, <<"msg3">>),
+ receive
+ {#'basic.deliver'{delivery_tag = T,
+ redelivered = false},
+ #amqp_msg{}} ->
+ basic_cancel(Ch, ConsumerTag2),
+ basic_nack(Ch, T)
+ after 5000 ->
+ exit(basic_deliver_timeout)
+ end,
+ %% get and ack
+ basic_ack(Ch, basic_get(Ch, QName)),
+ ok.
+
+%% Utility
+delete_queues() ->
+ [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+ || Q <- rabbit_amqqueue:list()].
+
+declare(Ch, Q, Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ auto_delete = false,
+ arguments = Args}).
+
+publish(Ch, Queue, Msg) ->
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg}).
+
+basic_get(Ch, Queue) ->
+ {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = false}),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, Reply),
+ GetOk#'basic.get_ok'.delivery_tag.
+
+basic_get_empty(Ch, Queue) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = false})).
+
+subscribe(Ch, Queue, CTag) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = false,
+ consumer_tag = CTag},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} ->
+ ok
+ after 5000 ->
+ exit(basic_consume_timeout)
+ end.
+
+basic_ack(Ch, DTag) ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag,
+ multiple = false}).
+
+basic_cancel(Ch, CTag) ->
+ #'basic.cancel_ok'{} =
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}).
+
+basic_nack(Ch, DTag) ->
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+ requeue = true,
+ multiple = false}).
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index ecb4fdac63..16042b71e8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -383,13 +383,19 @@ start_queue(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
?assertEqual({'queue.declare_ok', LQ, 0, 0},
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = Children + 1,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -405,7 +411,8 @@ start_queue(Config) ->
%% Check that the application and process are still up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -463,6 +470,10 @@ quorum_cluster_size_x(Config, Max, Expected) ->
stop_queue(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -471,13 +482,15 @@ stop_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = Children + 1,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -485,6 +498,10 @@ stop_queue(Config) ->
restart_queue(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -496,7 +513,9 @@ restart_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
+ Expected = Children + 1,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))).
idempotent_recover(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -554,6 +573,10 @@ restart_all_types(Config) ->
%% ensure there are no regressions
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ1 = <<"restart_all_types-qq1">>,
?assertEqual({'queue.declare_ok', QQ1, 0, 0},
@@ -575,7 +598,9 @@ restart_all_types(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = length(Children) + 2,
+ Got = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ ?assertMatch(Expected, Got),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -592,6 +617,10 @@ stop_start_rabbit_app(Config) ->
%% classic) to ensure there are no regressions
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ1 = <<"stop_start_rabbit_app-qq">>,
?assertEqual({'queue.declare_ok', QQ1, 0, 0},
@@ -617,7 +646,9 @@ stop_start_rabbit_app(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Expected = Children + 2,
+ ?assertMatch(Expected,
+ length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -935,6 +966,10 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
%% to verify that the cleanup is propagated through channels
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -955,18 +990,22 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children,
- [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children,
+ [ra_server_sup_sup]))
end),
%% Check that all queue states have been cleaned
- wait_for_cleanup(Server, NCh1, 0),
- wait_for_cleanup(Server, NCh2, 0).
+ wait_for_cleanup(Server, NCh2, 0),
+ wait_for_cleanup(Server, NCh1, 0).
cleanup_queue_state_on_channel_after_subscribe(Config) ->
%% Declare/delete the queue and publish in one channel, while consuming on a
%% different one to verify that the cleanup is propagated through channels
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -993,7 +1032,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1596,8 +1635,8 @@ cleanup_data_dir(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
- [{_, UId1}] = rpc:call(Server1, ra_directory, list_registered, []),
- [{_, UId2}] = rpc:call(Server2, ra_directory, list_registered, []),
+ UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [])),
+ UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [])),
DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]),
DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]),
?assert(filelib:is_dir(DataDir1)),
@@ -1748,6 +1787,11 @@ reconnect_consumer_and_wait_channel_down(Config) ->
delete_immediately_by_resource(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% The stream coordinator is also a ra process, we need to ensure the quorum tests
+ %% are not affected by any other ra cluster that could be added in the future
+ Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
@@ -1756,7 +1800,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
+ Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -1784,6 +1828,8 @@ subscribe_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true})
+ after 5000 ->
+ exit(basic_deliver_timeout)
end,
receive
@@ -1794,6 +1840,8 @@ subscribe_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
multiple = false,
requeue = true})
+ after 5000 ->
+ exit(basic_deliver_timeout_2)
end,
receive
@@ -1803,8 +1851,13 @@ subscribe_redelivery_count(Config) ->
?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
multiple = false}),
+ ct:pal("wait_for_messages_ready", []),
wait_for_messages_ready(Servers, RaName, 0),
+ ct:pal("wait_for_messages_pending_ack", []),
wait_for_messages_pending_ack(Servers, RaName, 0)
+ after 5000 ->
+ flush(500),
+ exit(basic_deliver_timeout_3)
end.
subscribe_redelivery_limit(Config) ->
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index 95ddc892f1..caabd617ae 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -28,15 +28,10 @@ wait_for_messages_total(Servers, QName, Total) ->
wait_for_messages(Servers, QName, Number, Fun, 0) ->
Msgs = dirty_query(Servers, QName, Fun),
- Totals = lists:map(fun(M) when is_map(M) ->
- maps:size(M);
- (_) ->
- -1
- end, Msgs),
- ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]);
+ ?assertEqual(Msgs, [Number || _ <- lists:seq(1, length(Servers))]);
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
- ct:pal("Got messages ~p", [Msgs]),
+ ct:pal("Got messages ~p ~p", [QName, Msgs]),
%% hack to allow the check to succeed in mixed versions clusters if at
%% least one node matches the criteria rather than all nodes for
F = case is_mixed_versions() of
diff --git a/test/rabbit_confirms_SUITE.erl b/test/rabbit_confirms_SUITE.erl
new file mode 100644
index 0000000000..331c3ca7c3
--- /dev/null
+++ b/test/rabbit_confirms_SUITE.erl
@@ -0,0 +1,154 @@
+-module(rabbit_confirms_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ confirm,
+ reject,
+ remove_queue
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+confirm(_Config) ->
+ XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
+ QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
+ QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
+ U0 = rabbit_confirms:init(),
+ ?assertEqual(0, rabbit_confirms:size(U0)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U0)),
+ ?assertEqual(true, rabbit_confirms:is_empty(U0)),
+
+ U1 = rabbit_confirms:insert(1, [QName], XName, U0),
+ ?assertEqual(1, rabbit_confirms:size(U1)),
+ ?assertEqual(1, rabbit_confirms:smallest(U1)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U1)),
+
+ {[{1, XName}], U2} = rabbit_confirms:confirm([1], QName, U1),
+ ?assertEqual(0, rabbit_confirms:size(U2)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U2)),
+ ?assertEqual(true, rabbit_confirms:is_empty(U2)),
+
+ U3 = rabbit_confirms:insert(2, [QName], XName, U1),
+ ?assertEqual(2, rabbit_confirms:size(U3)),
+ ?assertEqual(1, rabbit_confirms:smallest(U3)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U3)),
+
+ {[{1, XName}], U4} = rabbit_confirms:confirm([1], QName, U3),
+ ?assertEqual(1, rabbit_confirms:size(U4)),
+ ?assertEqual(2, rabbit_confirms:smallest(U4)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U4)),
+
+ U5 = rabbit_confirms:insert(2, [QName, QName2], XName, U1),
+ ?assertEqual(2, rabbit_confirms:size(U5)),
+ ?assertEqual(1, rabbit_confirms:smallest(U5)),
+ ?assertEqual(false, rabbit_confirms:is_empty(U5)),
+
+ {[{1, XName}], U6} = rabbit_confirms:confirm([1, 2], QName, U5),
+ ?assertEqual(2, rabbit_confirms:smallest(U6)),
+
+ {[{2, XName}], U7} = rabbit_confirms:confirm([2], QName2, U6),
+ ?assertEqual(0, rabbit_confirms:size(U7)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U7)),
+
+
+ U8 = rabbit_confirms:insert(2, [QName], XName, U1),
+ {[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8),
+ ok.
+
+
+reject(_Config) ->
+ XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
+ QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
+ QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
+ U0 = rabbit_confirms:init(),
+ ?assertEqual(0, rabbit_confirms:size(U0)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U0)),
+ ?assertEqual(true, rabbit_confirms:is_empty(U0)),
+
+ U1 = rabbit_confirms:insert(1, [QName], XName, U0),
+
+ {ok, {1, XName}, U2} = rabbit_confirms:reject(1, U1),
+ {error, not_found} = rabbit_confirms:reject(1, U2),
+ ?assertEqual(0, rabbit_confirms:size(U2)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U2)),
+
+ U3 = rabbit_confirms:insert(2, [QName, QName2], XName, U1),
+
+ {ok, {1, XName}, U4} = rabbit_confirms:reject(1, U3),
+ {error, not_found} = rabbit_confirms:reject(1, U4),
+ ?assertEqual(1, rabbit_confirms:size(U4)),
+ ?assertEqual(2, rabbit_confirms:smallest(U4)),
+
+ {ok, {2, XName}, U5} = rabbit_confirms:reject(2, U3),
+ {error, not_found} = rabbit_confirms:reject(2, U5),
+ ?assertEqual(1, rabbit_confirms:size(U5)),
+ ?assertEqual(1, rabbit_confirms:smallest(U5)),
+
+ ok.
+
+remove_queue(_Config) ->
+ XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
+ QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
+ QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
+ U0 = rabbit_confirms:init(),
+
+ U1 = rabbit_confirms:insert(1, [QName, QName2], XName, U0),
+ U2 = rabbit_confirms:insert(2, [QName2], XName, U1),
+ {[{2, XName}], U3} = rabbit_confirms:remove_queue(QName2, U2),
+ ?assertEqual(1, rabbit_confirms:size(U3)),
+ ?assertEqual(1, rabbit_confirms:smallest(U3)),
+ {[{1, XName}], U4} = rabbit_confirms:remove_queue(QName, U3),
+ ?assertEqual(0, rabbit_confirms:size(U4)),
+ ?assertEqual(undefined, rabbit_confirms:smallest(U4)),
+
+ U5 = rabbit_confirms:insert(1, [QName], XName, U0),
+ U6 = rabbit_confirms:insert(2, [QName], XName, U5),
+ {[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6),
+
+ ok.
+
+
+%% Utility
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index d19dcb3682..7b90d91bfa 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -674,7 +674,7 @@ single_active_consumer_basic_get_test(_) ->
?assertEqual(single_active, State0#rabbit_fifo.cfg#cfg.consumer_strategy),
?assertEqual(0, map_size(State0#rabbit_fifo.consumers)),
{State1, _} = enq(1, 1, first, State0),
- {_State, {error, unsupported}} =
+ {_State, {error, {unsupported, single_active_consumer}}} =
apply(meta(2), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
State1),
ok.
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index b51975b062..b2ed7160a2 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -86,7 +86,7 @@ basics(Config) ->
CustomerTag = UId,
ok = start_cluster(ClusterName, [ServerId]),
FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, #{}, FState0),
ra_log_wal:force_roll_over(ra_log_wal),
% create segment the segment will trigger a snapshot
@@ -99,11 +99,10 @@ basics(Config) ->
FState5 = receive
{ra_event, From, Evt} ->
case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of
- {internal, _AcceptedSeqs, _Actions, _FState4} ->
- exit(unexpected_internal_event);
- {{delivery, C, [{MsgId, _Msg}]}, FState4} ->
- {ok, S} = rabbit_fifo_client:settle(C, [MsgId],
- FState4),
+ {ok, FState4,
+ [{deliver, C, true,
+ [{_Qname, _QRef, MsgId, _SomBool, _Msg}]}]} ->
+ {S, _A} = rabbit_fifo_client:settle(C, [MsgId], FState4),
S
end
after 5000 ->
@@ -129,10 +128,9 @@ basics(Config) ->
receive
{ra_event, Frm, E} ->
case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
- {internal, _, _, _FState7} ->
- exit({unexpected_internal_event, E});
- {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
- {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
+ {ok, FState7, [{deliver, Ctag, true,
+ [{_, _, Mid, _, two}]}]} ->
+ {_, _} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
ok
end
after 2000 ->
@@ -150,8 +148,8 @@ return(Config) ->
{ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
{ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
{_, _, F2} = process_ra_events(receive_ra_events(2, 0), F1),
- {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
- {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
+ {ok, _, {_, _, MsgId, _, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
+ _F2 = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
ra:stop_server(ServerId),
ok.
@@ -165,9 +163,9 @@ rabbit_fifo_returns_correlation(Config) ->
receive
{ra_event, Frm, E} ->
case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of
- {internal, [corr1], [], _F2} ->
+ {ok, _F2, [{settled, _, _}]} ->
ok;
- {Del, _} ->
+ Del ->
exit({unexpected, Del})
end
after 2000 ->
@@ -181,23 +179,24 @@ duplicate_delivery(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
Fun = fun Loop(S0) ->
receive
{ra_event, Frm, E} = Evt ->
case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of
- {internal, [corr1], [], S1} ->
+ {ok, S1, [{settled, _, _}]} ->
Loop(S1);
- {_Del, S1} ->
+ {ok, S1, _} ->
%% repeat event delivery
self() ! Evt,
%% check that then next received delivery doesn't
%% repeat or crash
receive
{ra_event, F, E1} ->
- case rabbit_fifo_client:handle_ra_event(F, E1, S1) of
- {internal, [], [], S2} ->
+ case rabbit_fifo_client:handle_ra_event(
+ F, E1, S1) of
+ {ok, S2, _} ->
S2
end
end
@@ -215,7 +214,7 @@ usage(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
{ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
{_, _, _} = process_ra_events(receive_ra_events(2, 2), F3),
@@ -242,9 +241,9 @@ resends_lost_command(Config) ->
meck:unload(ra),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
{_, _, F4} = process_ra_events(receive_ra_events(2, 0), F3),
- {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
- {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, _, {_, _, _, _, msg1}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, _, {_, _, _, _, msg2}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, _, {_, _, _, _, msg3}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
ra:stop_server(ServerId),
ok.
@@ -268,7 +267,7 @@ detects_lost_delivery(Config) ->
F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
{_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
% lose first delivery
@@ -298,13 +297,13 @@ returns_after_down(Config) ->
_Pid = spawn(fun () ->
F = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
- undefined, F),
+ #{}, F),
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
timer:sleep(1000),
% message should be available for dequeue
- {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
+ {ok, _, {_, _, _, _, msg1}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
ra:stop_server(ServerId),
ok.
@@ -327,9 +326,9 @@ resends_after_lost_applied(Config) ->
% send another message
{ok, F4} = rabbit_fifo_client:enqueue(msg3, F3),
{_, _, F5} = process_ra_events(receive_ra_events(1, 0), F4),
- {ok, {{_, {_, msg1}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {{_, {_, msg2}}, _}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
- {ok, {{_, {_, msg3}}, _}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7),
+ {ok, _, {_, _, _, _, msg1}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, _, {_, _, _, _, msg2}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, _, {_, _, _, _, msg3}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7),
ra:stop_server(ServerId),
ok.
@@ -377,15 +376,16 @@ discard(Config) ->
_ = ra:members(ServerId),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
- F3 = discard_next_delivery(F2, 500),
- {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
+ F3 = discard_next_delivery(F2, 5000),
+ {empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
receive
{dead_letter, Letters} ->
[{_, msg1}] = Letters,
ok
after 500 ->
+ flush(),
exit(dead_letter_timeout)
end,
ra:stop_server(ServerId),
@@ -397,11 +397,11 @@ cancel_checkout(Config) ->
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F1),
{_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
- {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
- {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
+ {F5, _} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
+ {ok, _, {_, _, _, _, m1}, F5} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
ok.
credit(Config) ->
@@ -413,20 +413,20 @@ credit(Config) ->
{ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
{_, _, F3} = process_ra_events(receive_ra_events(2, 0), F2),
%% checkout with 0 prefetch
- {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3),
+ {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, #{}, F3),
%% assert no deliveries
{_, _, F5} = process_ra_events(receive_ra_events(), F4, [], [],
fun
(D, _) -> error({unexpected_delivery, D})
end),
%% provide some credit
- {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
- {[{_, {_, m1}}], [{send_credit_reply, _}], F7} =
+ F6 = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
+ {[{_, _, _, _, m1}], [{send_credit_reply, _}], F7} =
process_ra_events(receive_ra_events(1, 1), F6),
%% credit and drain
- {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
- {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} =
+ F8 = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
+ {[{_, _, _, _, m2}], [{send_credit_reply, _}, {send_drained, _}], F9} =
process_ra_events(receive_ra_events(1, 1), F8),
flush(),
@@ -439,9 +439,8 @@ credit(Config) ->
(D, _) -> error({unexpected_delivery, D})
end),
%% credit again and receive the last message
- {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
- {[{_, {_, m3}}], [{send_credit_reply, _}], _} =
- process_ra_events(receive_ra_events(1, 1), F12),
+ F12 = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
+ {[{_, _, _, _, m3}], _, _} = process_ra_events(receive_ra_events(1, 1), F12),
ok.
untracked_enqueue(Config) ->
@@ -452,7 +451,7 @@ untracked_enqueue(Config) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
timer:sleep(100),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
+ {ok, _, {_, _, _, _, msg1}, _F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
ra:stop_server(ServerId),
ok.
@@ -472,6 +471,7 @@ flow(Config) ->
ok.
test_queries(Config) ->
+ % ok = logger:set_primary_config(level, all),
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
@@ -484,20 +484,23 @@ test_queries(Config) ->
Self ! ready,
receive stop -> ok end
end),
+ receive
+ ready -> ok
+ after 5000 ->
+ exit(ready_timeout)
+ end,
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- ok = receive ready -> ok after 5000 -> timeout end,
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
- ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader},
- ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_ready/1)),
- ?assertMatch({ok, {_RaIdxTerm, 1}, _Leader},
- ra:local_query(ServerId,
- fun rabbit_fifo:query_messages_checked_out/1)),
- ?assertMatch({ok, {_RaIdxTerm, Processes}, _Leader}
- when length(Processes) == 2,
- ra:local_query(ServerId,
- fun rabbit_fifo:query_processes/1)),
- P ! stop,
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, #{}, F0),
+ {ok, {_, Ready}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, Ready),
+ {ok, {_, Checked}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, Checked),
+ {ok, {_, Processes}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_processes/1),
+ ?assertEqual(2, length(Processes)),
+ P ! stop,
ra:stop_server(ServerId),
ok.
@@ -511,15 +514,16 @@ dequeue(Config) ->
Tag = UId,
ok = start_cluster(ClusterName, [ServerId]),
F1 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
+ {empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
{ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), F2_),
- {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ % {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ {ok, _, {_, _, 0, _, msg1}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
{ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
{_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_),
- {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
- {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
+ {ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
+ {_F6, _A} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
ra:stop_server(ServerId),
ok.
@@ -534,8 +538,8 @@ conf(ClusterName, UId, ServerId, _, Peers) ->
process_ra_event(State, Wait) ->
receive
{ra_event, From, Evt} ->
- {internal, _, _, S} =
- rabbit_fifo_client:handle_ra_event(From, Evt, State),
+ {ok, S, _Actions} =
+ rabbit_fifo_client:handle_ra_event(From, Evt, State),
S
after Wait ->
exit(ra_event_timeout)
@@ -572,10 +576,10 @@ receive_ra_events(Acc) ->
end.
process_ra_events(Events, State) ->
- DeliveryFun = fun ({delivery, Tag, Msgs}, S) ->
+ DeliveryFun = fun ({deliver, _, Tag, Msgs}, S) ->
MsgIds = [element(1, M) || M <- Msgs],
- {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S),
- S2
+ {S0, _} = rabbit_fifo_client:settle(Tag, MsgIds, S),
+ S0
end,
process_ra_events(Events, State, [], [], DeliveryFun).
@@ -583,43 +587,41 @@ process_ra_events([], State0, Acc, Actions0, _DeliveryFun) ->
{Acc, Actions0, State0};
process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) ->
case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, Actions, State} ->
- process_ra_events(Events, State, Acc, Actions0 ++ Actions, DeliveryFun);
- {{delivery, _Tag, Msgs} = Del, State1} ->
- State = DeliveryFun(Del, State1),
- process_ra_events(Events, State, Acc ++ Msgs, Actions0, DeliveryFun);
+ {ok, State1, Actions1} ->
+ {Msgs, Actions, State} =
+ lists:foldl(
+ fun ({deliver, _, _, Msgs} = Del, {M, A, S}) ->
+ {M ++ Msgs, A, DeliveryFun(Del, S)};
+ (Ac, {M, A, S}) ->
+ {M, A ++ [Ac], S}
+ end, {Acc, [], State1}, Actions1),
+ process_ra_events(Events, State, Msgs, Actions0 ++ Actions, DeliveryFun);
eol ->
eol
end.
discard_next_delivery(State0, Wait) ->
receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, _Actions, State} ->
- discard_next_delivery(State, Wait);
- {{delivery, Tag, Msgs}, State1} ->
- MsgIds = [element(1, M) || M <- Msgs],
- {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
- State1),
- State
- end
+ {ra_event, _, {machine, {delivery, _, _}}} = Evt ->
+ element(3, process_ra_events([Evt], State0, [], [],
+ fun ({deliver, Tag, _, Msgs}, S) ->
+ MsgIds = [element(3, M) || M <- Msgs],
+ {S0, _} = rabbit_fifo_client:discard(Tag, MsgIds, S),
+ S0
+ end))
after Wait ->
- State0
+ State0
end.
return_next_delivery(State0, Wait) ->
receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, _, State} ->
- return_next_delivery(State, Wait);
- {{delivery, Tag, Msgs}, State1} ->
- MsgIds = [element(1, M) || M <- Msgs],
- {ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
- State1),
- State
- end
+ {ra_event, _, {machine, {delivery, _, _}}} = Evt ->
+ element(3, process_ra_events([Evt], State0, [], [],
+ fun ({deliver, Tag, _, Msgs}, S) ->
+ MsgIds = [element(3, M) || M <- Msgs],
+ {S0, _} = rabbit_fifo_client:return(Tag, MsgIds, S),
+ S0
+ end))
after Wait ->
State0
end.
diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl
index 3324a1253c..2467e40028 100644
--- a/test/rabbit_ha_test_consumer.erl
+++ b/test/rabbit_ha_test_consumer.erl
@@ -51,12 +51,15 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
%% counter.
if
MsgNum + 1 == LowestSeen ->
+ error_logger:info_msg("recording ~w left ~w",
+ [MsgNum, MsgsToConsume]),
run(TestPid, Channel, Queue,
CancelOnFailover, MsgNum, MsgsToConsume - 1);
MsgNum >= LowestSeen ->
error_logger:info_msg(
- "consumer ~p on ~p ignoring redelivered msg ~p~n",
- [self(), Channel, MsgNum]),
+ "consumer ~p on ~p ignoring redelivered msg ~p"
+ "lowest seen ~w~n",
+ [self(), Channel, MsgNum, LowestSeen]),
true = Redelivered, %% ASSERTION
run(TestPid, Channel, Queue,
CancelOnFailover, LowestSeen, MsgsToConsume);
diff --git a/test/rabbit_msg_record_SUITE.erl b/test/rabbit_msg_record_SUITE.erl
new file mode 100644
index 0000000000..a82ba7481d
--- /dev/null
+++ b/test/rabbit_msg_record_SUITE.erl
@@ -0,0 +1,213 @@
+-module(rabbit_msg_record_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp10_common/include/amqp10_framing.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ ampq091_roundtrip,
+ message_id_ulong,
+ message_id_uuid,
+ message_id_binary,
+ message_id_large_binary,
+ message_id_large_string
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+ampq091_roundtrip(_Config) ->
+ Props = #'P_basic'{content_type = <<"text/plain">>,
+ content_encoding = <<"gzip">>,
+ headers = [{<<"x-stream-offset">>, long, 99},
+ {<<"x-string">>, longstr, <<"a string">>},
+ {<<"x-bool">>, bool, false},
+ {<<"x-unsignedbyte">>, unsignedbyte, 1},
+ {<<"x-unsignedshort">>, unsignedshort, 1},
+ {<<"x-unsignedint">>, unsignedint, 1},
+ {<<"x-signedint">>, signedint, 1},
+ {<<"x-timestamp">>, timestamp, 1},
+ {<<"x-double">>, double, 1.0},
+ {<<"x-float">>, float, 1.0},
+ {<<"x-binary">>, binary, <<"data">>}
+ ],
+ delivery_mode = 2,
+ priority = 99,
+ correlation_id = <<"corr">> ,
+ reply_to = <<"reply-to">>,
+ expiration = <<"1">>,
+ message_id = <<"msg-id">>,
+ timestamp = 99,
+ type = <<"45">>,
+ user_id = <<"banana">>,
+ app_id = <<"rmq">>
+ % cluster_id = <<"adf">>
+ },
+ Payload = [<<"data">>],
+ test_amqp091_roundtrip(Props, Payload),
+ test_amqp091_roundtrip(#'P_basic'{}, Payload),
+ ok.
+
+message_id_ulong(_Config) ->
+ Num = 9876789,
+ ULong = erlang:integer_to_binary(Num),
+ P = #'v1_0.properties'{message_id = {ulong, Num},
+ correlation_id = {ulong, Num}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = ULong,
+ correlation_id = ULong,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id-type">>, longstr, <<"ulong">>},
+ {<<"x-message-id-type">>, longstr, <<"ulong">>}
+ ]},
+ Props),
+ ok.
+
+message_id_uuid(_Config) ->
+ %% fake a uuid
+ UUId = erlang:md5(term_to_binary(make_ref())),
+ TextUUId = rabbit_data_coercion:to_binary(rabbit_guid:to_string(UUId)),
+ P = #'v1_0.properties'{message_id = {uuid, UUId},
+ correlation_id = {uuid, UUId}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = TextUUId,
+ correlation_id = TextUUId,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id-type">>, longstr, <<"uuid">>},
+ {<<"x-message-id-type">>, longstr, <<"uuid">>}
+ ]},
+ Props),
+ ok.
+
+message_id_binary(_Config) ->
+ %% fake a uuid
+ Orig = <<"asdfasdf">>,
+ Text = base64:encode(Orig),
+ P = #'v1_0.properties'{message_id = {binary, Orig},
+ correlation_id = {binary, Orig}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = Text,
+ correlation_id = Text,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id-type">>, longstr, <<"binary">>},
+ {<<"x-message-id-type">>, longstr, <<"binary">>}
+ ]},
+ Props),
+ ok.
+
+message_id_large_binary(_Config) ->
+ %% cannot fit in a shortstr
+ Orig = crypto:strong_rand_bytes(500),
+ P = #'v1_0.properties'{message_id = {binary, Orig},
+ correlation_id = {binary, Orig}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = undefined,
+ correlation_id = undefined,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id">>, longstr, Orig},
+ {<<"x-message-id">>, longstr, Orig}
+ ]},
+ Props),
+ ok.
+
+message_id_large_string(_Config) ->
+ %% cannot fit in a shortstr
+ Orig = base64:encode(crypto:strong_rand_bytes(500)),
+ P = #'v1_0.properties'{message_id = {utf8, Orig},
+ correlation_id = {utf8, Orig}},
+ D = #'v1_0.data'{content = <<"data">>},
+ Bin = [amqp10_framing:encode_bin(P),
+ amqp10_framing:encode_bin(D)],
+ R = rabbit_msg_record:init(iolist_to_binary(Bin)),
+ {Props, _} = rabbit_msg_record:to_amqp091(R),
+ ?assertMatch(#'P_basic'{message_id = undefined,
+ correlation_id = undefined,
+ headers =
+ [
+ %% ordering shouldn't matter
+ {<<"x-correlation-id">>, longstr, Orig},
+ {<<"x-message-id">>, longstr, Orig}
+ ]},
+ Props),
+ ok.
+
+%% Utility
+
+test_amqp091_roundtrip(Props, Payload) ->
+ MsgRecord0 = rabbit_msg_record:from_amqp091(Props, Payload),
+ MsgRecord = rabbit_msg_record:init(
+ iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))),
+ % meck:unload(),
+ {PropsOut, PayloadOut} = rabbit_msg_record:to_amqp091(MsgRecord),
+ ?assertEqual(Props, PropsOut),
+ ?assertEqual(iolist_to_binary(Payload),
+ iolist_to_binary(PayloadOut)),
+ ok.
+
+
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
new file mode 100644
index 0000000000..67ca8eba8b
--- /dev/null
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -0,0 +1,1304 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% Copyright (c) 2012-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_queue_SUITE).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+suite() ->
+ [{timetrap, 5 * 60000}].
+
+all() ->
+ [
+ {group, single_node},
+ {group, cluster_size_2},
+ {group, cluster_size_3},
+ {group, unclustered_size_3_1},
+ {group, unclustered_size_3_2},
+ {group, unclustered_size_3_3},
+ {group, cluster_size_3_1}
+ ].
+
+groups() ->
+ [
+ {single_node, [], [restart_single_node] ++ all_tests()},
+ {cluster_size_2, [], all_tests()},
+ {cluster_size_3, [], all_tests() ++
+ [delete_replica,
+ delete_down_replica,
+ delete_classic_replica,
+ delete_quorum_replica,
+ consume_from_replica,
+ leader_failover]},
+ {unclustered_size_3_1, [], [add_replica]},
+ {unclustered_size_3_2, [], [consume_without_local_replica]},
+ {unclustered_size_3_3, [], [grow_coordinator_cluster]},
+ {cluster_size_3_1, [], [shrink_coordinator_cluster]}
+ ].
+
+all_tests() ->
+ [
+ declare_args,
+ declare_max_age,
+ declare_invalid_args,
+ declare_invalid_properties,
+ declare_queue,
+ delete_queue,
+ publish,
+ publish_confirm,
+ recover,
+ consume_without_qos,
+ consume,
+ consume_offset,
+ basic_get,
+ consume_with_autoack,
+ consume_and_nack,
+ consume_and_ack,
+ consume_and_reject,
+ consume_from_last,
+ consume_from_next,
+ consume_from_default,
+ consume_credit,
+ consume_credit_out_of_order_ack,
+ consume_credit_multiple_ack,
+ basic_cancel,
+ max_length_bytes,
+ max_age,
+ invalid_policy,
+ max_age_policy,
+ max_segment_size_policy
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config0) ->
+ rabbit_ct_helpers:log_environment(),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{stream_tick_interval, 1000},
+ {log, [{file, [{level, debug}]}]}]}),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ ClusterSize = case Group of
+ single_node -> 1;
+ cluster_size_2 -> 2;
+ cluster_size_3 -> 3;
+ cluster_size_3_1 -> 3;
+ unclustered_size_3_1 -> 3;
+ unclustered_size_3_2 -> 3;
+ unclustered_size_3_3 -> 3
+ end,
+ Clustered = case Group of
+ unclustered_size_3_1 -> false;
+ unclustered_size_3_2 -> false;
+ unclustered_size_3_3 -> false;
+ _ -> true
+ end,
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base},
+ {rmq_nodes_clustered, Clustered}]),
+ Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
+ Ret = rabbit_ct_helpers:run_steps(Config1b,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ case Ret of
+ {skip, _} ->
+ Ret;
+ Config2 ->
+ EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
+ Config2, stream_queue),
+ case EnableFF of
+ ok ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_tick_interval, 100]),
+ Config2;
+ Skip ->
+ end_per_group(Group, Config2),
+ Skip
+ end
+ end.
+
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q}]),
+ rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [{core_metrics_gc_interval, 100}]}).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+ Config1 = rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+declare_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-length">>, long, 2000}])),
+ assert_queue_type(Server, Q, rabbit_stream_queue).
+
+declare_max_age(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server), Q,
+ [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-age">>, longstr, <<"1A">>}])),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-age">>, longstr, <<"1Y">>}])),
+ assert_queue_type(Server, Q, rabbit_stream_queue).
+
+declare_invalid_properties(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Q = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = Q,
+ auto_delete = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = Q,
+ exclusive = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = Q,
+ durable = false,
+ arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})).
+
+declare_invalid_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Q = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-expires">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-message-ttl">>, long, 2000}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-priority">>, long, 2000}])),
+
+ [?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-overflow">>, longstr, XOverflow}]))
+ || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])).
+
+declare_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ %% Test declare an existing queue
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children,
+ [osiris_server_sup])),
+
+ %% Test declare an existing queue with different arguments
+ ?assertExit(_, declare(Ch, Q, [])).
+
+delete_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+
+add_replica(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+
+ %% Let's also try the add replica command on other queue types, it should fail
+ %% We're doing it in the same test for efficiency, otherwise we have to
+ %% start new rabbitmq clusters every time for a minor testcase
+ QClassic = <<Q/binary, "_classic">>,
+ QQuorum = <<Q/binary, "_quorum">>,
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ?assertEqual({'queue.declare_ok', QClassic, 0, 0},
+ declare(Ch, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
+ ?assertEqual({'queue.declare_ok', QQuorum, 0, 0},
+ declare(Ch, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server1])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QClassic, Server1])),
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QQuorum, Server1])),
+
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+ timer:sleep(1000),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QClassic, Server1])),
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, QQuorum, Server1])),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server1])),
+ %% replicas must be recorded on the state, and if we publish messages then they must
+ %% be stored on disk
+ check_leader_and_replicas(Config, Q, Server0, [Server1]),
+ %% And if we try again? Idempotent
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server1])),
+ %% Add another node
+ ok = rabbit_control_helper:command(stop_app, Server2),
+ ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server2),
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica,
+ [<<"/">>, Q, Server2])),
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]).
+
+delete_replica(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, 'zen@rabbit'])),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])),
+ %% check it's gone
+ check_leader_and_replicas(Config, Q, Server0, [Server2]),
+ %% And if we try again? Idempotent
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])),
+ %% Delete the last replica
+ ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server2])),
+ check_leader_and_replicas(Config, Q, Server0, []).
+
+grow_coordinator_cluster(Config) ->
+ [Server0, Server1, _Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+
+ rabbit_ct_helpers:await_condition(
+ fun() ->
+ case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of
+ {_, Members, _} ->
+ Nodes = lists:sort([N || {_, N} <- Members]),
+ lists:sort([Server0, Server1]) == Nodes;
+ _ ->
+ false
+ end
+ end, 60000).
+
+shrink_coordinator_cluster(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ok = rabbit_control_helper:command(stop_app, Server2),
+ ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []),
+
+ rabbit_ct_helpers:await_condition(
+ fun() ->
+ case rpc:call(Server0, ra, members, [{rabbit_stream_coordinator, Server0}]) of
+ {_, Members, _} ->
+ Nodes = lists:sort([N || {_, N} <- Members]),
+ lists:sort([Server0, Server1]) == Nodes;
+ _ ->
+ false
+ end
+ end, 60000).
+
+delete_classic_replica(Config) ->
+ [Server0, Server1, _Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, 'zen@rabbit'])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])).
+
+delete_quorum_replica(Config) ->
+ [Server0, Server1, _Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ %% Not a member of the cluster, what would happen?
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, 'zen@rabbit'])),
+ ?assertEqual({error, quorum_queue_not_supported},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])).
+
+delete_down_replica(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])),
+ %% check it isn't gone
+ check_leader_and_replicas(Config, Q, Server0, [Server1, Server2]),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ?assertEqual(ok,
+ rpc:call(Server0, rabbit_stream_queue, delete_replica,
+ [<<"/">>, Q, Server1])).
+
+publish(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ publish(Ch, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]).
+
+publish_confirm(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]).
+
+restart_single_node(Config) ->
+ [Server] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ publish(Ch, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+
+ rabbit_control_helper:command(stop_app, Server),
+ rabbit_control_helper:command(start_app, Server),
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ publish(Ch1, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]).
+
+recover(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ publish(Ch, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+
+ [rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
+ [rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ publish(Ch1, Q),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]).
+
+consume_without_qos(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>},
+ self())).
+
+consume_without_local_replica(Config) ->
+ [Server0, Server1 | _] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ %% Add another node to the cluster, but it won't have a replica
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+ timer:sleep(1000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ qos(Ch1, 10, false),
+ ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>},
+ self())).
+
+consume(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ _ = amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ ok = amqp_channel:close(Ch1),
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_offset(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 1000)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ run_proper(
+ fun () ->
+ ?FORALL(Offset, range(0, 999),
+ begin
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, Offset),
+ receive_batch(Ch1, Offset, 999),
+ receive
+ {_,
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
+ when S < Offset ->
+ exit({unexpected_offset, S})
+ after 1000 ->
+ ok
+ end,
+ amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ true
+ end)
+ end, [], 25).
+
+basic_get(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ amqp_channel:call(Ch, #'basic.get'{queue = Q})).
+
+consume_with_autoack(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ ?assertExit(
+ {{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ subscribe(Ch1, Q, true, 0)).
+
+consume_and_nack(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ %% Nack will throw a not implemented exception. As it is a cast operation,
+ %% we'll detect the conneciton/channel closure on the next call.
+ %% Let's try to redeclare and see what happens
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]))
+ after 10000 ->
+ exit(timeout)
+ end.
+
+basic_cancel(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ rabbit_ct_helpers:await_condition(
+ fun() ->
+ 1 == length(rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list,
+ [consumer_created]))
+ end, 30000),
+ receive
+ {#'basic.deliver'{}, _} ->
+ amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ ?assertMatch([], rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, [consumer_created]))
+ after 10000 ->
+ exit(timeout)
+ end.
+
+consume_and_reject(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.reject'{delivery_tag = DeliveryTag,
+ requeue = true}),
+ %% Reject will throw a not implemented exception. As it is a cast operation,
+ %% we'll detect the conneciton/channel closure on the next call.
+ %% Let's try to redeclare and see what happens
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]))
+ after 10000 ->
+ exit(timeout)
+ end.
+
+consume_and_ack(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, Q),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+ subscribe(Ch1, Q, false, 0),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ %% It will succeed as ack is now a credit operation. We should be
+ %% able to redeclare a queue (gen_server call op) as the channel
+ %% should still be open and declare is an idempotent operation
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_from_last(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [committed_offset]]),
+
+ %% We'll receive data from the last committed offset, let's check that is not the
+ %% first offset
+ CommittedOffset = proplists:get_value(committed_offset, Info),
+ ?assert(CommittedOffset > 0),
+
+ %% If the offset is not provided, we're subscribing to the tail of the stream
+ amqp_channel:subscribe(
+ Ch1, #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, longstr, <<"last">>}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+
+ %% And receive the messages from the last committed offset to the end of the stream
+ receive_batch(Ch1, CommittedOffset, 99),
+
+ %% Publish a few more
+ [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ %% Yeah! we got them
+ receive_batch(Ch1, 100, 199).
+
+consume_from_next(Config) ->
+ consume_from_next(Config, [{<<"x-stream-offset">>, longstr, <<"next">>}]).
+
+consume_from_default(Config) ->
+ consume_from_next(Config, []).
+
+consume_from_next(Config, Args) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 10, false),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [committed_offset]]),
+
+ %% We'll receive data from the last committed offset, let's check that is not the
+ %% first offset
+ CommittedOffset = proplists:get_value(committed_offset, Info),
+ ?assert(CommittedOffset > 0),
+
+ %% If the offset is not provided, we're subscribing to the tail of the stream
+ amqp_channel:subscribe(
+ Ch1, #'basic.consume'{queue = Q,
+ no_ack = false,
+ consumer_tag = <<"ctag">>,
+ arguments = Args},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end,
+
+ %% Publish a few more
+ [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ %% Yeah! we got them
+ receive_batch(Ch1, 100, 199).
+
+consume_from_replica(Config) ->
+ [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch1, self()),
+ [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch1, 5000),
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ qos(Ch2, 10, false),
+
+ subscribe(Ch2, Q, false, 0),
+ receive_batch(Ch2, 0, 99).
+
+consume_credit(Config) ->
+ %% Because osiris provides one chunk on every read and we don't want to buffer
+ %% messages in the broker to avoid memory penalties, the credit value won't
+ %% be strict - we allow it into the negative values.
+ %% We can test that after receiving a chunk, no more messages are delivered until
+ %% the credit goes back to a positive value.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ %% Let's publish a big batch, to ensure we have more than a chunk available
+ NumMsgs = 100,
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% Let's subscribe with a small credit, easier to test
+ Credit = 2,
+ qos(Ch1, Credit, false),
+ subscribe(Ch1, Q, false, 0),
+
+ %% Receive everything
+ DeliveryTags = receive_batch(),
+
+ %% We receive at least the given credit as we know there are 100 messages in the queue
+ ?assert(length(DeliveryTags) >= Credit),
+
+ %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries
+ {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags),
+
+ [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ || DeliveryTag <- ToAck],
+
+ %% Nothing here, this is good
+ receive
+ {#'basic.deliver'{}, _} ->
+ exit(unexpected_delivery)
+ after 1000 ->
+ ok
+ end,
+
+ %% Let's ack one more, we should receive a new chunk
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending),
+ multiple = false}),
+
+ %% Yeah, here is the new chunk!
+ receive
+ {#'basic.deliver'{}, _} ->
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_credit_out_of_order_ack(Config) ->
+ %% Like consume_credit but acknowledging the messages out of order.
+ %% We want to ensure it doesn't behave like multiple, that is if we have
+ %% credit 2 and received 10 messages, sending the ack for the message id
+ %% number 10 should only increase credit by 1.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ %% Let's publish a big batch, to ensure we have more than a chunk available
+ NumMsgs = 100,
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% Let's subscribe with a small credit, easier to test
+ Credit = 2,
+ qos(Ch1, Credit, false),
+ subscribe(Ch1, Q, false, 0),
+
+ %% ******* This is the difference with consume_credit
+ %% Receive everything, let's reverse the delivery tags here so we ack out of order
+ DeliveryTags = lists:reverse(receive_batch()),
+
+ %% We receive at least the given credit as we know there are 100 messages in the queue
+ ?assert(length(DeliveryTags) >= Credit),
+
+ %% Let's ack as many messages as we can while avoiding a positive credit for new deliveries
+ {ToAck, Pending} = lists:split(length(DeliveryTags) - Credit, DeliveryTags),
+
+ [ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ || DeliveryTag <- ToAck],
+
+ %% Nothing here, this is good
+ receive
+ {#'basic.deliver'{}, _} ->
+ exit(unexpected_delivery)
+ after 1000 ->
+ ok
+ end,
+
+ %% Let's ack one more, we should receive a new chunk
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = hd(Pending),
+ multiple = false}),
+
+ %% Yeah, here is the new chunk!
+ receive
+ {#'basic.deliver'{}, _} ->
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+consume_credit_multiple_ack(Config) ->
+ %% Like consume_credit but acknowledging the messages out of order.
+ %% We want to ensure it doesn't behave like multiple, that is if we have
+ %% credit 2 and received 10 messages, sending the ack for the message id
+ %% number 10 should only increase credit by 1.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ %% Let's publish a big batch, to ensure we have more than a chunk available
+ NumMsgs = 100,
+ [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ %% Let's subscribe with a small credit, easier to test
+ Credit = 2,
+ qos(Ch1, Credit, false),
+ subscribe(Ch1, Q, false, 0),
+
+ %% ******* This is the difference with consume_credit
+ %% Receive everything, let's reverse the delivery tags here so we ack out of order
+ DeliveryTag = lists:last(receive_batch()),
+
+ ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+
+ %% Yeah, here is the new chunk!
+ receive
+ {#'basic.deliver'{}, _} ->
+ ok
+ after 5000 ->
+ exit(timeout)
+ end.
+
+max_length_bytes(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-length-bytes">>, long, 500},
+ {<<"x-max-segment-size">>, long, 250}])),
+
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ %% We don't yet have reliable metrics, as the committed offset doesn't work
+ %% as a counter once we start applying retention policies.
+ %% Let's wait for messages and hope these are less than the number of published ones
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 100, false),
+ subscribe(Ch1, Q, false, 0),
+
+ ?assert(length(receive_batch()) < 100).
+
+max_age(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-age">>, longstr, <<"10s">>},
+ {<<"x-max-segment-size">>, long, 250}])),
+
+ Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ timer:sleep(10000),
+
+ %% Let's publish again so the new segments will trigger the retention policy
+ [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch1, 200, false),
+ subscribe(Ch1, Q, false, 0),
+ ?assertEqual(100, length(receive_batch())).
+
+leader_failover(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch1, self()),
+ [publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)],
+ amqp_channel:wait_for_confirms(Ch1, 5000),
+
+ check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ timer:sleep(30000),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ QName = rabbit_misc:r(<<"/">>, queue, Q),
+ lists:member({name, QName}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ NewLeader = proplists:get_value(leader, Info),
+ ?assert(NewLeader =/= Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
+
+invalid_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ha">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"ha-mode">>, <<"all">>}]),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>,
+ [{<<"message-ttl">>, 5}]),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition]]),
+
+ ?assertEqual('', proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([], proplists:get_value(effective_policy_definition, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>).
+
+max_age_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>,
+ [{<<"max-age">>, <<"1Y">>}]),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition]]),
+
+ ?assertEqual(<<"age">>, proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([{<<"max-age">>, <<"1Y">>}],
+ proplists:get_value(effective_policy_definition, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>).
+
+max_segment_size_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"segment">>, <<"max_segment_size.*">>, <<"queues">>,
+ [{<<"max-segment-size">>, 5000}]),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition]]),
+
+ ?assertEqual(<<"segment">>, proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([{<<"max-segment-size">>, 5000}],
+ proplists:get_value(effective_policy_definition, Info)),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>).
+
+%%----------------------------------------------------------------------------
+
+delete_queues() ->
+ [{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+ || Q <- rabbit_amqqueue:list()].
+
+declare(Ch, Q) ->
+ declare(Ch, Q, []).
+
+declare(Ch, Q, Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ auto_delete = false,
+ arguments = Args}).
+assert_queue_type(Server, Q, Expected) ->
+ Actual = get_queue_type(Server, Q),
+ Expected = Actual.
+
+get_queue_type(Server, Q0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
+ {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ amqqueue:get_type(Q1).
+
+check_leader_and_replicas(Config, Name, Leader, Replicas0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Name),
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, QNameRes}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ ?assertEqual(Leader, proplists:get_value(leader, Info)),
+ Replicas = lists:sort(Replicas0),
+ ?assertEqual(Replicas, lists:sort(proplists:get_value(members, Info))).
+
+publish(Ch, Queue) ->
+ publish(Ch, Queue, <<"msg">>).
+
+publish(Ch, Queue, Msg) ->
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg}).
+
+subscribe(Ch, Queue, NoAck, Offset) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = <<"ctag">>,
+ arguments = [{<<"x-stream-offset">>, long, Offset}]},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+qos(Ch, Prefetch, Global) ->
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = Global,
+ prefetch_count = Prefetch})).
+
+receive_batch(Ch, N, N) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} ->
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ after 5000 ->
+ exit({missing_offset, N})
+ end;
+receive_batch(Ch, N, M) ->
+ receive
+ {_,
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, S}]}}}
+ when S < N ->
+ exit({unexpected_offset, S});
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = #'P_basic'{headers = [{<<"x-stream-offset">>, long, N}]}}} ->
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ receive_batch(Ch, N + 1, M)
+ after 5000 ->
+ exit({missing_offset, N})
+ end.
+
+receive_batch() ->
+ receive_batch([]).
+
+receive_batch(Acc) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ receive_batch([DeliveryTag | Acc])
+ after 5000 ->
+ lists:reverse(Acc)
+ end.
+
+run_proper(Fun, Args, NumTests) ->
+ ?assertEqual(
+ true,
+ proper:counterexample(
+ erlang:apply(Fun, Args),
+ [{numtests, NumTests},
+ {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
+ (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
+ end}])).
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index 013e625159..8b2c1d6ebb 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -234,8 +234,10 @@ consume_survives(Config,
DeathFun(Config, A),
%% verify that the consumer got all msgs, or die - the await_response
%% calls throw an exception if anything goes wrong....
- rabbit_ha_test_consumer:await_response(ConsumerPid),
+ ct:pal("awaiting produce ~w", [ProducerPid]),
rabbit_ha_test_producer:await_response(ProducerPid),
+ ct:pal("awaiting consumer ~w", [ConsumerPid]),
+ rabbit_ha_test_consumer:await_response(ConsumerPid),
ok.
confirms_survive(Config, DeathFun) ->
diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl
index 3610fd1a80..6be403fd3e 100644
--- a/test/unit_log_config_SUITE.erl
+++ b/test/unit_log_config_SUITE.erl
@@ -126,6 +126,10 @@ sink_rewrite_sinks() ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -226,6 +230,10 @@ sink_handlers_merged_with_lager_extra_sinks_handlers(_) ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
@@ -317,6 +325,10 @@ level_sinks() ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,error]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,error]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -427,6 +439,10 @@ file_sinks(DefaultLevel) ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,DefaultLevel]}]}]},
@@ -674,6 +690,10 @@ default_expected_sinks(UpgradeFile) ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
@@ -761,6 +781,10 @@ tty_expected_sinks() ->
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},
+ {rabbit_log_osiris_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,info]}]}]},
{rabbit_log_prelaunch_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,info]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,info]}]}]},