summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-24 17:30:47 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-24 17:30:47 +0100
commitf5db5e3fa3f005344b78ac42c046e2dce26f2ab3 (patch)
tree9248f009a55a53a0b457fbb52f14847a684083f9 /src
parent43ba51419b344026bcac7ba1c2846fd37a8106c6 (diff)
parentd51f4ba4ed5a1e597a3b3742aeb0780e2368725e (diff)
downloadrabbitmq-server-git-f5db5e3fa3f005344b78ac42c046e2dce26f2ab3.tar.gz
Merging bug21673 into bug22896
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_reader.erl31
-rw-r--r--src/rabbit_tests.erl40
3 files changed, 61 insertions, 20 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ced6c1c7ef..d337df294f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -605,8 +605,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{ transaction_id = none,
- unacked_message_q = UAMQ }) ->
+ _, State = #ch{ unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
@@ -621,8 +620,7 @@ handle_method(#'basic.recover_async'{requeue = true},
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover_async'{requeue = false},
- _, State = #ch{ transaction_id = none,
- writer_pid = WriterPid,
+ _, State = #ch{ writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
ok = rabbit_misc:queue_fold(
fun ({_DeliveryTag, none, _Msg}, ok) ->
@@ -646,10 +644,6 @@ handle_method(#'basic.recover_async'{requeue = false},
%% variant of this method
{noreply, State};
-handle_method(#'basic.recover_async'{}, _, _State) ->
- rabbit_misc:protocol_error(
- not_allowed, "attempt to recover a transactional channel",[]);
-
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
{noreply, State2 = #ch{writer_pid = WriterPid}} =
handle_method(#'basic.recover_async'{requeue = Requeue},
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 73a58f1328..8ba5874061 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -53,6 +53,7 @@
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
%---------------------------------------------------------------------------
@@ -605,27 +606,33 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
ok = send_on_channel0(
Sock,
#'connection.tune'{channel_max = 0,
- %% set to zero once QPid fix their negotiation
- frame_max = 131072,
+ frame_max = ?FRAME_MAX,
heartbeat = 0}),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
client_properties = ClientProperties}};
-handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
- frame_max = FrameMax,
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
sock = Sock}) ->
- %% if we have a channel_max limit that the client wishes to
- %% exceed, die as per spec. Not currently a problem, so we ignore
- %% the client's channel_max parameter.
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}};
+ if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w < ~w min size",
+ [FrameMax, ?FRAME_MIN_SIZE]);
+ (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w > ~w max size",
+ [FrameMax, ?FRAME_MAX]);
+ true ->
+ rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ timeout_sec = ClientHeartbeat,
+ frame_max = FrameMax}}
+ end;
+
handle_method0(#'connection.open'{virtual_host = VHostPath,
insist = Insist},
State = #v1{connection_state = opening,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c684484d74..cffd0e7fd9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -61,6 +61,7 @@ all_tests() ->
passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
+ passed = test_content_framing(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -515,6 +516,45 @@ test_field_values() ->
>>),
passed.
+%% Test that content frames don't exceed frame-max
+test_content_framing(FrameMax, Fragments) ->
+ [Header | Frames] =
+ rabbit_binary_generator:build_simple_content_frames(
+ 1,
+ #content{class_id = 0, properties_bin = <<>>,
+ payload_fragments_rev = Fragments},
+ FrameMax),
+ %% header is formatted correctly and the size is the total of the
+ %% fragments
+ <<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
+ BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
+ BodySize = size(list_to_binary(Fragments)),
+ false = lists:any(
+ fun (ContentFrame) ->
+ FrameBinary = list_to_binary(ContentFrame),
+ %% assert
+ <<_TypeAndChannel:3/binary,
+ Size:32/unsigned,
+ _Payload:Size/binary,
+ 16#CE>> = FrameBinary,
+ size(FrameBinary) > FrameMax
+ end,
+ Frames),
+ passed.
+
+test_content_framing() ->
+ %% no content
+ passed = test_content_framing(4096, []),
+ passed = test_content_framing(4096, [<<>>]),
+ %% easily fit in one frame
+ passed = test_content_framing(4096, [<<"Easy">>]),
+ %% exactly one frame (empty frame = 8 bytes)
+ passed = test_content_framing(11, [<<"One">>]),
+ %% more than one frame
+ passed = test_content_framing(20, [<<"into more than one frame">>,
+ <<"This will have to go">>]),
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).