diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-22 10:23:02 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-22 10:23:02 +0200 |
| commit | 440f05ab5e3d488d21ceb52b7eb15e5ce5bf033a (patch) | |
| tree | 340634edc6bef1d1e42519855924cb656b19ec40 | |
| parent | dc54e7f943497f3a385e42bbfc8c88b56ef2cca8 (diff) | |
| parent | 3147ee9a71ee37a63551fa93f91e279b650c834b (diff) | |
| download | rabbitmq-server-git-440f05ab5e3d488d21ceb52b7eb15e5ce5bf033a.tar.gz | |
Merge branch 'master' into queue-type-info-keysqueue-type-info-keys
| -rw-r--r-- | docs/rabbitmqctl.8 | 12 | ||||
| -rw-r--r-- | rabbitmq-components.mk | 4 | ||||
| -rwxr-xr-x | scripts/rabbitmq-env | 1 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 18 | ||||
| -rw-r--r-- | scripts/rabbitmq-server.bat | 5 | ||||
| -rw-r--r-- | scripts/rabbitmq-service.bat | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_classic_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_maintenance.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_queue_type.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_stream_coordinator.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_stream_queue.erl | 14 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 42 | ||||
| -rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 229 |
16 files changed, 291 insertions, 153 deletions
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8 index 2c60ba10d3..3e041ad2c8 100644 --- a/docs/rabbitmqctl.8 +++ b/docs/rabbitmqctl.8 @@ -1429,18 +1429,6 @@ is located on the current node. .Sp .Dl rabbitmqctl list_unresponsive_queues --local name .\" ------------------------------------------------------------------ -.It Cm node_health_check -.Pp -DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node. -To learn more, see the -.Lk https://www.rabbitmq.com/monitoring.html#health-checks "Health Checks documentation" -.Pp -Verifies the RabbitMQ application is running and alarms are not set, -then checks that every queue and channel on the node can emit basic stats. -.sp -Example: -.Dl rabbitmqctl node_health_check -n rabbit@hostname -.\" ------------------------------------------------------------------ .It Cm ping .Pp Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 091a35b7c0..f2b677da62 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -111,8 +111,8 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre # possible to work with rabbitmq-public-umbrella. dep_accept = hex 0.3.5 -dep_cowboy = hex 2.6.1 -dep_cowlib = hex 2.7.0 +dep_cowboy = hex 2.8.0 +dep_cowlib = hex 2.9.1 dep_jsx = hex 2.11.0 dep_lager = hex 3.8.0 dep_prometheus = git https://github.com/deadtrickster/prometheus.erl.git master diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index afae3651aa..90702c43bb 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -130,7 +130,6 @@ SERVER_ERL_ARGS=" +P $RABBITMQ_MAX_NUMBER_OF_PROCESSES +t $RABBITMQ_MAX_NUMBER_O [ "x" = "x$RABBITMQ_CTL_DIST_PORT_MIN" ] && RABBITMQ_CTL_DIST_PORT_MIN='35672' [ "x" = "x$RABBITMQ_CTL_DIST_PORT_MAX" ] && RABBITMQ_CTL_DIST_PORT_MAX="$(($RABBITMQ_CTL_DIST_PORT_MIN + 10))" -[ "x" = "x$RABBITMQ_IO_THREAD_POOL_SIZE" ] && RABBITMQ_IO_THREAD_POOL_SIZE=${IO_THREAD_POOL_SIZE} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} [ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS} diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b464c5fa09..82058dcb26 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -23,25 +23,9 @@ if [ "x" = "x$ERL_MAX_ETS_TABLES" ]; then ERL_MAX_ETS_TABLES=50000 fi -# Lazy initialization of threed pool size - if it wasn't set -# explicitly. This parameter is only needed when server is starting, -# so it makes no sense to do this calculations in rabbitmq-env or -# rabbitmq-defaults scripts. -ensure_thread_pool_size() { - if [ -z "${RABBITMQ_IO_THREAD_POOL_SIZE}" ]; then - RABBITMQ_IO_THREAD_POOL_SIZE=$( - erl \ - -noinput \ - -boot "${CLEAN_BOOT_FILE}" \ - -s rabbit_misc report_default_thread_pool_size - ) - fi -} - check_start_params() { check_not_empty RABBITMQ_BOOT_MODULE check_not_empty SASL_BOOT_FILE - check_not_empty RABBITMQ_IO_THREAD_POOL_SIZE } check_not_empty() { @@ -59,7 +43,6 @@ start_rabbitmq_server() { set -e _rmq_env_set_erl_libs - ensure_thread_pool_size RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput" @@ -92,7 +75,6 @@ start_rabbitmq_server() { ${RABBITMQ_START_RABBIT} \ -boot "${SASL_BOOT_FILE}" \ +W w \ - +A ${RABBITMQ_IO_THREAD_POOL_SIZE} \ ${RABBITMQ_DEFAULT_ALLOC_ARGS} \ ${RABBITMQ_SERVER_ERL_ARGS} \ ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 3eb033efc2..3a386b63c4 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -47,10 +47,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" ( set RABBITMQ_START_RABBIT=!RABBITMQ_START_RABBIT! -s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
set ENV_OK=true
CALL :check_not_empty "RABBITMQ_BOOT_MODULE" !RABBITMQ_BOOT_MODULE!
@@ -68,7 +64,6 @@ if "!RABBITMQ_ALLOW_INPUT!"=="" ( !RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index b42e0dd89a..0b7906d4bf 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -176,10 +176,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" ( set RABBITMQ_START_RABBIT=-s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
if "!RABBITMQ_SERVICE_RESTART!"=="" (
set RABBITMQ_SERVICE_RESTART=restart
)
@@ -197,7 +193,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ !RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fdf5d4feea..6104228c54 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -783,7 +783,8 @@ declare_args() -> {<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2}, {<<"x-max-age">>, fun check_max_age_arg/2}, {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}, - {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}]. + {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}, + {<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -902,6 +903,16 @@ check_overflow({longstr, Val}, _Args) -> check_overflow({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. +check_queue_leader_locator_arg({longstr, Val}, _Args) -> + case lists:member(Val, [<<"client-local">>, + <<"random">>, + <<"least-leaders">>]) of + true -> ok; + false -> {error, invalid_queue_locator_arg} + end; +check_queue_leader_locator_arg({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + check_queue_mode({longstr, Val}, _Args) -> case lists:member(Val, [<<"default">>, <<"lazy">>]) of true -> ok; diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index 5830c11f65..45cb43dbd1 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -453,7 +453,9 @@ capabilities() -> <<"x-max-in-memory-bytes">>, <<"x-max-priority">>, <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-queue-master-locator">>], - consumer_arguments => [<<"x-cancel-on-ha-failover">>], + consumer_arguments => [<<"x-cancel-on-ha-failover">>, + <<"x-priority">>, <<"x-credit">> + ], server_named => true}. reject_seq_no(SeqNo, U0) -> diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl index 004f8b4cea..039d125b13 100644 --- a/src/rabbit_maintenance.erl +++ b/src/rabbit_maintenance.erl @@ -89,6 +89,7 @@ do_drain() -> [length(TransferCandidates), ReadableCandidates]), transfer_leadership_of_classic_mirrored_queues(TransferCandidates), transfer_leadership_of_quorum_queues(TransferCandidates), + stop_local_quorum_queue_followers(), %% allow plugins to react rabbit_event:notify(maintenance_draining, #{ @@ -276,6 +277,28 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) -> end || Q <- Queues], rabbit_log:info("Leadership transfer for local classic mirrored queues is complete"). +-spec stop_local_quorum_queue_followers() -> ok. +stop_local_quorum_queue_followers() -> + Queues = rabbit_amqqueue:list_local_followers(), + rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node", + [length(Queues)]), + [begin + Name = amqqueue:get_name(Q), + rabbit_log:debug("Will stop a local follower replica of quorum queue ~s", + [rabbit_misc:rs(Name)]), + %% shut down Ra nodes so that they are not considered for leader election + {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q), + RaNode = {RegisteredName, node()}, + rabbit_log:debug("Will stop Ra server ~p", [RaNode]), + case ra:stop_server(RaNode) of + ok -> + rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]); + {error, nodedown} -> + rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down") + end + end || Q <- Queues], + rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node"). + -spec primary_replica_transfer_candidate_nodes() -> [node()]. primary_replica_transfer_candidate_nodes() -> filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]). diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 0dd56a8ccd..f2c44e2289 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -41,6 +41,7 @@ register() -> {policy_validator, <<"delivery-limit">>}, {policy_validator, <<"max-age">>}, {policy_validator, <<"max-segment-size">>}, + {policy_validator, <<"queue-leader-locator">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, @@ -147,6 +148,15 @@ validate_policy0(<<"max-age">>, Value) -> ok end; +validate_policy0(<<"queue-leader-locator">>, <<"client-local">>) -> + ok; +validate_policy0(<<"queue-leader-locator">>, <<"random">>) -> + ok; +validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) -> + ok; +validate_policy0(<<"queue-leader-locator">>, Value) -> + {error, "~p is not a valid queue leader locator value", [Value]}; + validate_policy0(<<"max-segment-size">>, Value) when is_integer(Value), Value >= 0 -> ok; diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 3d34c969e0..2d45c043ba 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -218,9 +218,6 @@ is_enabled(Type) -> rabbit_types:channel_exit(). declare(Q, Node) -> Mod = amqqueue:get_type(Q), - Capabilities = Mod:capabilities(), - ValidQueueArgs = maps:get(queue_arguments, Capabilities, []), - check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs), Mod:declare(Q, Node). -spec delete(amqqueue:amqqueue(), boolean(), @@ -334,9 +331,6 @@ new(Q, State) when ?is_amqqueue(Q) -> consume(Q, Spec, State) -> #ctx{state = State0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), - Capabilities = Mod:capabilities(), - ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []), - check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs), case Mod:consume(Q, Spec, State0) of {ok, CtxState, Actions} -> return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions); @@ -577,14 +571,3 @@ return_ok(State0, Actions0) -> {S, [Act | A0]} end, {State0, []}, Actions0), {ok, State, lists:reverse(Actions)}. - - -check_invalid_arguments(QueueName, Args, Keys) -> - [case lists:member(Arg, Keys) of - true -> ok; - false -> rabbit_misc:protocol_error( - precondition_failed, - "invalid arg '~s' for ~s", - [Arg, rabbit_misc:rs(QueueName)]) - end || {Arg, _, _} <- Args], - ok. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 32ac9c4036..563b206590 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -343,7 +343,7 @@ capabilities() -> <<"x-max-in-memory-bytes">>, <<"x-overflow">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>], - consumer_arguments => [<<"x-priority">>], + consumer_arguments => [<<"x-priority">>, <<"x-credit">>], server_named => false}. rpc_delete_metrics(QName) -> diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl index 472cf5d70f..8a1205b9af 100644 --- a/src/rabbit_stream_coordinator.erl +++ b/src/rabbit_stream_coordinator.erl @@ -192,13 +192,14 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd, end; apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) -> - #{name := StreamId} = Conf = amqqueue:get_type_state(Q), + #{name := StreamId} = Conf0 = amqqueue:get_type_state(Q), + Conf = apply_leader_locator_strategy(Conf0, Streams), case maps:is_key(StreamId, Streams) of true -> {State, '$ra_no_reply', wrap_reply(From, {error, already_started})}; false -> Phase = phase_start_cluster, - PhaseArgs = [Q], + PhaseArgs = [amqqueue:set_type_state(Q, Conf)], SState = #{state => start_cluster, phase => Phase, phase_args => PhaseArgs, @@ -904,3 +905,43 @@ add_unique(Node, Nodes) -> delete_replica_pid(Node, ReplicaPids) -> lists:partition(fun(P) -> node(P) =/= Node end, ReplicaPids). + +apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _) -> + Conf; +apply_leader_locator_strategy(#{leader_node := Leader, + replica_nodes := Replicas0, + leader_locator_strategy := <<"random">>, + name := StreamId} = Conf, _) -> + Replicas = [Leader | Replicas0], + ClusterSize = length(Replicas), + Hash = erlang:phash2(StreamId), + Pos = (Hash rem ClusterSize) + 1, + NewLeader = lists:nth(Pos, Replicas), + NewReplicas = lists:delete(NewLeader, Replicas), + Conf#{leader_node => NewLeader, + replica_nodes => NewReplicas}; +apply_leader_locator_strategy(#{leader_node := Leader, + replica_nodes := Replicas0, + leader_locator_strategy := <<"least-leaders">>} = Conf, + Streams) -> + Replicas = [Leader | Replicas0], + Counters0 = maps:from_list([{R, 0} || R <- Replicas]), + Counters = maps:to_list(maps:fold(fun(_Key, #{conf := #{leader_node := L}}, Acc) -> + maps:update_with(L, fun(V) -> V + 1 end, 0, Acc) + end, Counters0, Streams)), + Ordered = lists:sort(fun({_, V1}, {_, V2}) -> + V1 =< V2 + end, Counters), + %% We could have potentially introduced nodes that are not in the list of replicas if + %% initial cluster size is smaller than the cluster size. Let's select the first one + %% that is on the list of replicas + NewLeader = select_first_matching_node(Ordered, Replicas), + NewReplicas = lists:delete(NewLeader, Replicas), + Conf#{leader_node => NewLeader, + replica_nodes => NewReplicas}. + +select_first_matching_node([{N, _} | Rest], Replicas) -> + case lists:member(N, Replicas) of + true -> N; + false -> select_first_matching_node(Rest, Replicas) + end. diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 8e45290da4..0ae58fc6b8 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -504,6 +504,8 @@ make_stream_conf(Node, Q) -> MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)), MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q), + LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>, + fun res_arg/2, Q)), Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node], Arguments = amqqueue:get_arguments(Q), Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0), @@ -515,6 +517,7 @@ make_stream_conf(Node, Q) -> add_if_defined(max_segment_size, MaxSegmentSize, #{reference => QName, name => Name, retention => Retention, + leader_locator_strategy => LeaderLocator, leader_node => Node, replica_nodes => Replicas, event_formatter => Formatter, @@ -570,6 +573,12 @@ max_age(Age) -> max_age(Age1, Age2) -> min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)). +queue_leader_locator(undefined) -> <<"client-local">>; +queue_leader_locator(Val) -> Val. + +res_arg(PolVal, undefined) -> PolVal; +res_arg(_, ArgVal) -> ArgVal. + queue_name(#resource{virtual_host = VHost, name = Name}) -> Timestamp = erlang:integer_to_binary(erlang:system_time()), osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_", @@ -678,12 +687,13 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange}, rabbit_msg_record:to_iodata(R). capabilities() -> - #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>], + #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>, + <<"queue-leader-locator">>], queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-max-age">>, <<"x-max-segment-size">>, - <<"x-initial-cluster-size">>], + <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>], consumer_arguments => [<<"x-stream-offset">>], server_named => false}. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 97b2235411..8a2bf7f5d9 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -89,7 +89,6 @@ groups() -> all_tests() -> [ declare_args, - declare_invalid_args, declare_invalid_properties, declare_server_named, start_queue, @@ -338,47 +337,6 @@ declare_invalid_properties(Config) -> durable = false, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})). -declare_invalid_args(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - LQ = ?config(queue_name, Config), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-message-ttl">>, long, 2000}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-priority">>, long, 2000}])), - - [?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, XOverflow}])) - || XOverflow <- [<<"reject-publish-dlx">>]], - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-queue-mode">>, longstr, <<"lazy">>}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-quorum-initial-group-size">>, long, 0}])). - declare_server_named(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index 4362db1d65..5ae4de82f4 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -48,7 +48,11 @@ groups() -> consume_from_replica, leader_failover, initial_cluster_size_one, - initial_cluster_size_two]}, + initial_cluster_size_two, + leader_locator_client_local, + leader_locator_random, + leader_locator_least_leaders, + leader_locator_policy]}, {unclustered_size_3_1, [], [add_replica]}, {unclustered_size_3_2, [], [consume_without_local_replica]}, {unclustered_size_3_3, [], [grow_coordinator_cluster]}, @@ -59,7 +63,6 @@ all_tests() -> [ declare_args, declare_max_age, - declare_invalid_args, declare_invalid_properties, declare_server_named, declare_queue, @@ -226,46 +229,6 @@ declare_invalid_properties(Config) -> durable = false, arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})). -declare_invalid_args(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Q = ?config(queue_name, Config), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-expires">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-message-ttl">>, long, 2000}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-max-priority">>, long, 2000}])), - - [?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-overflow">>, longstr, XOverflow}])) - || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-queue-mode">>, longstr, <<"lazy">>}])), - - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])). - declare_server_named(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -1146,7 +1109,7 @@ leader_failover(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, Server1). initial_cluster_size_one(Config) -> - [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), @@ -1160,7 +1123,7 @@ initial_cluster_size_one(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})). initial_cluster_size_two(Config) -> - [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), Q = ?config(queue_name, Config), @@ -1181,6 +1144,184 @@ initial_cluster_size_two(Config) -> ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})). +leader_locator_client_local(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + [Info] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + ?assertEqual(Server1, proplists:get_value(leader, Info)), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + %% Try second node + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + [Info2] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + ?assertEqual(Server2, proplists:get_value(leader, Info2)), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch2, #'queue.delete'{queue = Q})), + + %% Try third node + Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server3), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + [Info3] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + ?assertEqual(Server3, proplists:get_value(leader, Info3)), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch3, #'queue.delete'{queue = Q})). + +leader_locator_random(Config) -> + [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), + + [Info] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader = proplists:get_value(leader, Info), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + repeat_until( + fun() -> + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), + + [Info2] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader2 = proplists:get_value(leader, Info2), + + Leader =/= Leader2 + end, 10). + +leader_locator_least_leaders(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + Q1 = <<"q1">>, + Q2 = <<"q2">>, + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, + declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, + declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), + + [Info] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader = proplists:get_value(leader, Info), + + ?assert(lists:member(Leader, [Server2, Server3])). + +leader_locator_policy(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>, + [{<<"queue-leader-locator">>, <<"random">>}]), + + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [policy, operator_policy, + effective_policy_definition, + name, leader]]), + + ?assertEqual(<<"leader-locator">>, proplists:get_value(policy, Info)), + ?assertEqual('', proplists:get_value(operator_policy, Info)), + ?assertEqual([{<<"queue-leader-locator">>, <<"random">>}], + proplists:get_value(effective_policy_definition, Info)), + + Leader = proplists:get_value(leader, Info), + + repeat_until( + fun() -> + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + [Info2] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader]])), + Leader2 = proplists:get_value(leader, Info2), + Leader =/= Leader2 + end, 10), + + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>). + +repeat_until(_, 0) -> + ct:fail("Condition did not materialize in the expected amount of attempts"); +repeat_until(Fun, N) -> + case Fun() of + true -> ok; + false -> repeat_until(Fun, N - 1) + end. + invalid_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
