summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl23
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl89
2 files changed, 54 insertions, 58 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1579383f25..eeae247193 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -72,7 +72,7 @@
-export([get_vhost/1, get_user/1]).
%% For testing
-export([build_topic_variable_map/3]).
--export([list_queue_states/1]).
+-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
-export([handle_method/5]).
@@ -443,12 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
_ ->
Limiter0
end,
- MaxMessageSize = case application:get_env(rabbit, max_message_size) of
- {ok, MS} when is_integer(MS) ->
- erlang:min(MS, ?MAX_MSG_SIZE);
- _ ->
- ?MAX_MSG_SIZE
- end,
+ MaxMessageSize = get_max_message_size(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -802,6 +797,16 @@ code_change(_OldVsn, State, _Extra) ->
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+-spec get_max_message_size() -> non_neg_integer().
+
+get_max_message_size() ->
+ case application:get_env(rabbit, max_message_size) of
+ {ok, MS} when is_integer(MS) ->
+ erlang:min(MS, ?MAX_MSG_SIZE);
+ _ ->
+ ?MAX_MSG_SIZE
+ end.
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -1000,9 +1005,9 @@ check_msg_size(Content, MaxMessageSize) ->
S when S > MaxMessageSize ->
ErrorMessage = case MaxMessageSize of
?MAX_MSG_SIZE ->
- "message size ~B larger than max size ~B";
+ "message size ~B is larger than max size ~B";
_ ->
- "message size ~B larger than configured max size ~B"
+ "message size ~B is larger than configured max size ~B"
end,
precondition_failed(ErrorMessage,
[Size, MaxMessageSize]);
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 126f3b9083..c0a4ba53a3 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -25,6 +25,7 @@
-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
+-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
@@ -58,13 +59,18 @@ groups() ->
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
topic_matching,
- max_message_size,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
- {max_length_mirrored, [], MaxLengthTests}]}
+ {max_length_mirrored, [], MaxLengthTests}]},
+ max_message_size
]}
].
+suite() ->
+ [
+ {timetrap, {seconds, 30}}
+ ].
+
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
@@ -1308,80 +1314,65 @@ assert_channel_alive(Ch) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
#amqp_msg{payload = <<"HI">>}).
-assert_channel_fail_max_size(Ch, Monitor, ExpectedException) ->
+assert_channel_fail_max_size(Ch, Monitor) ->
receive
{'DOWN', Monitor, process, Ch,
{shutdown,
- {server_initiated_close, 406, Exception}}} ->
- ?assertMatch(Exception, ExpectedException)
- after 100000 ->
+ {server_initiated_close, 406, _Error}}} ->
+ ok
+ after ?TIMEOUT_CHANNEL_EXCEPTION ->
error({channel_exception_expected, max_message_size})
end.
max_message_size(Config) ->
- Binary128M = gen_binary_mb(128),
+ Binary2M = gen_binary_mb(2),
+ Binary4M = gen_binary_mb(4),
+ Binary6M = gen_binary_mb(6),
+ Binary10M = gen_binary_mb(10),
+
+ Size2Mb = 1024 * 1024 * 2,
+ Size2Mb = byte_size(Binary2M),
- %% Default message size is 128MB
- Size128Mb = 1024 * 1024 * 128,
- Size128Mb = byte_size(Binary128M),
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),
- Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0,
- application, get_env, [rabbit, max_message_size, undefined]),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
%% Binary is whithin the max size limit
- amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
%% The channel process is alive
assert_channel_alive(Ch),
Monitor = monitor(process, Ch),
- %% This publish should cause a channel exception
- BinaryBiggerThan128M = <<"_", Binary128M/binary>>,
- amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}),
- ct:pal("Assert channel error 128"),
- ExpectedException = <<"PRECONDITION_FAILED - message size ",
- (integer_to_binary(byte_size(BinaryBiggerThan128M)))/binary,
- " larger than configured max size ",
- (integer_to_binary(Size128Mb))/binary>>,
- assert_channel_fail_max_size(Ch, Monitor, ExpectedException),
-
- %% Set a bigger message size
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
+ assert_channel_fail_max_size(Ch, Monitor),
+
+ %% increase the limit
rabbit_ct_broker_helpers:rpc(Config, 0,
- application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]),
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),
{_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
assert_channel_alive(Ch1),
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
assert_channel_alive(Ch1),
- %% Set message size above 512MB.
- %% The actual limit will be 512MB
- rabbit_ct_broker_helpers:rpc(Config, 0,
- application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]),
-
- %% Need a new channel for changes to take effect
- rabbit_ct_client_helpers:close_channel(Ch1),
- Ch2 = rabbit_ct_client_helpers:open_channel(Config),
-
- Binary512M = << Binary128M/binary, Binary128M/binary,
- Binary128M/binary, Binary128M/binary>>,
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
+ assert_channel_alive(Ch1),
- BinaryBiggerThan512M = <<"_", Binary512M/binary>>,
+ Monitor1 = monitor(process, Ch1),
+ amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
+ assert_channel_fail_max_size(Ch1, Monitor1),
- amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}),
- assert_channel_alive(Ch2),
+ %% increase beyond the hard limit
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
+ Val = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_channel, get_max_message_size, []),
- Monitor2 = monitor(process, Ch2),
- amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}),
- ct:pal("Assert channel error 512"),
- ExpectedException1 = <<"PRECONDITION_FAILED - message size ",
- (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary,
- " larger than max size ",
- (integer_to_binary(byte_size(Binary512M)))/binary>>,
- assert_channel_fail_max_size(Ch2, Monitor2, ExpectedException1).
+ ?assertEqual(?MAX_MSG_SIZE, Val).
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.