diff options
| -rw-r--r-- | src/rabbit_channel.erl | 59 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 19 |
2 files changed, 45 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f626f059a0..1579383f25 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -158,7 +158,9 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + queue_cleanup_timer, + %% Message content size limit + max_message_size }). -define(QUEUE, lqueue). @@ -441,6 +443,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, + MaxMessageSize = case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -473,7 +481,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}}, + queue_states = #{}, + max_message_size = MaxMessageSize}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -985,20 +994,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, extract_topic_variable_map_from_amqp_params(_) -> #{}. -check_msg_size(Content) -> +check_msg_size(Content, MaxMessageSize) -> Size = rabbit_basic:maybe_gc_large_msg(Content), - case Size > ?MAX_MSG_SIZE of - true -> precondition_failed("message size ~B larger than max size ~B", - [Size, ?MAX_MSG_SIZE]); - false -> - case application:get_env(rabbit, max_message_size) of - {ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize -> - precondition_failed("message size ~B larger than" - " configured max size ~B", - [Size, MaxSize]); - - _ -> ok - end + case Size of + S when S > MaxMessageSize -> + ErrorMessage = case MaxMessageSize of + ?MAX_MSG_SIZE -> + "message size ~B larger than max size ~B"; + _ -> + "message size ~B larger than configured max size ~B" + end, + precondition_failed(ErrorMessage, + [Size, MaxMessageSize]); + _ -> ok end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> @@ -1172,16 +1180,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, - tx = Tx, - channel = ChannelNum, - confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid}) -> - check_msg_size(Content), + Content, State = #ch{virtual_host = VHostPath, + tx = Tx, + channel = ChannelNum, + confirm_enabled = ConfirmEnabled, + trace_state = TraceState, + user = #user{username = Username} = User, + conn_name = ConnName, + delivery_flow = Flow, + conn_pid = ConnPid, + max_message_size = MaxMessageSize}) -> + check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index a97da8a715..94fcc3a177 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -1343,13 +1343,12 @@ max_message_size(Config) -> (integer_to_binary(Size128Mb))/binary>>, assert_channel_fail_max_size(Ch, Monitor, ExpectedException), - - {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - %% Set a bigger message size rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]), + {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), assert_channel_alive(Ch1), @@ -1361,22 +1360,26 @@ max_message_size(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]), + %% Need a new channel for changes to take effect + rabbit_ct_client_helpers:close_channel(Ch1), + Ch2 = rabbit_ct_client_helpers:open_channel(Config), + Binary512M = << Binary128M/binary, Binary128M/binary, Binary128M/binary, Binary128M/binary>>, BinaryBiggerThan512M = <<"_", Binary512M/binary>>, - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), - assert_channel_alive(Ch1), + amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), + assert_channel_alive(Ch2), - Monitor1 = monitor(process, Ch1), - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), + Monitor2 = monitor(process, Ch2), + amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), ct:pal("Assert channel error 512"), ExpectedException1 = <<"PRECONDITION_FAILED - message size ", (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary, " larger than max size ", (integer_to_binary(byte_size(Binary512M)))/binary>>, - assert_channel_fail_max_size(Ch1, Monitor1, ExpectedException1). + assert_channel_fail_max_size(Ch2, Monitor2, ExpectedException1). %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. |
