diff options
| author | Michael Klishin <michael@rabbitmq.com> | 2013-11-27 18:00:13 +0400 |
|---|---|---|
| committer | Michael Klishin <michael@rabbitmq.com> | 2013-11-27 18:00:13 +0400 |
| commit | 320126490654e585255f0e4442f280cb89b232d6 (patch) | |
| tree | 1189927bf0a858a4b6310b375bea88a8ffdcb41e | |
| parent | 87a5ab558320df7d12ac84677ce46c6a80a2985e (diff) | |
| download | rabbitmq-server-git-320126490654e585255f0e4442f280cb89b232d6.tar.gz | |
Reject channel.open method with channel number > negotiated channel_max
| -rw-r--r-- | src/rabbit_reader.erl | 72 |
1 files changed, 44 insertions, 28 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3676d9bc59..968fb15732 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, - protocol, user, timeout_sec, frame_max, vhost, + protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, auth_mechanism, auth_state}). @@ -607,17 +607,28 @@ create_channel(Channel, State) -> connection = #connection{name = Name, protocol = Protocol, frame_max = FrameMax, + channel_max = ChannelMax, user = User, vhost = VHost, capabilities = Capabilities}} = State, - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ChPid, AState}. + case ChannelMax /= 0 andalso Channel > ChannelMax of + true -> + %% we cannot use rabbit_misc:protocol_error here because amqp_error is caught + %% only for the methods on channel 0. + AmqpError = rabbit_misc:amqp_error( + not_allowed, "channel ~w is greater than negotiated channel_max (~w)", + [Channel, ChannelMax], 'channel.open'), + throw({error, AmqpError}); + false -> + {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, + Protocol, User, VHost, Capabilities, Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + {ChPid, AState} + end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of @@ -665,24 +676,28 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - {ChPid, AState} = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> - rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); - {error, Reason} -> - handle_exception(State, Channel, Reason) + try + {ChPid, AState} = case get(ChKey) of + undefined -> create_channel(Channel, State); + Other -> Other + end, + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State)); + {error, Reason} -> + handle_exception(State, Channel, Reason) + end + catch {error, Error} -> + handle_exception(State, Channel, Error) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> @@ -862,7 +877,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, + frame_max = FrameMax, + channel_max = ChannelMax}, queue_collector = Collector, heartbeater = Heartbeater}; |
