summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl59
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl19
2 files changed, 45 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f626f059a0..1579383f25 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -158,7 +158,9 @@
delivery_flow,
interceptor_state,
queue_states,
- queue_cleanup_timer
+ queue_cleanup_timer,
+ %% Message content size limit
+ max_message_size
}).
-define(QUEUE, lqueue).
@@ -441,6 +443,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
_ ->
Limiter0
end,
+ MaxMessageSize = case application:get_env(rabbit, max_message_size) of
+ {ok, MS} when is_integer(MS) ->
+ erlang:min(MS, ?MAX_MSG_SIZE);
+ _ ->
+ ?MAX_MSG_SIZE
+ end,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -473,7 +481,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
- queue_states = #{}},
+ queue_states = #{},
+ max_message_size = MaxMessageSize},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -985,20 +994,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
extract_topic_variable_map_from_amqp_params(_) ->
#{}.
-check_msg_size(Content) ->
+check_msg_size(Content, MaxMessageSize) ->
Size = rabbit_basic:maybe_gc_large_msg(Content),
- case Size > ?MAX_MSG_SIZE of
- true -> precondition_failed("message size ~B larger than max size ~B",
- [Size, ?MAX_MSG_SIZE]);
- false ->
- case application:get_env(rabbit, max_message_size) of
- {ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize ->
- precondition_failed("message size ~B larger than"
- " configured max size ~B",
- [Size, MaxSize]);
-
- _ -> ok
- end
+ case Size of
+ S when S > MaxMessageSize ->
+ ErrorMessage = case MaxMessageSize of
+ ?MAX_MSG_SIZE ->
+ "message size ~B larger than max size ~B";
+ _ ->
+ "message size ~B larger than configured max size ~B"
+ end,
+ precondition_failed(ErrorMessage,
+ [Size, MaxMessageSize]);
+ _ -> ok
end.
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
@@ -1172,16 +1180,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
- Content, State = #ch{virtual_host = VHostPath,
- tx = Tx,
- channel = ChannelNum,
- confirm_enabled = ConfirmEnabled,
- trace_state = TraceState,
- user = #user{username = Username} = User,
- conn_name = ConnName,
- delivery_flow = Flow,
- conn_pid = ConnPid}) ->
- check_msg_size(Content),
+ Content, State = #ch{virtual_host = VHostPath,
+ tx = Tx,
+ channel = ChannelNum,
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState,
+ user = #user{username = Username} = User,
+ conn_name = ConnName,
+ delivery_flow = Flow,
+ conn_pid = ConnPid,
+ max_message_size = MaxMessageSize}) ->
+ check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index a97da8a715..94fcc3a177 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -1343,13 +1343,12 @@ max_message_size(Config) ->
(integer_to_binary(Size128Mb))/binary>>,
assert_channel_fail_max_size(Ch, Monitor, ExpectedException),
-
- {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
-
%% Set a bigger message size
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]),
+ {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}),
assert_channel_alive(Ch1),
@@ -1361,22 +1360,26 @@ max_message_size(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]),
+ %% Need a new channel for changes to take effect
+ rabbit_ct_client_helpers:close_channel(Ch1),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config),
+
Binary512M = << Binary128M/binary, Binary128M/binary,
Binary128M/binary, Binary128M/binary>>,
BinaryBiggerThan512M = <<"_", Binary512M/binary>>,
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}),
- assert_channel_alive(Ch1),
+ amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}),
+ assert_channel_alive(Ch2),
- Monitor1 = monitor(process, Ch1),
- amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}),
+ Monitor2 = monitor(process, Ch2),
+ amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}),
ct:pal("Assert channel error 512"),
ExpectedException1 = <<"PRECONDITION_FAILED - message size ",
(integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary,
" larger than max size ",
(integer_to_binary(byte_size(Binary512M)))/binary>>,
- assert_channel_fail_max_size(Ch1, Monitor1, ExpectedException1).
+ assert_channel_fail_max_size(Ch2, Monitor2, ExpectedException1).
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.