diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2019-01-02 12:06:53 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-01-02 12:06:53 +0400 |
| commit | be0d28797810e22aba8c3250bd6090278fe87526 (patch) | |
| tree | bd613813049daaa1c34c37a466c81a88c6e7a5c6 /src | |
| parent | 3e35edaee37f8faa2def517d4046944f95621480 (diff) | |
| parent | b1b995ca9f7eb45c90cbfb31194c81af3e798321 (diff) | |
| download | rabbitmq-server-git-be0d28797810e22aba8c3250bd6090278fe87526.tar.gz | |
Merge pull request #1812 from rabbitmq/max_msg_size
Introduce a configurable limit to message size
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 58 |
1 files changed, 40 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d1f3b06528..eeae247193 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -72,7 +72,7 @@ -export([get_vhost/1, get_user/1]). %% For testing -export([build_topic_variable_map/3]). --export([list_queue_states/1]). +-export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor -export([handle_method/5]). @@ -158,7 +158,9 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + queue_cleanup_timer, + %% Message content size limit + max_message_size }). -define(QUEUE, lqueue). @@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, + MaxMessageSize = get_max_message_size(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}}, + queue_states = #{}, + max_message_size = MaxMessageSize}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) -> format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +-spec get_max_message_size() -> non_neg_integer(). + +get_max_message_size() -> + case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, extract_topic_variable_map_from_amqp_params(_) -> #{}. -check_msg_size(Content) -> +check_msg_size(Content, MaxMessageSize) -> Size = rabbit_basic:maybe_gc_large_msg(Content), - case Size > ?MAX_MSG_SIZE of - true -> precondition_failed("message size ~B larger than max size ~B", - [Size, ?MAX_MSG_SIZE]); - false -> ok + case Size of + S when S > MaxMessageSize -> + ErrorMessage = case MaxMessageSize of + ?MAX_MSG_SIZE -> + "message size ~B is larger than max size ~B"; + _ -> + "message size ~B is larger than configured max size ~B" + end, + precondition_failed(ErrorMessage, + [Size, MaxMessageSize]); + _ -> ok end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> @@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, - tx = Tx, - channel = ChannelNum, - confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid}) -> - check_msg_size(Content), + Content, State = #ch{virtual_host = VHostPath, + tx = Tx, + channel = ChannelNum, + confirm_enabled = ConfirmEnabled, + trace_state = TraceState, + user = #user{username = Username} = User, + conn_name = ConnName, + delivery_flow = Flow, + conn_pid = ConnPid, + max_message_size = MaxMessageSize}) -> + check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), |
