summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2013-11-25 15:19:50 +0400
committerMichael Klishin <michael@rabbitmq.com>2013-11-25 15:19:50 +0400
commitd799ebb5bdd967bad0c92328f2188da9e4eaf4cf (patch)
treed45027f5cf154e4e4b4756470463ca45b7c3136e
parent6472f3c07a2624e6c1a62ea70479d2b9cb693ceb (diff)
downloadrabbitmq-server-git-d799ebb5bdd967bad0c92328f2188da9e4eaf4cf.tar.gz
Make connection.tune.channel_max configurable
-rw-r--r--src/rabbit_reader.erl61
1 files changed, 44 insertions, 17 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e00732fdcf..220db174aa 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -32,6 +32,7 @@
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 4).
%%--------------------------------------------------------------------------
@@ -838,8 +839,9 @@ handle_method0(#'connection.secure_ok'{response = Response},
State = #v1{connection_state = securing}) ->
auth_phase(Response, State);
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- heartbeat = ClientHeartbeat},
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ heartbeat = ClientHeartbeat,
+ channel_max = ChannelMax},
State = #v1{connection_state = tuning,
connection = Connection,
helper_sup = SupPid,
@@ -854,21 +856,40 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ServerFrameMax]);
true ->
- {ok, Collector} =
+ ServerChannelMax = server_channel_max(),
+ Protocol = Connection#connection.protocol,
+ if ChannelMax /= 0 andalso ChannelMax < ?CHANNEL_MIN ->
+ AmqpError = rabbit_misc:amqp_error(
+ not_allowed, "negotiated channel_max=~w < ~w min size",
+ [ChannelMax, ServerChannelMax], none),
+ {0, CloseMethod} = rabbit_binary_generator:map_exception(
+ 0, AmqpError, Protocol),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
+ ChannelMax /= 0 andalso ChannelMax > ServerChannelMax ->
+ AmqpError = rabbit_misc:amqp_error(
+ not_allowed, "negotiated channel_max=~w > ~w max size",
+ [ChannelMax, ServerChannelMax], none),
+ {0, CloseMethod} = rabbit_binary_generator:map_exception(
+ 0, AmqpError, Protocol),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
+ true ->
+ {ok, Collector} =
rabbit_connection_helper_sup:start_queue_collector(SupPid),
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
- queue_collector = Collector,
- heartbeater = Heartbeater}
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat,
+ ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ timeout_sec = ClientHeartbeat,
+ frame_max = FrameMax},
+ queue_collector = Collector,
+ heartbeater = Heartbeater}
+ end
end;
handle_method0(#'connection.open'{virtual_host = VHostPath},
@@ -921,6 +942,12 @@ server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
+server_channel_max() ->
+ case application:get_env(rabbit, channel_max) of
+ {ok, ChannelMax} -> ChannelMax;
+ undefined -> 0
+ end.
+
server_heartbeat() ->
{ok, Heartbeat} = application:get_env(rabbit, heartbeat),
Heartbeat.
@@ -989,7 +1016,7 @@ auth_phase(Response,
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
{ok, User} ->
- Tune = #'connection.tune'{channel_max = 0,
+ Tune = #'connection.tune'{channel_max = server_channel_max(),
frame_max = server_frame_max(),
heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),