summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-12-11 13:50:55 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-12-11 13:50:55 +0000
commit0a93735b7e860f14fc9a31219468ac565c6d0441 (patch)
tree4c0d733b644f9ed4cf9386b5346a87d3647ee33a
parent8856091b8230dd12c06f5453bee940f0d7b4aca9 (diff)
parent93c296971b70835cdc8b49ad975f4a57c1328f2f (diff)
downloadrabbitmq-server-git-0a93735b7e860f14fc9a31219468ac565c6d0441.tar.gz
Merge in default
-rw-r--r--src/rabbit_reader.erl81
1 files changed, 47 insertions, 34 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 67effab0ea..1f2138753f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -43,7 +43,8 @@
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state,
+ channel_count}).
-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
blocked_sent}).
@@ -603,28 +604,30 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
create_channel(Channel, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{name = Name,
- protocol = Protocol,
- frame_max = FrameMax,
- channel_max = ChannelMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- N = length(all_channels()),
- case ChannelMax == 0 orelse N < ChannelMax of
- true -> {ok, _ChSupPid, {ChPid, AState}} =
+ connection = Conn} = State,
+ #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ channel_count = ChannelCount,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities} = Conn,
+ case ChannelMax == 0 orelse ChannelCount < ChannelMax of
+ true -> {ok, _ChSupPid, {ChPid, ChState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities,
Collector}),
MRef = erlang:monitor(process, ChPid),
+ State1 = State#v1{connection =
+ Conn#connection{channel_count = (ChannelCount + 1)}},
put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ok, {ChPid, AState}};
+ {ok, {ChPid, ChState}, State1};
false -> {error, rabbit_misc:amqp_error(
not_allowed, "number of channels opened (~w) has "
"reached the negotiated channel_max (~w)",
- [N, ChannelMax], 'none')}
+ [ChannelCount, ChannelMax], 'none')}
end.
channel_cleanup(ChPid) ->
@@ -673,27 +676,35 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- case (case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> {ok, Other}
- end) of
+ ChRes = case get(ChKey) of
+ undefined ->
+ case create_channel(Channel, State) of
+ {ok, ChVal, ConnState} ->
+ put(ChKey, ChVal),
+ {ok, ChVal, ConnState};
+ {error, E} ->
+ {error, E}
+ end;
+ Other -> {ok, Other, State}
+ end,
+ case ChRes of
{error, Error} ->
handle_exception(State, Channel, Error);
- {ok, {ChPid, AState}} ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
+ {ok, {ChPid, ChState}, State1} ->
+ case rabbit_command_assembler:process(Frame, ChState) of
+ {ok, NewChState} ->
+ put(ChKey, {ChPid, NewChState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, NewChState} ->
rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
+ put(ChKey, {ChPid, NewChState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, Content, NewChState} ->
rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
+ put(ChKey, {ChPid, NewChState}),
+ post_process_frame(Frame, ChPid, control_throttle(State1));
{error, Reason} ->
- handle_exception(State, Channel, Reason)
+ handle_exception(State1, Channel, Reason)
end
end.
@@ -873,9 +884,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
- frame_max = FrameMax,
- channel_max = ChannelMax,
- timeout_sec = ClientHeartbeat},
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ channel_count = 0,
+ timeout_sec = ClientHeartbeat},
queue_collector = Collector,
heartbeater = Heartbeater};
@@ -1041,7 +1053,8 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(channels, #v1{}) -> length(all_channels());
+i(channels, #v1{connection = Conn}) ->
+ Conn#connection.channel_count;
i(state, #v1{connection_state = ConnectionState,
throttle = #throttle{last_blocked_by = BlockedBy,
last_blocked_at = T}}) ->