diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-10-09 12:18:38 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-10-09 12:18:38 +0100 |
commit | d51d3efc48e2ab13dab1172e0c03493c18cc7b68 (patch) | |
tree | 54a650142fc1befde4ce632e6195ab0f2da4b32b | |
parent | b460ea1b4f41c5547f03c316e758be61bdcd0f2e (diff) | |
download | rabbitmq-server-git-reduce-proc-dict-use.tar.gz |
Reduce use of process dictionaryreduce-proc-dict-use
In cases where dynamic keys are used.
-rw-r--r-- | src/rabbit_reader.erl | 92 |
1 files changed, 51 insertions, 41 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f9697c96e5..d96149bda9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -93,7 +93,8 @@ %% channel supervisor channel_sup_sup_pid, %% how many channels this connection has - channel_count, + channels = #{}, + channel_pids = #{}, %% throttling state, for both %% credit- and resource-driven flow control throttle, @@ -349,7 +350,8 @@ start_connection(Parent, HelperSup, Deb, Sock) -> helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, - channel_count = 0, + channels = #{}, + channel_pids = #{}, throttle = #throttle{ last_blocked_at = never, should_block = false, @@ -722,13 +724,14 @@ handle_dependent_exit(ChPid, Reason, State) -> maybe_close(control_throttle(State2)) end. -terminate_channels(#v1{channel_count = 0} = State) -> +terminate_channels(#v1{channels = Channels} = State) + when map_size(Channels) == 0 -> State; -terminate_channels(#v1{channel_count = ChannelCount} = State) -> - lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), - Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount, +terminate_channels(#v1{channels = Channels} = State) -> + lists:foreach(fun rabbit_channel:shutdown/1, all_channels(State)), + Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * map_size(Channels), TimerRef = erlang:send_after(Timeout, self(), cancel_wait), - wait_for_channel_termination(ChannelCount, TimerRef, State). + wait_for_channel_termination(map_size(Channels), TimerRef, State). wait_for_channel_termination(0, TimerRef, State) -> case erlang:cancel_timer(TimerRef) of @@ -770,9 +773,10 @@ wait_for_channel_termination(N, TimerRef, end. maybe_close(State = #v1{connection_state = closing, - channel_count = 0, + channels = Channels, connection = #connection{protocol = Protocol}, - sock = Sock}) -> + sock = Sock}) + when map_size(Channels) == 0 -> NewState = close_connection(State), ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), NewState; @@ -893,18 +897,19 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) -> %%-------------------------------------------------------------------------- create_channel(_Channel, - #v1{channel_count = ChannelCount, + #v1{channels = Channels, connection = #connection{channel_max = ChannelMax}}) - when ChannelMax /= 0 andalso ChannelCount >= ChannelMax -> + when ChannelMax /= 0 andalso map_size(Channels) >= ChannelMax -> {error, rabbit_misc:amqp_error( not_allowed, "number of channels opened (~w) has reached the " "negotiated channel_max (~w)", - [ChannelCount, ChannelMax], 'none')}; + [map_size(Channels), ChannelMax], 'none')}; create_channel(Channel, #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, - channel_count = ChannelCount, + channels = Channels, + channel_pids = ChPids, connection = #connection{name = Name, protocol = Protocol, @@ -921,9 +926,9 @@ create_channel(Channel, Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}; + {ok, {ChPid, AState}, + State#v1{channels = Channels#{Channel => {ChPid, AState}}, + channel_pids = ChPids#{ChPid => {Channel, MRef}}}}; {true, Limit} -> {error, rabbit_misc:amqp_error(not_allowed, "number of channels opened for user '~s' has reached " @@ -931,23 +936,26 @@ create_channel(Channel, [Username, Limit], 'none')} end. -channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> - case get({ch_pid, ChPid}) of - undefined -> {undefined, State}; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - {Channel, State#v1{channel_count = ChannelCount - 1}} +channel_cleanup(ChPid, State = #v1{channels = Channels0, + channel_pids = ChPids0}) -> + case maps:take(ChPid, ChPids0) of + error -> {undefined, State}; + {{ChannelNum, MRef}, ChPids} -> + credit_flow:peer_down(ChPid), + Channels = maps:remove(ChannelNum, Channels0), + erlang:demonitor(MRef, [flush]), + {ChannelNum, State#v1{channels = Channels, + channel_pids = ChPids}} end. -all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. +all_channels(#v1{channel_pids = ChPids}) -> + maps:keys(ChPids). clean_up_all_channels(State) -> CleanupFun = fun(ChPid) -> channel_cleanup(ChPid, State) end, - lists:foreach(CleanupFun, all_channels()). + lists:foreach(CleanupFun, all_channels(State)). %%-------------------------------------------------------------------------- @@ -981,27 +989,29 @@ handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) -> handle_frame(Type, Channel, Payload, State) -> unexpected_frame(Type, Channel, Payload, State). -process_frame(Frame, Channel, State) -> - ChKey = {channel, Channel}, - case (case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> {ok, Other, State} +process_frame(Frame, Channel, #v1{channels = Channels} = State0) -> + case (case maps:get(Channel, Channels, undefined) of + undefined -> create_channel(Channel, State0); + Other -> {ok, Other, State0} end) of {error, Error} -> - handle_exception(State, Channel, Error); + handle_exception(State0, Channel, Error); {ok, {ChPid, AState}, State1} -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State1); + State = State1#v1{channels = + Channels#{Channel => {ChPid, NewAState}}}, + post_process_frame(Frame, ChPid, State); {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State1); + State = State1#v1{channels = + Channels#{Channel => {ChPid, NewAState}}}, + post_process_frame(Frame, ChPid, State); {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State1)); + State = State1#v1{channels = + Channels#{Channel => {ChPid, NewAState}}}, + post_process_frame(Frame, ChPid, control_throttle(State)); {error, Reason} -> handle_exception(State1, Channel, Reason) end @@ -1260,7 +1270,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, [self(), dynamic_connection_name(ConnName), Username, VHost]), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> - lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), + lists:foreach(fun rabbit_channel:shutdown/1, all_channels(State)), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection = #connection{protocol = Protocol}, @@ -1295,7 +1305,7 @@ handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reas lists:foreach(fun(Ch) -> rabbit_log:debug("Updating user/auth backend state for channel ~p", [Ch]), _ = rabbit_channel:update_user_state(Ch, User1) - end, all_channels()), + end, all_channels(State)), ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol), rabbit_log_connection:info( "connection ~p (~s): " @@ -1502,7 +1512,7 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); -i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount; +i(channels, #v1{channels = Channels}) -> map_size(Channels); i(state, #v1{connection_state = ConnectionState, throttle = #throttle{blocked_by = Reasons, last_blocked_at = T} = Throttle}) -> |