diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-16 14:07:19 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-16 14:07:19 +0200 |
| commit | cc030ac195c00dc7f34616f0d2303deee6864371 (patch) | |
| tree | ea2cf9d907542b0c48811a27da44080345d32ce7 /deps/rabbitmq_stream | |
| parent | b704e2f8effd34164061c712e7238b24b0e4bcec (diff) | |
| download | rabbitmq-server-git-cc030ac195c00dc7f34616f0d2303deee6864371.tar.gz | |
Support initial-cluster-size argument on creation
See rabbitmq/rabbitmq-server#2467
Diffstat (limited to 'deps/rabbitmq_stream')
3 files changed, 106 insertions, 19 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 3b8c766552..9fa4e3521d 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -71,30 +71,49 @@ stream_queue_arguments(ArgumentsAcc, #{<<"max-segment-size">> := Value} = Argume [{<<"x-max-segment-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc, maps:remove(<<"max-segment-size">>, Arguments) ); +stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Arguments) -> + stream_queue_arguments( + [{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc, + maps:remove(<<"initial-cluster-size">>, Arguments) + ); stream_queue_arguments(ArgumentsAcc, _Arguments) -> ArgumentsAcc. +validate_stream_queue_arguments([]) -> + ok; +validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) when ClusterSize =< 0 -> + error; +validate_stream_queue_arguments([_ | T]) -> + validate_stream_queue_arguments(T). + + handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) -> Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference}, - Q0 = amqqueue:new( - Name, - none, true, false, none, stream_queue_arguments(Arguments), - VirtualHost, #{user => Username}, rabbit_stream_queue - ), - try - case rabbit_stream_queue:declare(Q0, node()) of - {new, Q} -> - {reply, {ok, amqqueue:get_type_state(Q)}, State}; - {existing, _} -> - {reply, {error, reference_already_exists}, State}; - {error, Err} -> - rabbit_log:warn("Error while creating ~p stream, ~p~n", [Reference, Err]), - {reply, {error, internal_error}, State} - end - catch - exit:Error -> - rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), - {reply, {error, internal_error}, State} + StreamQueueArguments = stream_queue_arguments(Arguments), + case validate_stream_queue_arguments(StreamQueueArguments) of + ok -> + Q0 = amqqueue:new( + Name, + none, true, false, none, StreamQueueArguments, + VirtualHost, #{user => Username}, rabbit_stream_queue + ), + try + case rabbit_stream_queue:declare(Q0, node()) of + {new, Q} -> + {reply, {ok, amqqueue:get_type_state(Q)}, State}; + {existing, _} -> + {reply, {error, reference_already_exists}, State}; + {error, Err} -> + rabbit_log:warn("Error while creating ~p stream, ~p~n", [Reference, Err]), + {reply, {error, internal_error}, State} + end + catch + exit:Error -> + rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), + {reply, {error, internal_error}, State} + end; + error -> + {reply, {error, validation_failed}, State} end; handle_call({delete, VirtualHost, Reference, Username}, _From, State) -> Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference}, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 6ffad09c5e..5dc5d5fa38 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -809,6 +809,9 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]), response_ok(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId), {Connection, State, Rest}; + {error, validation_failed} -> + response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), + {Connection, State, Rest}; {error, reference_already_exists} -> response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), {Connection, State, Rest}; diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java new file mode 100644 index 0000000000..993c19b852 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/ClusterSizeTest.java @@ -0,0 +1,65 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.impl.Client; +import com.rabbitmq.stream.impl.Client.Response; +import com.rabbitmq.stream.impl.Client.StreamMetadata; +import java.util.Collections; +import java.util.UUID; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class ClusterSizeTest { + + TestUtils.ClientFactory cf; + + @ParameterizedTest + @ValueSource(strings = {"-1", "0"}) + void clusterSizeZeroShouldReturnError(String clusterSize) { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + String s = UUID.randomUUID().toString(); + Response response = + client.create(s, Collections.singletonMap("initial-cluster-size", clusterSize)); + assertThat(response.isOk()).isFalse(); + assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED); + } + + @ParameterizedTest + @CsvSource({"1,1", "2,2", "3,3", "5,3"}) + void clusterSizeShouldReflectOnMetadata(String requestedClusterSize, int expectedClusterSize) { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + String s = UUID.randomUUID().toString(); + try { + Response response = + client.create(s, Collections.singletonMap("initial-cluster-size", requestedClusterSize)); + assertThat(response.isOk()).isTrue(); + StreamMetadata metadata = client.metadata(s).get(s); + assertThat(metadata).isNotNull(); + assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + int actualClusterSize = metadata.getLeader() == null ? 0 : 1 + metadata.getReplicas().size(); + assertThat(actualClusterSize).isEqualTo(expectedClusterSize); + } finally { + client.delete(s); + } + } +} |
