summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-17 11:50:57 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-17 11:50:57 +0000
commit0e78304aae0c0640a64208d7c59cc03866c442a3 (patch)
tree9f055b48e1a403e6e72338b4e24b2b95b0b6fff7 /src
parent910581a849499be0cec89bf9cd8f0ad044aa66b2 (diff)
downloadrabbitmq-server-git-0e78304aae0c0640a64208d7c59cc03866c442a3.tar.gz
Limit maximum message size.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl6
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_mirror_queue_master.erl8
3 files changed, 17 insertions, 6 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2e825536b2..1f42322ca4 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -22,7 +22,7 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/3, header_routes/1,
parse_expiration/1]).
--export([build_content/2, from_content/1]).
+-export([build_content/2, from_content/1, msg_size/1]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,8 @@
(rabbit_framing:amqp_property_record())
-> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
+-spec(msg_size/1 :: (rabbit_types:content()) -> non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -274,3 +276,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
{error, {leftover_string, S}}
end.
+msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
+ iolist_size(PFR).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 62e6994e11..98c0c1f129 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -518,6 +518,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+check_msg_size(Content) ->
+ Size = rabbit_basic:msg_size(Content),
+ case Size > ?MAX_MSG_SIZE of
+ true -> precondition_failed("message size ~s larger than max size ~s",
+ [Size, ?MAX_MSG_SIZE]);
+ false -> ok
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
@@ -648,6 +656,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
+ check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 20283a15f3..58400f3958 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, msg_size(Msg)),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -223,7 +224,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
- msg_size(Msg)),
+ rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
@@ -480,6 +481,3 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid,
CPid, [ChPid]),
State #state { known_senders = sets:add_element(ChPid, KS) }
end.
-
-msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
- iolist_size(PFR).