diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-24 17:30:47 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-24 17:30:47 +0100 |
| commit | f5db5e3fa3f005344b78ac42c046e2dce26f2ab3 (patch) | |
| tree | 9248f009a55a53a0b457fbb52f14847a684083f9 /src | |
| parent | 43ba51419b344026bcac7ba1c2846fd37a8106c6 (diff) | |
| parent | d51f4ba4ed5a1e597a3b3742aeb0780e2368725e (diff) | |
| download | rabbitmq-server-git-f5db5e3fa3f005344b78ac42c046e2dce26f2ab3.tar.gz | |
Merging bug21673 into bug22896
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 40 |
3 files changed, 61 insertions, 20 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ced6c1c7ef..d337df294f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -605,8 +605,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{ transaction_id = none, - unacked_message_q = UAMQ }) -> + _, State = #ch{ unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes @@ -621,8 +620,7 @@ handle_method(#'basic.recover_async'{requeue = true}, {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{ transaction_id = none, - writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, unacked_message_q = UAMQ }) -> ok = rabbit_misc:queue_fold( fun ({_DeliveryTag, none, _Msg}, ok) -> @@ -646,10 +644,6 @@ handle_method(#'basic.recover_async'{requeue = false}, %% variant of this method {noreply, State}; -handle_method(#'basic.recover_async'{}, _, _State) -> - rabbit_misc:protocol_error( - not_allowed, "attempt to recover a transactional channel",[]); - handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> {noreply, State2 = #ch{writer_pid = WriterPid}} = handle_method(#'basic.recover_async'{requeue = Requeue}, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 73a58f1328..8ba5874061 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -53,6 +53,7 @@ -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation %--------------------------------------------------------------------------- @@ -605,27 +606,33 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, ok = send_on_channel0( Sock, #'connection.tune'{channel_max = 0, - %% set to zero once QPid fix their negotiation - frame_max = 131072, + frame_max = ?FRAME_MAX, heartbeat = 0}), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, client_properties = ClientProperties}}; -handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, - frame_max = FrameMax, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, sock = Sock}) -> - %% if we have a channel_max limit that the client wishes to - %% exceed, die as per spec. Not currently a problem, so we ignore - %% the client's channel_max parameter. - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}}; + if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> + rabbit_misc:protocol_error( + not_allowed, "frame_max=~w < ~w min size", + [FrameMax, ?FRAME_MIN_SIZE]); + (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) -> + rabbit_misc:protocol_error( + not_allowed, "frame_max=~w > ~w max size", + [FrameMax, ?FRAME_MAX]); + true -> + rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}} + end; + handle_method0(#'connection.open'{virtual_host = VHostPath, insist = Insist}, State = #v1{connection_state = opening, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c684484d74..cffd0e7fd9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -61,6 +61,7 @@ all_tests() -> passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), + passed = test_content_framing(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -515,6 +516,45 @@ test_field_values() -> >>), passed. +%% Test that content frames don't exceed frame-max +test_content_framing(FrameMax, Fragments) -> + [Header | Frames] = + rabbit_binary_generator:build_simple_content_frames( + 1, + #content{class_id = 0, properties_bin = <<>>, + payload_fragments_rev = Fragments}, + FrameMax), + %% header is formatted correctly and the size is the total of the + %% fragments + <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, + BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header), + BodySize = size(list_to_binary(Fragments)), + false = lists:any( + fun (ContentFrame) -> + FrameBinary = list_to_binary(ContentFrame), + %% assert + <<_TypeAndChannel:3/binary, + Size:32/unsigned, + _Payload:Size/binary, + 16#CE>> = FrameBinary, + size(FrameBinary) > FrameMax + end, + Frames), + passed. + +test_content_framing() -> + %% no content + passed = test_content_framing(4096, []), + passed = test_content_framing(4096, [<<>>]), + %% easily fit in one frame + passed = test_content_framing(4096, [<<"Easy">>]), + %% exactly one frame (empty frame = 8 bytes) + passed = test_content_framing(11, [<<"One">>]), + %% more than one frame + passed = test_content_framing(20, [<<"into more than one frame">>, + <<"This will have to go">>]), + passed. + test_topic_match(P, R) -> test_topic_match(P, R, true). |
