diff options
| author | Michael Klishin <michael@rabbitmq.com> | 2013-11-25 15:19:50 +0400 |
|---|---|---|
| committer | Michael Klishin <michael@rabbitmq.com> | 2013-11-25 15:19:50 +0400 |
| commit | d799ebb5bdd967bad0c92328f2188da9e4eaf4cf (patch) | |
| tree | d45027f5cf154e4e4b4756470463ca45b7c3136e | |
| parent | 6472f3c07a2624e6c1a62ea70479d2b9cb693ceb (diff) | |
| download | rabbitmq-server-git-d799ebb5bdd967bad0c92328f2188da9e4eaf4cf.tar.gz | |
Make connection.tune.channel_max configurable
| -rw-r--r-- | src/rabbit_reader.erl | 61 |
1 files changed, 44 insertions, 17 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e00732fdcf..220db174aa 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -32,6 +32,7 @@ -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +-define(CHANNEL_MIN, 4). %%-------------------------------------------------------------------------- @@ -838,8 +839,9 @@ handle_method0(#'connection.secure_ok'{response = Response}, State = #v1{connection_state = securing}) -> auth_phase(Response, State); -handle_method0(#'connection.tune_ok'{frame_max = FrameMax, - heartbeat = ClientHeartbeat}, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, + heartbeat = ClientHeartbeat, + channel_max = ChannelMax}, State = #v1{connection_state = tuning, connection = Connection, helper_sup = SupPid, @@ -854,21 +856,40 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ServerFrameMax]); true -> - {ok, Collector} = + ServerChannelMax = server_channel_max(), + Protocol = Connection#connection.protocol, + if ChannelMax /= 0 andalso ChannelMax < ?CHANNEL_MIN -> + AmqpError = rabbit_misc:amqp_error( + not_allowed, "negotiated channel_max=~w < ~w min size", + [ChannelMax, ServerChannelMax], none), + {0, CloseMethod} = rabbit_binary_generator:map_exception( + 0, AmqpError, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); + ChannelMax /= 0 andalso ChannelMax > ServerChannelMax -> + AmqpError = rabbit_misc:amqp_error( + not_allowed, "negotiated channel_max=~w > ~w max size", + [ChannelMax, ServerChannelMax], none), + {0, CloseMethod} = rabbit_binary_generator:map_exception( + 0, AmqpError, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); + true -> + {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(SupPid), - Frame = rabbit_binary_generator:build_heartbeat_frame(), - SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, - Parent = self(), - ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, ReceiveFun), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, - queue_collector = Collector, - heartbeater = Heartbeater} + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, + Parent = self(), + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, + ReceiveFun), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}, + queue_collector = Collector, + heartbeater = Heartbeater} + end end; handle_method0(#'connection.open'{virtual_host = VHostPath}, @@ -921,6 +942,12 @@ server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. +server_channel_max() -> + case application:get_env(rabbit, channel_max) of + {ok, ChannelMax} -> ChannelMax; + undefined -> 0 + end. + server_heartbeat() -> {ok, Heartbeat} = application:get_env(rabbit, heartbeat), Heartbeat. @@ -989,7 +1016,7 @@ auth_phase(Response, State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User} -> - Tune = #'connection.tune'{channel_max = 0, + Tune = #'connection.tune'{channel_max = server_channel_max(), frame_max = server_frame_max(), heartbeat = server_heartbeat()}, ok = send_on_channel0(Sock, Tune, Protocol), |
