diff options
| -rw-r--r-- | src/rabbit_channel.erl | 23 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 89 |
2 files changed, 54 insertions, 58 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1579383f25..eeae247193 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -72,7 +72,7 @@ -export([get_vhost/1, get_user/1]). %% For testing -export([build_topic_variable_map/3]). --export([list_queue_states/1]). +-export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor -export([handle_method/5]). @@ -443,12 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, - MaxMessageSize = case application:get_env(rabbit, max_message_size) of - {ok, MS} when is_integer(MS) -> - erlang:min(MS, ?MAX_MSG_SIZE); - _ -> - ?MAX_MSG_SIZE - end, + MaxMessageSize = get_max_message_size(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -802,6 +797,16 @@ code_change(_OldVsn, State, _Extra) -> format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +-spec get_max_message_size() -> non_neg_integer(). + +get_max_message_size() -> + case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -1000,9 +1005,9 @@ check_msg_size(Content, MaxMessageSize) -> S when S > MaxMessageSize -> ErrorMessage = case MaxMessageSize of ?MAX_MSG_SIZE -> - "message size ~B larger than max size ~B"; + "message size ~B is larger than max size ~B"; _ -> - "message size ~B larger than configured max size ~B" + "message size ~B is larger than configured max size ~B" end, precondition_failed(ErrorMessage, [Size, MaxMessageSize]); diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 126f3b9083..c0a4ba53a3 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -25,6 +25,7 @@ -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). +-define(TIMEOUT_CHANNEL_EXCEPTION, 5000). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). @@ -58,13 +59,18 @@ groups() -> set_disk_free_limit_command, set_vm_memory_high_watermark_command, topic_matching, - max_message_size, {queue_max_length, [], [ {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]} + {max_length_mirrored, [], MaxLengthTests}]}, + max_message_size ]} ]. +suite() -> + [ + {timetrap, {seconds, 30}} + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- @@ -1308,80 +1314,65 @@ assert_channel_alive(Ch) -> amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = <<"HI">>}). -assert_channel_fail_max_size(Ch, Monitor, ExpectedException) -> +assert_channel_fail_max_size(Ch, Monitor) -> receive {'DOWN', Monitor, process, Ch, {shutdown, - {server_initiated_close, 406, Exception}}} -> - ?assertMatch(Exception, ExpectedException) - after 100000 -> + {server_initiated_close, 406, _Error}}} -> + ok + after ?TIMEOUT_CHANNEL_EXCEPTION -> error({channel_exception_expected, max_message_size}) end. max_message_size(Config) -> - Binary128M = gen_binary_mb(128), + Binary2M = gen_binary_mb(2), + Binary4M = gen_binary_mb(4), + Binary6M = gen_binary_mb(6), + Binary10M = gen_binary_mb(10), + + Size2Mb = 1024 * 1024 * 2, + Size2Mb = byte_size(Binary2M), - %% Default message size is 128MB - Size128Mb = 1024 * 1024 * 128, - Size128Mb = byte_size(Binary128M), + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]), - Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0, - application, get_env, [rabbit, max_message_size, undefined]), {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), %% Binary is whithin the max size limit - amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}), %% The channel process is alive assert_channel_alive(Ch), Monitor = monitor(process, Ch), - %% This publish should cause a channel exception - BinaryBiggerThan128M = <<"_", Binary128M/binary>>, - amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), - ct:pal("Assert channel error 128"), - ExpectedException = <<"PRECONDITION_FAILED - message size ", - (integer_to_binary(byte_size(BinaryBiggerThan128M)))/binary, - " larger than configured max size ", - (integer_to_binary(Size128Mb))/binary>>, - assert_channel_fail_max_size(Ch, Monitor, ExpectedException), - - %% Set a bigger message size + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}), + assert_channel_fail_max_size(Ch, Monitor), + + %% increase the limit rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]), + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]), {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}), assert_channel_alive(Ch1), - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}), assert_channel_alive(Ch1), - %% Set message size above 512MB. - %% The actual limit will be 512MB - rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]), - - %% Need a new channel for changes to take effect - rabbit_ct_client_helpers:close_channel(Ch1), - Ch2 = rabbit_ct_client_helpers:open_channel(Config), - - Binary512M = << Binary128M/binary, Binary128M/binary, - Binary128M/binary, Binary128M/binary>>, + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}), + assert_channel_alive(Ch1), - BinaryBiggerThan512M = <<"_", Binary512M/binary>>, + Monitor1 = monitor(process, Ch1), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}), + assert_channel_fail_max_size(Ch1, Monitor1), - amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), - assert_channel_alive(Ch2), + %% increase beyond the hard limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]), + Val = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_channel, get_max_message_size, []), - Monitor2 = monitor(process, Ch2), - amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), - ct:pal("Assert channel error 512"), - ExpectedException1 = <<"PRECONDITION_FAILED - message size ", - (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary, - " larger than max size ", - (integer_to_binary(byte_size(Binary512M)))/binary>>, - assert_channel_fail_max_size(Ch2, Monitor2, ExpectedException1). + ?assertEqual(?MAX_MSG_SIZE, Val). %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. |
