summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-22 10:23:02 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-22 10:23:02 +0200
commit440f05ab5e3d488d21ceb52b7eb15e5ce5bf033a (patch)
tree340634edc6bef1d1e42519855924cb656b19ec40
parentdc54e7f943497f3a385e42bbfc8c88b56ef2cca8 (diff)
parent3147ee9a71ee37a63551fa93f91e279b650c834b (diff)
downloadrabbitmq-server-git-440f05ab5e3d488d21ceb52b7eb15e5ce5bf033a.tar.gz
Merge branch 'master' into queue-type-info-keysqueue-type-info-keys
-rw-r--r--docs/rabbitmqctl.812
-rw-r--r--rabbitmq-components.mk4
-rwxr-xr-xscripts/rabbitmq-env1
-rwxr-xr-xscripts/rabbitmq-server18
-rw-r--r--scripts/rabbitmq-server.bat5
-rw-r--r--scripts/rabbitmq-service.bat5
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_classic_queue.erl4
-rw-r--r--src/rabbit_maintenance.erl23
-rw-r--r--src/rabbit_policies.erl10
-rw-r--r--src/rabbit_queue_type.erl17
-rw-r--r--src/rabbit_quorum_queue.erl2
-rw-r--r--src/rabbit_stream_coordinator.erl45
-rw-r--r--src/rabbit_stream_queue.erl14
-rw-r--r--test/quorum_queue_SUITE.erl42
-rw-r--r--test/rabbit_stream_queue_SUITE.erl229
16 files changed, 291 insertions, 153 deletions
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8
index 2c60ba10d3..3e041ad2c8 100644
--- a/docs/rabbitmqctl.8
+++ b/docs/rabbitmqctl.8
@@ -1429,18 +1429,6 @@ is located on the current node.
.Sp
.Dl rabbitmqctl list_unresponsive_queues --local name
.\" ------------------------------------------------------------------
-.It Cm node_health_check
-.Pp
-DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node.
-To learn more, see the
-.Lk https://www.rabbitmq.com/monitoring.html#health-checks "Health Checks documentation"
-.Pp
-Verifies the RabbitMQ application is running and alarms are not set,
-then checks that every queue and channel on the node can emit basic stats.
-.sp
-Example:
-.Dl rabbitmqctl node_health_check -n rabbit@hostname
-.\" ------------------------------------------------------------------
.It Cm ping
.Pp
Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index 091a35b7c0..f2b677da62 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -111,8 +111,8 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre
# possible to work with rabbitmq-public-umbrella.
dep_accept = hex 0.3.5
-dep_cowboy = hex 2.6.1
-dep_cowlib = hex 2.7.0
+dep_cowboy = hex 2.8.0
+dep_cowlib = hex 2.9.1
dep_jsx = hex 2.11.0
dep_lager = hex 3.8.0
dep_prometheus = git https://github.com/deadtrickster/prometheus.erl.git master
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index afae3651aa..90702c43bb 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -130,7 +130,6 @@ SERVER_ERL_ARGS=" +P $RABBITMQ_MAX_NUMBER_OF_PROCESSES +t $RABBITMQ_MAX_NUMBER_O
[ "x" = "x$RABBITMQ_CTL_DIST_PORT_MIN" ] && RABBITMQ_CTL_DIST_PORT_MIN='35672'
[ "x" = "x$RABBITMQ_CTL_DIST_PORT_MAX" ] && RABBITMQ_CTL_DIST_PORT_MAX="$(($RABBITMQ_CTL_DIST_PORT_MIN + 10))"
-[ "x" = "x$RABBITMQ_IO_THREAD_POOL_SIZE" ] && RABBITMQ_IO_THREAD_POOL_SIZE=${IO_THREAD_POOL_SIZE}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS}
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b464c5fa09..82058dcb26 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -23,25 +23,9 @@ if [ "x" = "x$ERL_MAX_ETS_TABLES" ]; then
ERL_MAX_ETS_TABLES=50000
fi
-# Lazy initialization of threed pool size - if it wasn't set
-# explicitly. This parameter is only needed when server is starting,
-# so it makes no sense to do this calculations in rabbitmq-env or
-# rabbitmq-defaults scripts.
-ensure_thread_pool_size() {
- if [ -z "${RABBITMQ_IO_THREAD_POOL_SIZE}" ]; then
- RABBITMQ_IO_THREAD_POOL_SIZE=$(
- erl \
- -noinput \
- -boot "${CLEAN_BOOT_FILE}" \
- -s rabbit_misc report_default_thread_pool_size
- )
- fi
-}
-
check_start_params() {
check_not_empty RABBITMQ_BOOT_MODULE
check_not_empty SASL_BOOT_FILE
- check_not_empty RABBITMQ_IO_THREAD_POOL_SIZE
}
check_not_empty() {
@@ -59,7 +43,6 @@ start_rabbitmq_server() {
set -e
_rmq_env_set_erl_libs
- ensure_thread_pool_size
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput"
@@ -92,7 +75,6 @@ start_rabbitmq_server() {
${RABBITMQ_START_RABBIT} \
-boot "${SASL_BOOT_FILE}" \
+W w \
- +A ${RABBITMQ_IO_THREAD_POOL_SIZE} \
${RABBITMQ_DEFAULT_ALLOC_ARGS} \
${RABBITMQ_SERVER_ERL_ARGS} \
${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 3eb033efc2..3a386b63c4 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -47,10 +47,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" (
set RABBITMQ_START_RABBIT=!RABBITMQ_START_RABBIT! -s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
set ENV_OK=true
CALL :check_not_empty "RABBITMQ_BOOT_MODULE" !RABBITMQ_BOOT_MODULE!
@@ -68,7 +64,6 @@ if "!RABBITMQ_ALLOW_INPUT!"=="" (
!RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index b42e0dd89a..0b7906d4bf 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -176,10 +176,6 @@ if "!RABBITMQ_NODE_ONLY!"=="" (
set RABBITMQ_START_RABBIT=-s "!RABBITMQ_BOOT_MODULE!" boot
)
-if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
- set RABBITMQ_IO_THREAD_POOL_SIZE=64
-)
-
if "!RABBITMQ_SERVICE_RESTART!"=="" (
set RABBITMQ_SERVICE_RESTART=restart
)
@@ -197,7 +193,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
!RABBITMQ_START_RABBIT! ^
-boot "!SASL_BOOT_FILE!" ^
+W w ^
-+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
!RABBITMQ_DEFAULT_ALLOC_ARGS! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fdf5d4feea..6104228c54 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -783,7 +783,8 @@ declare_args() ->
{<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2},
{<<"x-max-age">>, fun check_max_age_arg/2},
{<<"x-max-segment-size">>, fun check_non_neg_int_arg/2},
- {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}].
+ {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2},
+ {<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -902,6 +903,16 @@ check_overflow({longstr, Val}, _Args) ->
check_overflow({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
+check_queue_leader_locator_arg({longstr, Val}, _Args) ->
+ case lists:member(Val, [<<"client-local">>,
+ <<"random">>,
+ <<"least-leaders">>]) of
+ true -> ok;
+ false -> {error, invalid_queue_locator_arg}
+ end;
+check_queue_leader_locator_arg({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
true -> ok;
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index 5830c11f65..45cb43dbd1 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -453,7 +453,9 @@ capabilities() ->
<<"x-max-in-memory-bytes">>, <<"x-max-priority">>,
<<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>,
<<"x-queue-type">>, <<"x-queue-master-locator">>],
- consumer_arguments => [<<"x-cancel-on-ha-failover">>],
+ consumer_arguments => [<<"x-cancel-on-ha-failover">>,
+ <<"x-priority">>, <<"x-credit">>
+ ],
server_named => true}.
reject_seq_no(SeqNo, U0) ->
diff --git a/src/rabbit_maintenance.erl b/src/rabbit_maintenance.erl
index 004f8b4cea..039d125b13 100644
--- a/src/rabbit_maintenance.erl
+++ b/src/rabbit_maintenance.erl
@@ -89,6 +89,7 @@ do_drain() ->
[length(TransferCandidates), ReadableCandidates]),
transfer_leadership_of_classic_mirrored_queues(TransferCandidates),
transfer_leadership_of_quorum_queues(TransferCandidates),
+ stop_local_quorum_queue_followers(),
%% allow plugins to react
rabbit_event:notify(maintenance_draining, #{
@@ -276,6 +277,28 @@ transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
end || Q <- Queues],
rabbit_log:info("Leadership transfer for local classic mirrored queues is complete").
+-spec stop_local_quorum_queue_followers() -> ok.
+stop_local_quorum_queue_followers() ->
+ Queues = rabbit_amqqueue:list_local_followers(),
+ rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
+ [length(Queues)]),
+ [begin
+ Name = amqqueue:get_name(Q),
+ rabbit_log:debug("Will stop a local follower replica of quorum queue ~s",
+ [rabbit_misc:rs(Name)]),
+ %% shut down Ra nodes so that they are not considered for leader election
+ {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
+ RaNode = {RegisteredName, node()},
+ rabbit_log:debug("Will stop Ra server ~p", [RaNode]),
+ case ra:stop_server(RaNode) of
+ ok ->
+ rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]);
+ {error, nodedown} ->
+ rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down")
+ end
+ end || Q <- Queues],
+ rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").
+
-spec primary_replica_transfer_candidate_nodes() -> [node()].
primary_replica_transfer_candidate_nodes() ->
filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running() -- [node()]).
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 0dd56a8ccd..f2c44e2289 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -41,6 +41,7 @@ register() ->
{policy_validator, <<"delivery-limit">>},
{policy_validator, <<"max-age">>},
{policy_validator, <<"max-segment-size">>},
+ {policy_validator, <<"queue-leader-locator">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
@@ -147,6 +148,15 @@ validate_policy0(<<"max-age">>, Value) ->
ok
end;
+validate_policy0(<<"queue-leader-locator">>, <<"client-local">>) ->
+ ok;
+validate_policy0(<<"queue-leader-locator">>, <<"random">>) ->
+ ok;
+validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) ->
+ ok;
+validate_policy0(<<"queue-leader-locator">>, Value) ->
+ {error, "~p is not a valid queue leader locator value", [Value]};
+
validate_policy0(<<"max-segment-size">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 3d34c969e0..2d45c043ba 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -218,9 +218,6 @@ is_enabled(Type) ->
rabbit_types:channel_exit().
declare(Q, Node) ->
Mod = amqqueue:get_type(Q),
- Capabilities = Mod:capabilities(),
- ValidQueueArgs = maps:get(queue_arguments, Capabilities, []),
- check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs),
Mod:declare(Q, Node).
-spec delete(amqqueue:amqqueue(), boolean(),
@@ -334,9 +331,6 @@ new(Q, State) when ?is_amqqueue(Q) ->
consume(Q, Spec, State) ->
#ctx{state = State0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
- Capabilities = Mod:capabilities(),
- ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []),
- check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs),
case Mod:consume(Q, Spec, State0) of
{ok, CtxState, Actions} ->
return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
@@ -577,14 +571,3 @@ return_ok(State0, Actions0) ->
{S, [Act | A0]}
end, {State0, []}, Actions0),
{ok, State, lists:reverse(Actions)}.
-
-
-check_invalid_arguments(QueueName, Args, Keys) ->
- [case lists:member(Arg, Keys) of
- true -> ok;
- false -> rabbit_misc:protocol_error(
- precondition_failed,
- "invalid arg '~s' for ~s",
- [Arg, rabbit_misc:rs(QueueName)])
- end || {Arg, _, _} <- Args],
- ok.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 32ac9c4036..563b206590 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -343,7 +343,7 @@ capabilities() ->
<<"x-max-in-memory-bytes">>, <<"x-overflow">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
<<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>],
- consumer_arguments => [<<"x-priority">>],
+ consumer_arguments => [<<"x-priority">>, <<"x-credit">>],
server_named => false}.
rpc_delete_metrics(QName) ->
diff --git a/src/rabbit_stream_coordinator.erl b/src/rabbit_stream_coordinator.erl
index 472cf5d70f..8a1205b9af 100644
--- a/src/rabbit_stream_coordinator.erl
+++ b/src/rabbit_stream_coordinator.erl
@@ -192,13 +192,14 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd,
end;
apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Streams} = State) ->
- #{name := StreamId} = Conf = amqqueue:get_type_state(Q),
+ #{name := StreamId} = Conf0 = amqqueue:get_type_state(Q),
+ Conf = apply_leader_locator_strategy(Conf0, Streams),
case maps:is_key(StreamId, Streams) of
true ->
{State, '$ra_no_reply', wrap_reply(From, {error, already_started})};
false ->
Phase = phase_start_cluster,
- PhaseArgs = [Q],
+ PhaseArgs = [amqqueue:set_type_state(Q, Conf)],
SState = #{state => start_cluster,
phase => Phase,
phase_args => PhaseArgs,
@@ -904,3 +905,43 @@ add_unique(Node, Nodes) ->
delete_replica_pid(Node, ReplicaPids) ->
lists:partition(fun(P) -> node(P) =/= Node end, ReplicaPids).
+
+apply_leader_locator_strategy(#{leader_locator_strategy := <<"client-local">>} = Conf, _) ->
+ Conf;
+apply_leader_locator_strategy(#{leader_node := Leader,
+ replica_nodes := Replicas0,
+ leader_locator_strategy := <<"random">>,
+ name := StreamId} = Conf, _) ->
+ Replicas = [Leader | Replicas0],
+ ClusterSize = length(Replicas),
+ Hash = erlang:phash2(StreamId),
+ Pos = (Hash rem ClusterSize) + 1,
+ NewLeader = lists:nth(Pos, Replicas),
+ NewReplicas = lists:delete(NewLeader, Replicas),
+ Conf#{leader_node => NewLeader,
+ replica_nodes => NewReplicas};
+apply_leader_locator_strategy(#{leader_node := Leader,
+ replica_nodes := Replicas0,
+ leader_locator_strategy := <<"least-leaders">>} = Conf,
+ Streams) ->
+ Replicas = [Leader | Replicas0],
+ Counters0 = maps:from_list([{R, 0} || R <- Replicas]),
+ Counters = maps:to_list(maps:fold(fun(_Key, #{conf := #{leader_node := L}}, Acc) ->
+ maps:update_with(L, fun(V) -> V + 1 end, 0, Acc)
+ end, Counters0, Streams)),
+ Ordered = lists:sort(fun({_, V1}, {_, V2}) ->
+ V1 =< V2
+ end, Counters),
+ %% We could have potentially introduced nodes that are not in the list of replicas if
+ %% initial cluster size is smaller than the cluster size. Let's select the first one
+ %% that is on the list of replicas
+ NewLeader = select_first_matching_node(Ordered, Replicas),
+ NewReplicas = lists:delete(NewLeader, Replicas),
+ Conf#{leader_node => NewLeader,
+ replica_nodes => NewReplicas}.
+
+select_first_matching_node([{N, _} | Rest], Replicas) ->
+ case lists:member(N, Replicas) of
+ true -> N;
+ false -> select_first_matching_node(Rest, Replicas)
+ end.
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index 8e45290da4..0ae58fc6b8 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -504,6 +504,8 @@ make_stream_conf(Node, Q) ->
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
+ LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>,
+ fun res_arg/2, Q)),
Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node],
Arguments = amqqueue:get_arguments(Q),
Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0),
@@ -515,6 +517,7 @@ make_stream_conf(Node, Q) ->
add_if_defined(max_segment_size, MaxSegmentSize, #{reference => QName,
name => Name,
retention => Retention,
+ leader_locator_strategy => LeaderLocator,
leader_node => Node,
replica_nodes => Replicas,
event_formatter => Formatter,
@@ -570,6 +573,12 @@ max_age(Age) ->
max_age(Age1, Age2) ->
min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)).
+queue_leader_locator(undefined) -> <<"client-local">>;
+queue_leader_locator(Val) -> Val.
+
+res_arg(PolVal, undefined) -> PolVal;
+res_arg(_, ArgVal) -> ArgVal.
+
queue_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp = erlang:integer_to_binary(erlang:system_time()),
osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
@@ -678,12 +687,13 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
rabbit_msg_record:to_iodata(R).
capabilities() ->
- #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>],
+ #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>,
+ <<"queue-leader-locator">>],
queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
<<"x-max-length">>, <<"x-max-length-bytes">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
<<"x-max-age">>, <<"x-max-segment-size">>,
- <<"x-initial-cluster-size">>],
+ <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>],
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 97b2235411..8a2bf7f5d9 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -89,7 +89,6 @@ groups() ->
all_tests() ->
[
declare_args,
- declare_invalid_args,
declare_invalid_properties,
declare_server_named,
start_queue,
@@ -338,47 +337,6 @@ declare_invalid_properties(Config) ->
durable = false,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})).
-declare_invalid_args(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- LQ = ?config(queue_name, Config),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-message-ttl">>, long, 2000}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-max-priority">>, long, 2000}])),
-
- [?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-overflow">>, longstr, XOverflow}]))
- || XOverflow <- [<<"reject-publish-dlx">>]],
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
- {<<"x-quorum-initial-group-size">>, long, 0}])).
-
declare_server_named(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index 4362db1d65..5ae4de82f4 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -48,7 +48,11 @@ groups() ->
consume_from_replica,
leader_failover,
initial_cluster_size_one,
- initial_cluster_size_two]},
+ initial_cluster_size_two,
+ leader_locator_client_local,
+ leader_locator_random,
+ leader_locator_least_leaders,
+ leader_locator_policy]},
{unclustered_size_3_1, [], [add_replica]},
{unclustered_size_3_2, [], [consume_without_local_replica]},
{unclustered_size_3_3, [], [grow_coordinator_cluster]},
@@ -59,7 +63,6 @@ all_tests() ->
[
declare_args,
declare_max_age,
- declare_invalid_args,
declare_invalid_properties,
declare_server_named,
declare_queue,
@@ -226,46 +229,6 @@ declare_invalid_properties(Config) ->
durable = false,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})).
-declare_invalid_args(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- Q = ?config(queue_name, Config),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-expires">>, long, 2000}])),
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-message-ttl">>, long, 2000}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-max-priority">>, long, 2000}])),
-
- [?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-overflow">>, longstr, XOverflow}]))
- || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
-
- ?assertExit(
- {{shutdown, {server_initiated_close, 406, _}}, _},
- declare(rabbit_ct_client_helpers:open_channel(Config, Server),
- Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
- {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])).
-
declare_server_named(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -1146,7 +1109,7 @@ leader_failover(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
initial_cluster_size_one(Config) ->
- [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
@@ -1160,7 +1123,7 @@ initial_cluster_size_one(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
initial_cluster_size_two(Config) ->
- [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),
@@ -1181,6 +1144,184 @@ initial_cluster_size_two(Config) ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+leader_locator_client_local(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ ?assertEqual(Server1, proplists:get_value(leader, Info)),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ %% Try second node
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ [Info2] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ ?assertEqual(Server2, proplists:get_value(leader, Info2)),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch2, #'queue.delete'{queue = Q})),
+
+ %% Try third node
+ Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server3),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ [Info3] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ ?assertEqual(Server3, proplists:get_value(leader, Info3)),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch3, #'queue.delete'{queue = Q})).
+
+leader_locator_random(Config) ->
+ [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader = proplists:get_value(leader, Info),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ repeat_until(
+ fun() ->
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"random">>}])),
+
+ [Info2] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader2 = proplists:get_value(leader, Info2),
+
+ Leader =/= Leader2
+ end, 10).
+
+leader_locator_least_leaders(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ Q1 = <<"q1">>,
+ Q2 = <<"q2">>,
+ ?assertEqual({'queue.declare_ok', Q1, 0, 0},
+ declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+ ?assertEqual({'queue.declare_ok', Q2, 0, 0},
+ declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])),
+
+ [Info] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader = proplists:get_value(leader, Info),
+
+ ?assert(lists:member(Leader, [Server2, Server3])).
+
+leader_locator_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"leader-locator">>, <<"leader_locator_.*">>, <<"queues">>,
+ [{<<"queue-leader-locator">>, <<"random">>}]),
+
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [policy, operator_policy,
+ effective_policy_definition,
+ name, leader]]),
+
+ ?assertEqual(<<"leader-locator">>, proplists:get_value(policy, Info)),
+ ?assertEqual('', proplists:get_value(operator_policy, Info)),
+ ?assertEqual([{<<"queue-leader-locator">>, <<"random">>}],
+ proplists:get_value(effective_policy_definition, Info)),
+
+ Leader = proplists:get_value(leader, Info),
+
+ repeat_until(
+ fun() ->
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ [Info2] = lists:filter(
+ fun(Props) ->
+ lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader]])),
+ Leader2 = proplists:get_value(leader, Info2),
+ Leader =/= Leader2
+ end, 10),
+
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>).
+
+repeat_until(_, 0) ->
+ ct:fail("Condition did not materialize in the expected amount of attempts");
+repeat_until(Fun, N) ->
+ case Fun() of
+ true -> ok;
+ false -> repeat_until(Fun, N - 1)
+ end.
+
invalid_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),