diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2018-12-27 19:26:37 +0400 |
|---|---|---|
| committer | Daniil Fedotov <hairyhum@gmail.com> | 2018-12-27 19:26:37 +0400 |
| commit | 03fc22ddd9d0b47fb735f36c4c5421a7baeb2ad9 (patch) | |
| tree | 36c1b3c141f262ca6eb47bf6e90a3efb2590024c | |
| parent | 794ef8de24ec261d1f3217f0a1ccfae98c8c6873 (diff) | |
| download | rabbitmq-server-git-03fc22ddd9d0b47fb735f36c4c5421a7baeb2ad9.tar.gz | |
Introduce a configurable limit to message size.
Add `max_message_size` configuration to configure limit
in bytes.
If message is bigger - channel exception will be thrown.
Default limit is 128MB.
There is still a hard limit of 521MB.
[#161983593]
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 8 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 10 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 79 |
4 files changed, 99 insertions, 2 deletions
@@ -130,7 +130,9 @@ define PROJECT_ENV {vhost_restart_strategy, continue}, %% {global, prefetch count} {default_consumer_prefetch, {false, 0}}, - {channel_queue_cleanup_interval, 60000} + {channel_queue_cleanup_interval, 60000}, + %% Default max message size is 128 MB + {max_message_size, 134217728} ] endef diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 5c6078a413..5c62daaf8a 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -554,6 +554,9 @@ end}. }. +{mapping, "msx_message_size", "rabbit.max_message_size", + [{datatype, integer}, {validators, ["less_then_512MB"]}]}. + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for @@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) -> Size > 0 andalso Size < 2147483648 end}. +{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0", +fun(Size) when is_integer(Size) -> + Size > 0 andalso Size < 536870912 +end}. + {validator, "less_than_1", "Flooat is not beetween 0 and 1", fun(Float) when is_float(Float) -> Float > 0 andalso Float < 1 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d1f3b06528..f626f059a0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -990,7 +990,15 @@ check_msg_size(Content) -> case Size > ?MAX_MSG_SIZE of true -> precondition_failed("message size ~B larger than max size ~B", [Size, ?MAX_MSG_SIZE]); - false -> ok + false -> + case application:get_env(rabbit, max_message_size) of + {ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize -> + precondition_failed("message size ~B larger than" + " configured max size ~B", + [Size, MaxSize]); + + _ -> ok + end end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d8031ce6d7..a97da8a715 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -58,6 +58,7 @@ groups() -> set_disk_free_limit_command, set_vm_memory_high_watermark_command, topic_matching, + max_message_size, {queue_max_length, [], [ {max_length_simple, [], MaxLengthTests}, {max_length_mirrored, [], MaxLengthTests}]} @@ -1299,6 +1300,84 @@ sync_mirrors(QName, Config) -> _ -> ok end. +gen_binary_mb(N) -> + B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>, + << B1M || _ <- lists:seq(1, N) >>. + +assert_channel_alive(Ch) -> + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, + #amqp_msg{payload = <<"HI">>}). + +assert_channel_fail_max_size(Ch, Monitor, ExpectedException) -> + receive + {'DOWN', Monitor, process, Ch, + {shutdown, + {server_initiated_close, 406, Exception}}} -> + ?assertMatch(Exception, ExpectedException) + after 100000 -> + error({channel_exception_expected, max_message_size}) + end. + +max_message_size(Config) -> + Binary128M = gen_binary_mb(128), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + %% Default message size is 128MB + Size128Mb = 1024 * 1024 * 128, + Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0, + application, get_env, [rabbit, max_message_size, undefined]), + + Size128Mb = byte_size(Binary128M), + %% Binary is whithin the max size limit + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + %% Channel process is alive + assert_channel_alive(Ch), + + Monitor = monitor(process, Ch), + %% This publish should cause a channel exception + BinaryBiggerThan128M = <<"_", Binary128M/binary>>, + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), + ct:pal("Assert channel error 128"), + ExpectedException = <<"PRECONDITION_FAILED - message size ", + (integer_to_binary(byte_size(BinaryBiggerThan128M)))/binary, + " larger than configured max size ", + (integer_to_binary(Size128Mb))/binary>>, + assert_channel_fail_max_size(Ch, Monitor, ExpectedException), + + + {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + %% Set a bigger message size + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), + assert_channel_alive(Ch1), + + %% Set message size above 512MB. + %% The actual limit will be 512MB + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]), + + Binary512M = << Binary128M/binary, Binary128M/binary, + Binary128M/binary, Binary128M/binary>>, + + BinaryBiggerThan512M = <<"_", Binary512M/binary>>, + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), + assert_channel_alive(Ch1), + + Monitor1 = monitor(process, Ch1), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), + ct:pal("Assert channel error 512"), + ExpectedException1 = <<"PRECONDITION_FAILED - message size ", + (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary, + " larger than max size ", + (integer_to_binary(byte_size(Binary512M)))/binary>>, + assert_channel_fail_max_size(Ch1, Monitor1, ExpectedException1). + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- |
