summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-03-31 00:13:01 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-03-31 00:13:01 +0300
commitd0de21f5cc2a39174b47b3c235ef0cac9fcc87de (patch)
tree6b4c9e3861af1348fe56d4192c2a30d064670a01
parent26fd310719a4729084d785986d83d7469748ee71 (diff)
downloadrabbitmq-server-git-d0de21f5cc2a39174b47b3c235ef0cac9fcc87de.tar.gz
Finish splitting unit_*_SUITE suites
-rw-r--r--test/dynamic_ha_SUITE.erl68
-rw-r--r--test/many_node_ha_SUITE.erl2
-rw-r--r--test/message_size_limit_SUITE.erl154
-rw-r--r--test/queue_length_limits_SUITE.erl391
-rw-r--r--test/unit_access_control_authn_authz_context_propagation_SUITE.erl2
-rw-r--r--test/unit_amqp091_server_properties_SUITE.erl153
-rw-r--r--test/unit_gen_server2_SUITE.erl84
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl923
8 files changed, 845 insertions, 932 deletions
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 1dd9e8df1f..6a50bf13a3 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -64,7 +64,8 @@ groups() ->
slave_recovers_after_vhost_down_and_up,
master_migrates_on_vhost_down,
slave_recovers_after_vhost_down_and_master_migrated,
- queue_survive_adding_dead_vhost_mirror
+ queue_survive_adding_dead_vhost_mirror,
+ dynamic_mirroring
]},
{cluster_size_3, [], [
change_policy,
@@ -76,10 +77,6 @@ groups() ->
rebalance_exactly,
rebalance_nodes,
rebalance_multiple_blocked
- % FIXME: Re-enable those tests when the know issues are
- % fixed.
- % failing_random_policies,
- % random_policy
]}
]}
].
@@ -128,9 +125,68 @@ end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
-%% Testcases.
+%% Test Cases
%% -------------------------------------------------------------------
+dynamic_mirroring(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, dynamic_mirroring1, [Config]).
+
+dynamic_mirroring1(_Config) ->
+ %% Just unit tests of the node selection logic, see multi node
+ %% tests for the rest...
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params,
+ {MNode, SNodes, SSNodes}, All) ->
+ {ok, M} = rabbit_mirror_queue_misc:module(Policy),
+ {NewM, NewSs0} = M:suggested_queue_nodes(
+ Params, MNode, SNodes, SSNodes, All),
+ NewSs1 = lists:sort(NewSs0),
+ case dm_list_match(NewSs, NewSs1, ExtraSs) of
+ ok -> ok;
+ error -> exit({no_match, NewSs, NewSs1, ExtraSs})
+ end
+ end,
+
+ Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]),
+
+ N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,
+
+ %% Add a node
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
+ %% Add two nodes and drop one
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
+ %% Don't try to include nodes that are not running
+ Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
+ %% If we can't find any of the nodes listed then just keep the master
+ Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed,
+ %% if nothing is synced
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]),
+ %% But if something is synced we can lose the master - but make
+ %% sure we pick the new master from the nodes which are synced!
+ Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]),
+ Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]),
+
+ Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),
+
+ passed.
+
+%% Does the first list match the second where the second is required
+%% to have exactly Extra superfluous items?
+dm_list_match([], [], 0) -> ok;
+dm_list_match(_, [], _Extra) -> error;
+dm_list_match([H|T1], [H |T2], Extra) -> dm_list_match(T1, T2, Extra);
+dm_list_match(L1, [_H|T2], Extra) -> dm_list_match(L1, T2, Extra - 1).
+
change_policy(Config) ->
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
diff --git a/test/many_node_ha_SUITE.erl b/test/many_node_ha_SUITE.erl
index 8eebe62f0d..6f3bc49de7 100644
--- a/test/many_node_ha_SUITE.erl
+++ b/test/many_node_ha_SUITE.erl
@@ -74,7 +74,7 @@ end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
-%% Testcases.
+%% Test Cases
%% -------------------------------------------------------------------
kill_intermediate(Config) ->
diff --git a/test/message_size_limit_SUITE.erl b/test/message_size_limit_SUITE.erl
new file mode 100644
index 0000000000..693cd3d3b4
--- /dev/null
+++ b/test/message_size_limit_SUITE.erl
@@ -0,0 +1,154 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(message_size_limit_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT_LIST_OPS_PASS, 5000).
+-define(TIMEOUT, 30000).
+-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
+
+-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], [
+ max_message_size
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test cases
+%% -------------------------------------------------------------------
+
+max_message_size(Config) ->
+ Binary2M = gen_binary_mb(2),
+ Binary4M = gen_binary_mb(4),
+ Binary6M = gen_binary_mb(6),
+ Binary10M = gen_binary_mb(10),
+
+ Size2Mb = 1024 * 1024 * 2,
+ Size2Mb = byte_size(Binary2M),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
+
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ %% Binary is within the max size limit
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
+ %% The channel process is alive
+ assert_channel_alive(Ch),
+
+ Monitor = monitor(process, Ch),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_fail_max_size(Ch, Monitor),
+
+ %% increase the limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
+
+ {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
+ assert_channel_alive(Ch1),
+
+ Monitor1 = monitor(process, Ch1),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
+ assert_channel_fail_max_size(Ch1, Monitor1),
+
+ %% increase beyond the hard limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
+ Val = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_channel, get_max_message_size, []),
+
+ ?assertEqual(?MAX_MSG_SIZE, Val).
+
+%% -------------------------------------------------------------------
+%% Implementation
+%% -------------------------------------------------------------------
+
+gen_binary_mb(N) ->
+ B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
+ << B1M || _ <- lists:seq(1, N) >>.
+
+assert_channel_alive(Ch) ->
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
+ #amqp_msg{payload = <<"HI">>}).
+
+assert_channel_fail_max_size(Ch, Monitor) ->
+ receive
+ {'DOWN', Monitor, process, Ch,
+ {shutdown,
+ {server_initiated_close, 406, _Error}}} ->
+ ok
+ after ?TIMEOUT_CHANNEL_EXCEPTION ->
+ error({channel_exception_expected, max_message_size})
+ end.
diff --git a/test/queue_length_limits_SUITE.erl b/test/queue_length_limits_SUITE.erl
new file mode 100644
index 0000000000..40e6ae79fe
--- /dev/null
+++ b/test/queue_length_limits_SUITE.erl
@@ -0,0 +1,391 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(queue_length_limits_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT_LIST_OPS_PASS, 5000).
+-define(TIMEOUT, 30000).
+-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
+
+-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ MaxLengthTests = [max_length_default,
+ max_length_bytes_default,
+ max_length_drop_head,
+ max_length_bytes_drop_head,
+ max_length_reject_confirm,
+ max_length_bytes_reject_confirm,
+ max_length_drop_publish,
+ max_length_drop_publish_requeue,
+ max_length_bytes_drop_publish],
+ [
+ {parallel_tests, [parallel], [
+ {max_length_classic, [], MaxLengthTests},
+ {max_length_quorum, [], [max_length_default,
+ max_length_bytes_default]
+ },
+ {max_length_mirrored, [], MaxLengthTests}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(max_length_classic, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]);
+init_per_group(max_length_quorum, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
+init_per_group(max_length_mirrored, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config, [])
+ end.
+
+end_per_group(max_length_mirrored, Config) ->
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"^max_length.*queue">>),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, false}]),
+ Config1;
+end_per_group(queue_max_length, Config) ->
+ Config;
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config)
+ when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish;
+ Testcase == max_length_drop_publish_requeue;
+ Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm;
+ Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head;
+ Testcase == max_length_default; Testcase == max_length_bytes_default ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
+%% -------------------------------------------------------------------
+%% Test cases
+%% -------------------------------------------------------------------
+
+max_length_bytes_drop_head(Config) ->
+ max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_bytes_default(Config) ->
+ max_length_bytes_drop_head(Config, []).
+
+max_length_bytes_drop_head(Config, ExtraArgs) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_head(Config) ->
+ max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
+
+max_length_default(Config) ->
+ %% Defaults to drop_head
+ max_length_drop_head(Config, []).
+
+max_length_drop_head(Config, ExtraArgs) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ QName = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_bytes_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ QNameBytes = ?config(queue_name, Config),
+ Durable = ?config(queue_durable, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+
+ check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3),
+ check_max_length_rejects(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ %% If confirms are not enable, publishes will still be dropped in reject-publish mode.
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
+
+max_length_drop_publish_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
+ %% If confirms are not enable, publishes will still be dropped in reject-publish mode.
+ check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>).
+
+max_length_bytes_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QNameBytes = ?config(queue_name, Config),
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+
+ check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
+
+%% -------------------------------------------------------------------
+%% Implementation
+%% -------------------------------------------------------------------
+
+check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
+ sync_mirrors(QName, Config),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Another message is published
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Message 2 is dropped, message 1 stays
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Messages 2 and 3 are dropped, message 1 stays
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ flush(),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% First message can be enqueued and acks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ receive #'basic.ack'{} -> ok
+ after 1000 -> error(expected_ack)
+ end,
+
+ %% The message cannot be enqueued and nacks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ receive #'basic.nack'{} -> ok
+ after 1000 -> error(expected_nack)
+ end,
+
+ %% The message cannot be enqueued and nacks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ receive #'basic.nack'{} -> ok
+ after 1000 -> error(expected_nack)
+ end,
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Now we can publish message 2.
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ receive #'basic.ack'{} -> ok
+ after 1000 -> error(expected_ack)
+ end,
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Message 1 is replaced by message 2
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Messages 1 and 2 are replaced
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+sync_mirrors(QName, Config) ->
+ case rabbit_ct_helpers:get_config(Config, is_mirrored) of
+ true ->
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]);
+ _ -> ok
+ end.
+
+flush() ->
+ receive _ -> flush()
+ after 10 -> ok
+ end.
diff --git a/test/unit_access_control_authn_authz_context_propagation_SUITE.erl b/test/unit_access_control_authn_authz_context_propagation_SUITE.erl
index e26a10eb2b..b7a1353292 100644
--- a/test/unit_access_control_authn_authz_context_propagation_SUITE.erl
+++ b/test/unit_access_control_authn_authz_context_propagation_SUITE.erl
@@ -75,7 +75,7 @@ end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
-%% Testcases.
+%% Test cases
%% -------------------------------------------------------------------
propagate_context_to_auth_backend(Config) ->
diff --git a/test/unit_amqp091_server_properties_SUITE.erl b/test/unit_amqp091_server_properties_SUITE.erl
new file mode 100644
index 0000000000..f672e219dd
--- /dev/null
+++ b/test/unit_amqp091_server_properties_SUITE.erl
@@ -0,0 +1,153 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(unit_amqp091_server_properties_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT_LIST_OPS_PASS, 5000).
+-define(TIMEOUT, 30000).
+-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
+
+-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], [
+ configurable_server_properties
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test cases
+%% -------------------------------------------------------------------
+
+configurable_server_properties(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, configurable_server_properties1, [Config]).
+
+configurable_server_properties1(_Config) ->
+ %% List of the names of the built-in properties do we expect to find
+ BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
+ <<"copyright">>, <<"information">>],
+
+ Protocol = rabbit_framing_amqp_0_9_1,
+
+ %% Verify that the built-in properties are initially present
+ ActualPropNames = [Key || {Key, longstr, _} <-
+ rabbit_reader:server_properties(Protocol)],
+ true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
+ BuiltInPropNames),
+
+ %% Get the initial server properties configured in the environment
+ {ok, ServerProperties} = application:get_env(rabbit, server_properties),
+
+ %% Helper functions
+ ConsProp = fun (X) -> application:set_env(rabbit,
+ server_properties,
+ [X | ServerProperties]) end,
+ IsPropPresent =
+ fun (X) ->
+ lists:member(X, rabbit_reader:server_properties(Protocol))
+ end,
+
+ %% Add a wholly new property of the simplified {KeyAtom, StringValue} form
+ NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
+ ConsProp(NewSimplifiedProperty),
+ %% Do we find hare soup, appropriately formatted in the generated properties?
+ ExpectedHareImage = {list_to_binary(atom_to_list(NewHareKey)),
+ longstr,
+ list_to_binary(NewHareVal)},
+ true = IsPropPresent(ExpectedHareImage),
+
+ %% Add a wholly new property of the {BinaryKey, Type, Value} form
+ %% and check for it
+ NewProperty = {<<"new-bin-key">>, signedint, -1},
+ ConsProp(NewProperty),
+ %% Do we find the new property?
+ true = IsPropPresent(NewProperty),
+
+ %% Add a property that clobbers a built-in, and verify correct clobbering
+ {NewVerKey, NewVerVal} = NewVersion = {version, "X.Y.Z."},
+ {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
+ list_to_binary(NewVerVal)},
+ ConsProp(NewVersion),
+ ClobberedServerProps = rabbit_reader:server_properties(Protocol),
+ %% Is the clobbering insert present?
+ true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
+ %% Is the clobbering insert the only thing with the clobbering key?
+ [{BinNewVerKey, longstr, BinNewVerVal}] =
+ [E || {K, longstr, _V} = E <- ClobberedServerProps, K =:= BinNewVerKey],
+
+ application:set_env(rabbit, server_properties, ServerProperties),
+ passed.
diff --git a/test/unit_gen_server2_SUITE.erl b/test/unit_gen_server2_SUITE.erl
index 0135193ff1..3eef0a35a2 100644
--- a/test/unit_gen_server2_SUITE.erl
+++ b/test/unit_gen_server2_SUITE.erl
@@ -29,7 +29,8 @@ all() ->
groups() ->
[
{sequential_tests, [], [
- gen_server2_with_state
+ gen_server2_with_state,
+ mcall
]}
].
@@ -77,3 +78,84 @@ gen_server2_with_state1(_Config) ->
fhc_state = gen_server2:with_state(file_handle_cache,
fun (S) -> element(1, S) end),
passed.
+
+
+mcall(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, mcall1, [Config]).
+
+mcall1(_Config) ->
+ P1 = spawn(fun gs2_test_listener/0),
+ register(foo, P1),
+ global:register_name(gfoo, P1),
+
+ P2 = spawn(fun() -> exit(bang) end),
+ %% ensure P2 is dead (ignore the race setting up the monitor)
+ await_exit(P2),
+
+ P3 = spawn(fun gs2_test_crasher/0),
+
+ %% since P2 crashes almost immediately and P3 after receiving its first
+ %% message, we have to spawn a few more processes to handle the additional
+ %% cases we're interested in here
+ register(baz, spawn(fun gs2_test_crasher/0)),
+ register(bog, spawn(fun gs2_test_crasher/0)),
+ global:register_name(gbaz, spawn(fun gs2_test_crasher/0)),
+
+ NoNode = rabbit_nodes:make("nonode"),
+
+ Targets =
+ %% pids
+ [P1, P2, P3]
+ ++
+ %% registered names
+ [foo, bar, baz]
+ ++
+ %% {Name, Node} pairs
+ [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}]
+ ++
+ %% {global, Name}
+ [{global, gfoo}, {global, gbar}, {global, gbaz}],
+
+ GoodResults = [{D, goodbye} || D <- [P1, foo,
+ {foo, node()},
+ {global, gfoo}]],
+
+ BadResults = [{P2, noproc}, % died before use
+ {P3, boom}, % died on first use
+ {bar, noproc}, % never registered
+ {baz, boom}, % died on first use
+ {{bar, node()}, noproc}, % never registered
+ {{bog, node()}, boom}, % died on first use
+ {{foo, NoNode}, nodedown}, % invalid node
+ {{global, gbar}, noproc}, % never registered globally
+ {{global, gbaz}, boom}], % died on first use
+
+ {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]),
+ true = lists:sort(Replies) == lists:sort(GoodResults),
+ true = lists:sort(Errors) == lists:sort(BadResults),
+
+ %% cleanup (ignore the race setting up the monitor)
+ P1 ! stop,
+ await_exit(P1),
+ passed.
+
+await_exit(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MRef, _, _, _} -> ok
+ end.
+
+gs2_test_crasher() ->
+ receive
+ {'$gen_call', _From, hello} -> exit(boom)
+ end.
+
+gs2_test_listener() ->
+ receive
+ {'$gen_call', From, hello} ->
+ gen_server2:reply(From, goodbye),
+ gs2_test_listener();
+ stop ->
+ ok
+ end.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
deleted file mode 100644
index 3933c37748..0000000000
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ /dev/null
@@ -1,923 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
-%%
-
--module(unit_inbroker_parallel_SUITE).
-
--include_lib("common_test/include/ct.hrl").
--include_lib("kernel/include/file.hrl").
--include_lib("amqp_client/include/amqp_client.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--compile(export_all).
-
--define(TIMEOUT_LIST_OPS_PASS, 5000).
--define(TIMEOUT, 30000).
--define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
-
--define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
-
-all() ->
- [
- {group, parallel_tests}
- ].
-
-groups() ->
- MaxLengthTests = [max_length_default,
- max_length_bytes_default,
- max_length_drop_head,
- max_length_bytes_drop_head,
- max_length_reject_confirm,
- max_length_bytes_reject_confirm,
- max_length_drop_publish,
- max_length_drop_publish_requeue,
- max_length_bytes_drop_publish],
- [
- {parallel_tests, [parallel], [
- configurable_server_properties,
- dynamic_mirroring,
- mcall,
- max_message_size,
-
- {queue_max_length, [], [
- {max_length_classic, [], MaxLengthTests},
- {max_length_quorum, [], [max_length_default,
- max_length_bytes_default]
- },
- {max_length_mirrored, [], MaxLengthTests}
- ]}
- ]}
- ].
-
-suite() ->
- [
- {timetrap, {minutes, 3}}
- ].
-
-%% -------------------------------------------------------------------
-%% Testsuite setup/teardown.
-%% -------------------------------------------------------------------
-
-init_per_suite(Config) ->
- rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
-
-end_per_suite(Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config).
-
-init_per_group(max_length_classic, Config) ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
- {queue_durable, false}]);
-init_per_group(max_length_quorum, Config) ->
- case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
- ok ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
- {queue_durable, true}]);
- Skip ->
- Skip
- end;
-init_per_group(max_length_mirrored, Config) ->
- rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
- <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
- Config1 = rabbit_ct_helpers:set_config(
- Config, [{is_mirrored, true},
- {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
- {queue_durable, false}]),
- rabbit_ct_helpers:run_steps(Config1, []);
-init_per_group(Group, Config) ->
- case lists:member({group, Group}, all()) of
- true ->
- ClusterSize = 2,
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, Group},
- {rmq_nodes_count, ClusterSize}
- ]),
- rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps() ++ [
- fun setup_file_handle_cache/1
- ]);
- false ->
- rabbit_ct_helpers:run_steps(Config, [])
- end.
-
-setup_file_handle_cache(Config) ->
- ok = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, setup_file_handle_cache1, []),
- Config.
-
-setup_file_handle_cache1() ->
- %% FIXME: Why are we doing this?
- application:set_env(rabbit, file_handles_high_watermark, 10),
- ok = file_handle_cache:set_limit(10),
- ok.
-
-end_per_group(max_length_mirrored, Config) ->
- rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"^max_length.*queue">>),
- Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, false}]),
- Config1;
-end_per_group(queue_max_length, Config) ->
- Config;
-end_per_group(Group, Config) ->
- case lists:member({group, Group}, all()) of
- true ->
- rabbit_ct_helpers:run_steps(Config,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps());
- false ->
- Config
- end.
-
-init_per_testcase(Testcase, Config) ->
- Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
- Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
- Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}]),
- rabbit_ct_helpers:testcase_started(Config1, Testcase).
-
-end_per_testcase(Testcase, Config)
- when Testcase == max_length_drop_publish; Testcase == max_length_bytes_drop_publish;
- Testcase == max_length_drop_publish_requeue;
- Testcase == max_length_reject_confirm; Testcase == max_length_bytes_reject_confirm;
- Testcase == max_length_drop_head; Testcase == max_length_bytes_drop_head;
- Testcase == max_length_default; Testcase == max_length_bytes_default ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
- rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
- rabbit_ct_helpers:testcase_finished(Config, Testcase);
-
-end_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_finished(Config, Testcase).
-
-msg_id_bin(X) ->
- erlang:md5(term_to_binary(X)).
-
-on_disk_capture() ->
- receive
- {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid);
- stop -> done
- end.
-
-on_disk_capture([_|_], _Awaiting, Pid) ->
- Pid ! {self(), surplus};
-on_disk_capture(OnDisk, Awaiting, Pid) ->
- receive
- {on_disk, MsgIdsS} ->
- MsgIds = gb_sets:to_list(MsgIdsS),
- on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds,
- Pid);
- stop ->
- done
- after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) ->
- case Awaiting of
- [] -> Pid ! {self(), arrived}, on_disk_capture();
- _ -> Pid ! {self(), timeout}
- end
- end.
-
-on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
- Pid ! {await, MsgIds, self()},
- receive
- {Pid, arrived} -> ok;
- {Pid, Error} -> Error
- end.
-
-on_disk_stop(Pid) ->
- MRef = erlang:monitor(process, Pid),
- Pid ! stop,
- receive {'DOWN', MRef, process, Pid, _Reason} ->
- ok
- end.
-
-queue_name(Config, Name) ->
- Name1 = iolist_to_binary(rabbit_ct_helpers:config_to_testcase_name(Config, Name)),
- queue_name(Name1).
-
-queue_name(Name) ->
- rabbit_misc:r(<<"/">>, queue, Name).
-
-test_queue() ->
- queue_name(<<"test">>).
-
-verify_read_with_published(_Delivered, _Persistent, [], _) ->
- ok;
-verify_read_with_published(Delivered, Persistent,
- [{MsgId, SeqId, _Props, Persistent, Delivered}|Read],
- [{SeqId, MsgId}|Published]) ->
- verify_read_with_published(Delivered, Persistent, Read, Published);
-verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
- ko.
-
-nop(_) -> ok.
-nop(_, _) -> ok.
-
-variable_queue_init(Q, Recover) ->
- rabbit_variable_queue:init(
- Q, case Recover of
- true -> non_clean_shutdown;
- false -> new
- end, fun nop/2, fun nop/2, fun nop/1, fun nop/1).
-
-test_amqqueue(Durable) ->
- rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable).
-
-assert_prop(List, Prop, Value) ->
- case proplists:get_value(Prop, List)of
- Value -> ok;
- _ -> {exit, Prop, exp, Value, List}
- end.
-
-assert_props(List, PropVals) ->
- [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
-
-variable_queue_wait_for_shuffling_end(VQ) ->
- case credit_flow:blocked() of
- false -> VQ;
- true -> receive
- {bump_credit, Msg} ->
- credit_flow:handle_bump_msg(Msg),
- variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:resume(VQ))
- end
- end.
-
-msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
- binary_to_term(list_to_binary(lists:reverse(P))).
-
-ack_subset(AckSeqs, Interval, Rem) ->
- lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs).
-
-requeue_one_by_one(Acks, VQ) ->
- lists:foldl(fun (AckTag, VQN) ->
- {_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], VQN),
- VQM
- end, VQ, Acks).
-
-%% -------------------------------------------------------------------
-%% dynamic_mirroring.
-%% -------------------------------------------------------------------
-
-dynamic_mirroring(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, dynamic_mirroring1, [Config]).
-
-dynamic_mirroring1(_Config) ->
- %% Just unit tests of the node selection logic, see multi node
- %% tests for the rest...
- Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params,
- {MNode, SNodes, SSNodes}, All) ->
- {ok, M} = rabbit_mirror_queue_misc:module(Policy),
- {NewM, NewSs0} = M:suggested_queue_nodes(
- Params, MNode, SNodes, SSNodes, All),
- NewSs1 = lists:sort(NewSs0),
- case dm_list_match(NewSs, NewSs1, ExtraSs) of
- ok -> ok;
- error -> exit({no_match, NewSs, NewSs1, ExtraSs})
- end
- end,
-
- Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]),
-
- N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,
-
- %% Add a node
- Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
- Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
- %% Add two nodes and drop one
- Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
- %% Don't try to include nodes that are not running
- Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
- %% If we can't find any of the nodes listed then just keep the master
- Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
- %% And once that's happened, still keep the master even when not listed,
- %% if nothing is synced
- Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]),
- Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]),
- %% But if something is synced we can lose the master - but make
- %% sure we pick the new master from the nodes which are synced!
- Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]),
- Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]),
-
- Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]),
- Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]),
- Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
- Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),
-
- passed.
-
-%% Does the first list match the second where the second is required
-%% to have exactly Extra superfluous items?
-dm_list_match([], [], 0) -> ok;
-dm_list_match(_, [], _Extra) -> error;
-dm_list_match([H|T1], [H |T2], Extra) -> dm_list_match(T1, T2, Extra);
-dm_list_match(L1, [_H|T2], Extra) -> dm_list_match(L1, T2, Extra - 1).
-
-override_group_leader() ->
- %% Override group leader, otherwise SASL fake events are ignored by
- %% the error_logger local to RabbitMQ.
- {group_leader, Leader} = erlang:process_info(whereis(rabbit), group_leader),
- erlang:group_leader(Leader, self()).
-
-set_permissions(Path, Mode) ->
- case file:read_file_info(Path) of
- {ok, FInfo} -> file:write_file_info(
- Path,
- FInfo#file_info{mode=Mode});
- Error -> Error
- end.
-
-clean_logs(Files, Suffix) ->
- [begin
- ok = delete_file(File),
- ok = delete_file([File, Suffix])
- end || File <- Files],
- ok.
-
-assert_ram_node() ->
- case rabbit_mnesia:node_type() of
- disc -> exit('not_ram_node');
- ram -> ok
- end.
-
-assert_disc_node() ->
- case rabbit_mnesia:node_type() of
- disc -> ok;
- ram -> exit('not_disc_node')
- end.
-
-delete_file(File) ->
- case file:delete(File) of
- ok -> ok;
- {error, enoent} -> ok;
- Error -> Error
- end.
-
-make_files_non_writable(Files) ->
- [ok = file:write_file_info(File, #file_info{mode=8#444}) ||
- File <- Files],
- ok.
-
-add_log_handlers(Handlers) ->
- [ok = error_logger:add_report_handler(Handler, Args) ||
- {Handler, Args} <- Handlers],
- ok.
-
-%% sasl_report_file_h returns [] during terminate
-%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98
-%%
-%% error_logger_file_h returns ok since OTP 18.1
-%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98
-delete_log_handlers(Handlers) ->
- [ok_or_empty_list(error_logger:delete_report_handler(Handler))
- || Handler <- Handlers],
- ok.
-
-ok_or_empty_list([]) ->
- [];
-ok_or_empty_list(ok) ->
- ok.
-
-
-
-%% -------------------------------------------------------------------
-%% rabbitmqctl.
-%% -------------------------------------------------------------------
-
-rabbitmqctl_list_consumers(Config, Node) ->
- {ok, StdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Node,
- ["list_consumers"]),
- [<<"Listing consumers", _/binary>> | ConsumerRows] = re:split(StdOut, <<"\n">>, [trim]),
- CTags = [ lists:nth(3, re:split(Row, <<"\t">>)) || Row <- ConsumerRows ],
- CTags.
-
-test_ch_metrics(Fun, Timeout) when Timeout =< 0 ->
- Fun();
-test_ch_metrics(Fun, Timeout) ->
- try
- Fun()
- catch
- _:{badmatch, _} ->
- timer:sleep(1000),
- test_ch_metrics(Fun, Timeout - 1000)
- end.
-
-test_queue_statistics_receive_event(Q, Matcher) ->
- %% Q ! emit_stats,
- test_queue_statistics_receive_event1(Q, Matcher).
-
-test_queue_statistics_receive_event1(Q, Matcher) ->
- receive #event{type = queue_stats, props = Props} ->
- case Matcher(Props) of
- true -> Props;
- _ -> test_queue_statistics_receive_event1(Q, Matcher)
- end
- after ?TIMEOUT -> throw(failed_to_receive_event)
- end.
-
-test_spawn() ->
- {Writer, _Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(),
- ok = rabbit_channel:do(Ch, #'channel.open'{}),
- receive #'channel.open_ok'{} -> ok
- after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
- end,
- {Writer, Ch}.
-
-test_spawn(Node) ->
- rpc:call(Node, ?MODULE, test_spawn_remote, []).
-
-%% Spawn an arbitrary long lived process, so we don't end up linking
-%% the channel to the short-lived process (RPC, here) spun up by the
-%% RPC server.
-test_spawn_remote() ->
- RPC = self(),
- spawn(fun () ->
- {Writer, Ch} = test_spawn(),
- RPC ! {Writer, Ch},
- link(Ch),
- receive
- _ -> ok
- end
- end),
- receive Res -> Res
- after ?TIMEOUT -> throw(failed_to_receive_result)
- end.
-
-
-mcall(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, mcall1, [Config]).
-
-mcall1(_Config) ->
- P1 = spawn(fun gs2_test_listener/0),
- register(foo, P1),
- global:register_name(gfoo, P1),
-
- P2 = spawn(fun() -> exit(bang) end),
- %% ensure P2 is dead (ignore the race setting up the monitor)
- await_exit(P2),
-
- P3 = spawn(fun gs2_test_crasher/0),
-
- %% since P2 crashes almost immediately and P3 after receiving its first
- %% message, we have to spawn a few more processes to handle the additional
- %% cases we're interested in here
- register(baz, spawn(fun gs2_test_crasher/0)),
- register(bog, spawn(fun gs2_test_crasher/0)),
- global:register_name(gbaz, spawn(fun gs2_test_crasher/0)),
-
- NoNode = rabbit_nodes:make("nonode"),
-
- Targets =
- %% pids
- [P1, P2, P3]
- ++
- %% registered names
- [foo, bar, baz]
- ++
- %% {Name, Node} pairs
- [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}]
- ++
- %% {global, Name}
- [{global, gfoo}, {global, gbar}, {global, gbaz}],
-
- GoodResults = [{D, goodbye} || D <- [P1, foo,
- {foo, node()},
- {global, gfoo}]],
-
- BadResults = [{P2, noproc}, % died before use
- {P3, boom}, % died on first use
- {bar, noproc}, % never registered
- {baz, boom}, % died on first use
- {{bar, node()}, noproc}, % never registered
- {{bog, node()}, boom}, % died on first use
- {{foo, NoNode}, nodedown}, % invalid node
- {{global, gbar}, noproc}, % never registered globally
- {{global, gbaz}, boom}], % died on first use
-
- {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]),
- true = lists:sort(Replies) == lists:sort(GoodResults),
- true = lists:sort(Errors) == lists:sort(BadResults),
-
- %% cleanup (ignore the race setting up the monitor)
- P1 ! stop,
- await_exit(P1),
- passed.
-
-await_exit(Pid) ->
- MRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MRef, _, _, _} -> ok
- end.
-
-gs2_test_crasher() ->
- receive
- {'$gen_call', _From, hello} -> exit(boom)
- end.
-
-gs2_test_listener() ->
- receive
- {'$gen_call', From, hello} ->
- gen_server2:reply(From, goodbye),
- gs2_test_listener();
- stop ->
- ok
- end.
-
-configurable_server_properties(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, configurable_server_properties1, [Config]).
-
-configurable_server_properties1(_Config) ->
- %% List of the names of the built-in properties do we expect to find
- BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
- <<"copyright">>, <<"information">>],
-
- Protocol = rabbit_framing_amqp_0_9_1,
-
- %% Verify that the built-in properties are initially present
- ActualPropNames = [Key || {Key, longstr, _} <-
- rabbit_reader:server_properties(Protocol)],
- true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
- BuiltInPropNames),
-
- %% Get the initial server properties configured in the environment
- {ok, ServerProperties} = application:get_env(rabbit, server_properties),
-
- %% Helper functions
- ConsProp = fun (X) -> application:set_env(rabbit,
- server_properties,
- [X | ServerProperties]) end,
- IsPropPresent =
- fun (X) ->
- lists:member(X, rabbit_reader:server_properties(Protocol))
- end,
-
- %% Add a wholly new property of the simplified {KeyAtom, StringValue} form
- NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
- ConsProp(NewSimplifiedProperty),
- %% Do we find hare soup, appropriately formatted in the generated properties?
- ExpectedHareImage = {list_to_binary(atom_to_list(NewHareKey)),
- longstr,
- list_to_binary(NewHareVal)},
- true = IsPropPresent(ExpectedHareImage),
-
- %% Add a wholly new property of the {BinaryKey, Type, Value} form
- %% and check for it
- NewProperty = {<<"new-bin-key">>, signedint, -1},
- ConsProp(NewProperty),
- %% Do we find the new property?
- true = IsPropPresent(NewProperty),
-
- %% Add a property that clobbers a built-in, and verify correct clobbering
- {NewVerKey, NewVerVal} = NewVersion = {version, "X.Y.Z."},
- {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
- list_to_binary(NewVerVal)},
- ConsProp(NewVersion),
- ClobberedServerProps = rabbit_reader:server_properties(Protocol),
- %% Is the clobbering insert present?
- true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
- %% Is the clobbering insert the only thing with the clobbering key?
- [{BinNewVerKey, longstr, BinNewVerVal}] =
- [E || {K, longstr, _V} = E <- ClobberedServerProps, K =:= BinNewVerKey],
-
- application:set_env(rabbit, server_properties, ServerProperties),
- passed.
-
-max_length_bytes_drop_head(Config) ->
- max_length_bytes_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
-
-max_length_bytes_default(Config) ->
- max_length_bytes_drop_head(Config, []).
-
-max_length_bytes_drop_head(Config, ExtraArgs) ->
- {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Args = ?config(queue_args, Config),
- Durable = ?config(queue_durable, Config),
- QName = ?config(queue_name, Config),
-
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthBytesArgs ++ Args ++ ExtraArgs, durable = Durable}),
-
- %% 80 bytes payload
- Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
- Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
- Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
- check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3).
-
-max_length_drop_head(Config) ->
- max_length_drop_head(Config, [{<<"x-overflow">>, longstr, <<"drop-head">>}]).
-
-max_length_default(Config) ->
- %% Defaults to drop_head
- max_length_drop_head(Config, []).
-
-max_length_drop_head(Config, ExtraArgs) ->
- {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Args = ?config(queue_args, Config),
- Durable = ?config(queue_durable, Config),
- QName = ?config(queue_name, Config),
-
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ Args ++ ExtraArgs, durable = Durable}),
-
- check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
-
-max_length_reject_confirm(Config) ->
- {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Args = ?config(queue_args, Config),
- QName = ?config(queue_name, Config),
- Durable = ?config(queue_durable, Config),
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
- #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
- check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
-
-max_length_bytes_reject_confirm(Config) ->
- {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Args = ?config(queue_args, Config),
- QNameBytes = ?config(queue_name, Config),
- Durable = ?config(queue_durable, Config),
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
- #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
-
- %% 80 bytes payload
- Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
- Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
- Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
-
- check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3),
- check_max_length_rejects(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
-
-max_length_drop_publish(Config) ->
- {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Args = ?config(queue_args, Config),
- Durable = ?config(queue_durable, Config),
- QName = ?config(queue_name, Config),
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
- %% If confirms are not enable, publishes will still be dropped in reject-publish mode.
- check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>).
-
-max_length_drop_publish_requeue(Config) ->
- {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Args = ?config(queue_args, Config),
- Durable = ?config(queue_durable, Config),
- QName = ?config(queue_name, Config),
- MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs ++ Args, durable = Durable}),
- %% If confirms are not enable, publishes will still be dropped in reject-publish mode.
- check_max_length_requeue(Config, QName, Ch, <<"1">>, <<"2">>).
-
-check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
- sync_mirrors(QName, Config),
-
- #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- amqp_channel:register_confirm_handler(Ch, self()),
-
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- %% A single message is published and consumed
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:wait_for_confirms(Ch, 5000),
-
- {#'basic.get_ok'{delivery_tag = DeliveryTag},
- #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
-
- %% Another message is published
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:wait_for_confirms(Ch, 5000),
-
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = true}),
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
-
-max_length_bytes_drop_publish(Config) ->
- {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- Args = ?config(queue_args, Config),
- Durable = ?config(queue_durable, Config),
- QNameBytes = ?config(queue_name, Config),
- MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
- OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}],
- #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs ++ Args, durable = Durable}),
-
- %% 80 bytes payload
- Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
- Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
- Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
-
- check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
-
-check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) ->
- sync_mirrors(QName, Config),
-
- #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- amqp_channel:register_confirm_handler(Ch, self()),
-
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- %% A single message is published and consumed
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:wait_for_confirms(Ch, 5000),
-
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
-
- %% Message 2 is dropped, message 1 stays
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:wait_for_confirms(Ch, 5000),
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
-
- %% Messages 2 and 3 are dropped, message 1 stays
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
- amqp_channel:wait_for_confirms(Ch, 5000),
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
-
-check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
- sync_mirrors(QName, Config),
- amqp_channel:register_confirm_handler(Ch, self()),
- flush(),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- %% First message can be enqueued and acks
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- receive #'basic.ack'{} -> ok
- after 1000 -> error(expected_ack)
- end,
-
- %% The message cannot be enqueued and nacks
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- receive #'basic.nack'{} -> ok
- after 1000 -> error(expected_nack)
- end,
-
- %% The message cannot be enqueued and nacks
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
- receive #'basic.nack'{} -> ok
- after 1000 -> error(expected_nack)
- end,
-
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
-
- %% Now we can publish message 2.
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- receive #'basic.ack'{} -> ok
- after 1000 -> error(expected_ack)
- end,
-
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
-
-check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
- sync_mirrors(QName, Config),
-
- #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
- amqp_channel:register_confirm_handler(Ch, self()),
-
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- %% A single message is published and consumed
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:wait_for_confirms(Ch, 5000),
-
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
-
- %% Message 1 is replaced by message 2
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:wait_for_confirms(Ch, 5000),
-
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
-
- %% Messages 1 and 2 are replaced
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
- amqp_channel:wait_for_confirms(Ch, 5000),
- {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
- #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
-
-sync_mirrors(QName, Config) ->
- case rabbit_ct_helpers:get_config(Config, is_mirrored) of
- true ->
- rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]);
- _ -> ok
- end.
-
-gen_binary_mb(N) ->
- B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
- << B1M || _ <- lists:seq(1, N) >>.
-
-assert_channel_alive(Ch) ->
- amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
- #amqp_msg{payload = <<"HI">>}).
-
-assert_channel_fail_max_size(Ch, Monitor) ->
- receive
- {'DOWN', Monitor, process, Ch,
- {shutdown,
- {server_initiated_close, 406, _Error}}} ->
- ok
- after ?TIMEOUT_CHANNEL_EXCEPTION ->
- error({channel_exception_expected, max_message_size})
- end.
-
-max_message_size(Config) ->
- Binary2M = gen_binary_mb(2),
- Binary4M = gen_binary_mb(4),
- Binary6M = gen_binary_mb(6),
- Binary10M = gen_binary_mb(10),
-
- Size2Mb = 1024 * 1024 * 2,
- Size2Mb = byte_size(Binary2M),
-
- rabbit_ct_broker_helpers:rpc(Config, 0,
- application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
-
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
-
- %% Binary is within the max size limit
- amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
- %% The channel process is alive
- assert_channel_alive(Ch),
-
- Monitor = monitor(process, Ch),
- amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
- assert_channel_fail_max_size(Ch, Monitor),
-
- %% increase the limit
- rabbit_ct_broker_helpers:rpc(Config, 0,
- application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
-
- {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
-
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
- assert_channel_alive(Ch1),
-
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
- assert_channel_alive(Ch1),
-
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
- assert_channel_alive(Ch1),
-
- Monitor1 = monitor(process, Ch1),
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
- assert_channel_fail_max_size(Ch1, Monitor1),
-
- %% increase beyond the hard limit
- rabbit_ct_broker_helpers:rpc(Config, 0,
- application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
- Val = rabbit_ct_broker_helpers:rpc(Config, 0,
- rabbit_channel, get_max_message_size, []),
-
- ?assertEqual(?MAX_MSG_SIZE, Val).
-
-%% ---------------------------------------------------------------------------
-%% rabbitmqctl helpers.
-%% ---------------------------------------------------------------------------
-
-default_options() -> [{"-p", "/"}, {"-q", "false"}].
-
-expand_options(As, Bs) ->
- lists:foldl(fun({K, _}=A, R) ->
- case proplists:is_defined(K, R) of
- true -> R;
- false -> [A | R]
- end
- end, Bs, As).
-
-flush() ->
- receive _ -> flush()
- after 10 -> ok
- end.