summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2018-12-27 19:26:37 +0400
committerDaniil Fedotov <hairyhum@gmail.com>2018-12-27 19:26:37 +0400
commit03fc22ddd9d0b47fb735f36c4c5421a7baeb2ad9 (patch)
tree36c1b3c141f262ca6eb47bf6e90a3efb2590024c
parent794ef8de24ec261d1f3217f0a1ccfae98c8c6873 (diff)
downloadrabbitmq-server-git-03fc22ddd9d0b47fb735f36c4c5421a7baeb2ad9.tar.gz
Introduce a configurable limit to message size.
Add `max_message_size` configuration to configure limit in bytes. If message is bigger - channel exception will be thrown. Default limit is 128MB. There is still a hard limit of 521MB. [#161983593]
-rw-r--r--Makefile4
-rw-r--r--priv/schema/rabbit.schema8
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl79
4 files changed, 99 insertions, 2 deletions
diff --git a/Makefile b/Makefile
index e26f32d89f..094affc2df 100644
--- a/Makefile
+++ b/Makefile
@@ -130,7 +130,9 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
- {channel_queue_cleanup_interval, 60000}
+ {channel_queue_cleanup_interval, 60000},
+ %% Default max message size is 128 MB
+ {max_message_size, 134217728}
]
endef
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 5c6078a413..5c62daaf8a 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -554,6 +554,9 @@ end}.
}.
+{mapping, "msx_message_size", "rabbit.max_message_size",
+ [{datatype, integer}, {validators, ["less_then_512MB"]}]}.
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
@@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 2147483648
end}.
+{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0",
+fun(Size) when is_integer(Size) ->
+ Size > 0 andalso Size < 536870912
+end}.
+
{validator, "less_than_1", "Flooat is not beetween 0 and 1",
fun(Float) when is_float(Float) ->
Float > 0 andalso Float < 1
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d1f3b06528..f626f059a0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -990,7 +990,15 @@ check_msg_size(Content) ->
case Size > ?MAX_MSG_SIZE of
true -> precondition_failed("message size ~B larger than max size ~B",
[Size, ?MAX_MSG_SIZE]);
- false -> ok
+ false ->
+ case application:get_env(rabbit, max_message_size) of
+ {ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize ->
+ precondition_failed("message size ~B larger than"
+ " configured max size ~B",
+ [Size, MaxSize]);
+
+ _ -> ok
+ end
end.
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index d8031ce6d7..a97da8a715 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -58,6 +58,7 @@ groups() ->
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
topic_matching,
+ max_message_size,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
{max_length_mirrored, [], MaxLengthTests}]}
@@ -1299,6 +1300,84 @@ sync_mirrors(QName, Config) ->
_ -> ok
end.
+gen_binary_mb(N) ->
+ B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
+ << B1M || _ <- lists:seq(1, N) >>.
+
+assert_channel_alive(Ch) ->
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
+ #amqp_msg{payload = <<"HI">>}).
+
+assert_channel_fail_max_size(Ch, Monitor, ExpectedException) ->
+ receive
+ {'DOWN', Monitor, process, Ch,
+ {shutdown,
+ {server_initiated_close, 406, Exception}}} ->
+ ?assertMatch(Exception, ExpectedException)
+ after 100000 ->
+ error({channel_exception_expected, max_message_size})
+ end.
+
+max_message_size(Config) ->
+ Binary128M = gen_binary_mb(128),
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ %% Default message size is 128MB
+ Size128Mb = 1024 * 1024 * 128,
+ Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, get_env, [rabbit, max_message_size, undefined]),
+
+ Size128Mb = byte_size(Binary128M),
+ %% Binary is whithin the max size limit
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}),
+ %% Channel process is alive
+ assert_channel_alive(Ch),
+
+ Monitor = monitor(process, Ch),
+ %% This publish should cause a channel exception
+ BinaryBiggerThan128M = <<"_", Binary128M/binary>>,
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}),
+ ct:pal("Assert channel error 128"),
+ ExpectedException = <<"PRECONDITION_FAILED - message size ",
+ (integer_to_binary(byte_size(BinaryBiggerThan128M)))/binary,
+ " larger than configured max size ",
+ (integer_to_binary(Size128Mb))/binary>>,
+ assert_channel_fail_max_size(Ch, Monitor, ExpectedException),
+
+
+ {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
+ %% Set a bigger message size
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}),
+ assert_channel_alive(Ch1),
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}),
+ assert_channel_alive(Ch1),
+
+ %% Set message size above 512MB.
+ %% The actual limit will be 512MB
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]),
+
+ Binary512M = << Binary128M/binary, Binary128M/binary,
+ Binary128M/binary, Binary128M/binary>>,
+
+ BinaryBiggerThan512M = <<"_", Binary512M/binary>>,
+
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}),
+ assert_channel_alive(Ch1),
+
+ Monitor1 = monitor(process, Ch1),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}),
+ ct:pal("Assert channel error 512"),
+ ExpectedException1 = <<"PRECONDITION_FAILED - message size ",
+ (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary,
+ " larger than max size ",
+ (integer_to_binary(byte_size(Binary512M)))/binary>>,
+ assert_channel_fail_max_size(Ch1, Monitor1, ExpectedException1).
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------