summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_reader.erl72
1 files changed, 44 insertions, 28 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3676d9bc59..968fb15732 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, vhost,
+ protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
@@ -607,17 +607,28 @@ create_channel(Channel, State) ->
connection = #connection{name = Name,
protocol = Protocol,
frame_max = FrameMax,
+ channel_max = ChannelMax,
user = User,
vhost = VHost,
capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ChPid, AState}.
+ case ChannelMax /= 0 andalso Channel > ChannelMax of
+ true ->
+ %% we cannot use rabbit_misc:protocol_error here because amqp_error is caught
+ %% only for the methods on channel 0.
+ AmqpError = rabbit_misc:amqp_error(
+ not_allowed, "channel ~w is greater than negotiated channel_max (~w)",
+ [Channel, ChannelMax], 'channel.open'),
+ throw({error, AmqpError});
+ false ->
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ChPid, AState}
+ end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
@@ -665,24 +676,28 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- {ChPid, AState} = case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> Other
- end,
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ try
+ {ChPid, AState} = case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> Other
+ end,
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end
+ catch {error, Error} ->
+ handle_exception(State, Channel, Error)
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -862,7 +877,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
+ frame_max = FrameMax,
+ channel_max = ChannelMax},
queue_collector = Collector,
heartbeater = Heartbeater};