summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_reader.erl92
1 files changed, 51 insertions, 41 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f9697c96e5..d96149bda9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -93,7 +93,8 @@
%% channel supervisor
channel_sup_sup_pid,
%% how many channels this connection has
- channel_count,
+ channels = #{},
+ channel_pids = #{},
%% throttling state, for both
%% credit- and resource-driven flow control
throttle,
@@ -349,7 +350,8 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
helper_sup = HelperSup,
heartbeater = none,
channel_sup_sup_pid = none,
- channel_count = 0,
+ channels = #{},
+ channel_pids = #{},
throttle = #throttle{
last_blocked_at = never,
should_block = false,
@@ -722,13 +724,14 @@ handle_dependent_exit(ChPid, Reason, State) ->
maybe_close(control_throttle(State2))
end.
-terminate_channels(#v1{channel_count = 0} = State) ->
+terminate_channels(#v1{channels = Channels} = State)
+ when map_size(Channels) == 0 ->
State;
-terminate_channels(#v1{channel_count = ChannelCount} = State) ->
- lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
- Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount,
+terminate_channels(#v1{channels = Channels} = State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels(State)),
+ Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * map_size(Channels),
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
- wait_for_channel_termination(ChannelCount, TimerRef, State).
+ wait_for_channel_termination(map_size(Channels), TimerRef, State).
wait_for_channel_termination(0, TimerRef, State) ->
case erlang:cancel_timer(TimerRef) of
@@ -770,9 +773,10 @@ wait_for_channel_termination(N, TimerRef,
end.
maybe_close(State = #v1{connection_state = closing,
- channel_count = 0,
+ channels = Channels,
connection = #connection{protocol = Protocol},
- sock = Sock}) ->
+ sock = Sock})
+ when map_size(Channels) == 0 ->
NewState = close_connection(State),
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
NewState;
@@ -893,18 +897,19 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(_Channel,
- #v1{channel_count = ChannelCount,
+ #v1{channels = Channels,
connection = #connection{channel_max = ChannelMax}})
- when ChannelMax /= 0 andalso ChannelCount >= ChannelMax ->
+ when ChannelMax /= 0 andalso map_size(Channels) >= ChannelMax ->
{error, rabbit_misc:amqp_error(
not_allowed, "number of channels opened (~w) has reached the "
"negotiated channel_max (~w)",
- [ChannelCount, ChannelMax], 'none')};
+ [map_size(Channels), ChannelMax], 'none')};
create_channel(Channel,
#v1{sock = Sock,
queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- channel_count = ChannelCount,
+ channels = Channels,
+ channel_pids = ChPids,
connection =
#connection{name = Name,
protocol = Protocol,
@@ -921,9 +926,9 @@ create_channel(Channel,
Protocol, User, VHost, Capabilities,
Collector}),
MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}};
+ {ok, {ChPid, AState},
+ State#v1{channels = Channels#{Channel => {ChPid, AState}},
+ channel_pids = ChPids#{ChPid => {Channel, MRef}}}};
{true, Limit} ->
{error, rabbit_misc:amqp_error(not_allowed,
"number of channels opened for user '~s' has reached "
@@ -931,23 +936,26 @@ create_channel(Channel,
[Username, Limit], 'none')}
end.
-channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
- case get({ch_pid, ChPid}) of
- undefined -> {undefined, State};
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- {Channel, State#v1{channel_count = ChannelCount - 1}}
+channel_cleanup(ChPid, State = #v1{channels = Channels0,
+ channel_pids = ChPids0}) ->
+ case maps:take(ChPid, ChPids0) of
+ error -> {undefined, State};
+ {{ChannelNum, MRef}, ChPids} ->
+ credit_flow:peer_down(ChPid),
+ Channels = maps:remove(ChannelNum, Channels0),
+ erlang:demonitor(MRef, [flush]),
+ {ChannelNum, State#v1{channels = Channels,
+ channel_pids = ChPids}}
end.
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+all_channels(#v1{channel_pids = ChPids}) ->
+ maps:keys(ChPids).
clean_up_all_channels(State) ->
CleanupFun = fun(ChPid) ->
channel_cleanup(ChPid, State)
end,
- lists:foreach(CleanupFun, all_channels()).
+ lists:foreach(CleanupFun, all_channels(State)).
%%--------------------------------------------------------------------------
@@ -981,27 +989,29 @@ handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
-process_frame(Frame, Channel, State) ->
- ChKey = {channel, Channel},
- case (case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> {ok, Other, State}
+process_frame(Frame, Channel, #v1{channels = Channels} = State0) ->
+ case (case maps:get(Channel, Channels, undefined) of
+ undefined -> create_channel(Channel, State0);
+ Other -> {ok, Other, State0}
end) of
{error, Error} ->
- handle_exception(State, Channel, Error);
+ handle_exception(State0, Channel, Error);
{ok, {ChPid, AState}, State1} ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State1);
+ State = State1#v1{channels =
+ Channels#{Channel => {ChPid, NewAState}}},
+ post_process_frame(Frame, ChPid, State);
{ok, Method, NewAState} ->
rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State1);
+ State = State1#v1{channels =
+ Channels#{Channel => {ChPid, NewAState}}},
+ post_process_frame(Frame, ChPid, State);
{ok, Method, Content, NewAState} ->
rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State1));
+ State = State1#v1{channels =
+ Channels#{Channel => {ChPid, NewAState}}},
+ post_process_frame(Frame, ChPid, control_throttle(State));
{error, Reason} ->
handle_exception(State1, Channel, Reason)
end
@@ -1260,7 +1270,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
[self(), dynamic_connection_name(ConnName), Username, VHost]),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels(State)),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection = #connection{protocol = Protocol},
@@ -1295,7 +1305,7 @@ handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reas
lists:foreach(fun(Ch) ->
rabbit_log:debug("Updating user/auth backend state for channel ~p", [Ch]),
_ = rabbit_channel:update_user_state(Ch, User1)
- end, all_channels()),
+ end, all_channels(State)),
ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol),
rabbit_log_connection:info(
"connection ~p (~s): "
@@ -1502,7 +1512,7 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount;
+i(channels, #v1{channels = Channels}) -> map_size(Channels);
i(state, #v1{connection_state = ConnectionState,
throttle = #throttle{blocked_by = Reasons,
last_blocked_at = T} = Throttle}) ->