summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-01-02 12:06:53 +0400
committerGitHub <noreply@github.com>2019-01-02 12:06:53 +0400
commitbe0d28797810e22aba8c3250bd6090278fe87526 (patch)
treebd613813049daaa1c34c37a466c81a88c6e7a5c6 /src
parent3e35edaee37f8faa2def517d4046944f95621480 (diff)
parentb1b995ca9f7eb45c90cbfb31194c81af3e798321 (diff)
downloadrabbitmq-server-git-be0d28797810e22aba8c3250bd6090278fe87526.tar.gz
Merge pull request #1812 from rabbitmq/max_msg_size
Introduce a configurable limit to message size
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl58
1 files changed, 40 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d1f3b06528..eeae247193 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -72,7 +72,7 @@
-export([get_vhost/1, get_user/1]).
%% For testing
-export([build_topic_variable_map/3]).
--export([list_queue_states/1]).
+-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
-export([handle_method/5]).
@@ -158,7 +158,9 @@
delivery_flow,
interceptor_state,
queue_states,
- queue_cleanup_timer
+ queue_cleanup_timer,
+ %% Message content size limit
+ max_message_size
}).
-define(QUEUE, lqueue).
@@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
_ ->
Limiter0
end,
+ MaxMessageSize = get_max_message_size(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
- queue_states = #{}},
+ queue_states = #{},
+ max_message_size = MaxMessageSize},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) ->
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+-spec get_max_message_size() -> non_neg_integer().
+
+get_max_message_size() ->
+ case application:get_env(rabbit, max_message_size) of
+ {ok, MS} when is_integer(MS) ->
+ erlang:min(MS, ?MAX_MSG_SIZE);
+ _ ->
+ ?MAX_MSG_SIZE
+ end.
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
extract_topic_variable_map_from_amqp_params(_) ->
#{}.
-check_msg_size(Content) ->
+check_msg_size(Content, MaxMessageSize) ->
Size = rabbit_basic:maybe_gc_large_msg(Content),
- case Size > ?MAX_MSG_SIZE of
- true -> precondition_failed("message size ~B larger than max size ~B",
- [Size, ?MAX_MSG_SIZE]);
- false -> ok
+ case Size of
+ S when S > MaxMessageSize ->
+ ErrorMessage = case MaxMessageSize of
+ ?MAX_MSG_SIZE ->
+ "message size ~B is larger than max size ~B";
+ _ ->
+ "message size ~B is larger than configured max size ~B"
+ end,
+ precondition_failed(ErrorMessage,
+ [Size, MaxMessageSize]);
+ _ -> ok
end.
check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
@@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
- Content, State = #ch{virtual_host = VHostPath,
- tx = Tx,
- channel = ChannelNum,
- confirm_enabled = ConfirmEnabled,
- trace_state = TraceState,
- user = #user{username = Username} = User,
- conn_name = ConnName,
- delivery_flow = Flow,
- conn_pid = ConnPid}) ->
- check_msg_size(Content),
+ Content, State = #ch{virtual_host = VHostPath,
+ tx = Tx,
+ channel = ChannelNum,
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState,
+ user = #user{username = Username} = User,
+ conn_name = ConnName,
+ delivery_flow = Flow,
+ conn_pid = ConnPid,
+ max_message_size = MaxMessageSize}) ->
+ check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),