diff options
| author | Michael Klishin <michael@rabbitmq.com> | 2013-12-12 15:10:36 +0400 |
|---|---|---|
| committer | Michael Klishin <michael@rabbitmq.com> | 2013-12-12 15:10:36 +0400 |
| commit | f9b06b44acebf15b6726f9a17d0137178310a3d1 (patch) | |
| tree | e85cad6dd3bf904af66d0a98ec575f975632254a /src | |
| parent | c2801fded13614c1358699fc4d7dceb02ace9f5f (diff) | |
| download | rabbitmq-server-git-f9b06b44acebf15b6726f9a17d0137178310a3d1.tar.gz | |
Decrement channel_count in channel_cleanup
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_reader.erl | 54 |
1 files changed, 28 insertions, 26 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 23c78d0118..782900a9f9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -328,10 +328,10 @@ handle_other({conserve_resources, Source, Conserve}, end, Throttle1 = Throttle#throttle{alarmed_by = CR1}, control_throttle(State#v1{throttle = Throttle1}); -handle_other({channel_closing, ChPid}, State = #v1{channel_count = ChannelCount}) -> +handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), - channel_cleanup(ChPid), - maybe_close(control_throttle(State#v1{channel_count = (ChannelCount - 1)})); + {_, State1} = channel_cleanup(ChPid, State), + maybe_close(control_throttle(State1)); handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -488,27 +488,29 @@ close_connection(State = #v1{queue_collector = Collector, State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> - case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, controlled} -> State; - {undefined, uncontrolled} -> exit({abnormal_dependent_exit, - ChPid, Reason}); - {_Channel, controlled} -> maybe_close(control_throttle(State)); - {Channel, uncontrolled} -> State1 = handle_exception( - State, Channel, Reason), - maybe_close(control_throttle(State1)) + case {channel_cleanup(ChPid, State), termination_kind(Reason)} of + {{undefined, State1}, controlled} -> + State1; + {{undefined, _}, uncontrolled} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {{_, State1}, controlled} -> + maybe_close(control_throttle(State1)); + {{Channel, State1}, uncontrolled} -> + State2 = handle_exception(State1, Channel, Reason), + maybe_close(control_throttle(State2)) end. -terminate_channels() -> +terminate_channels(State) -> NChannels = length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), - wait_for_channel_termination(NChannels, TimerRef); + wait_for_channel_termination(NChannels, TimerRef, State); true -> ok end. -wait_for_channel_termination(0, TimerRef) -> +wait_for_channel_termination(0, TimerRef, _State) -> case erlang:cancel_timer(TimerRef) of false -> receive cancel_wait -> ok @@ -516,20 +518,20 @@ wait_for_channel_termination(0, TimerRef) -> _ -> ok end; -wait_for_channel_termination(N, TimerRef) -> +wait_for_channel_termination(N, TimerRef, State) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case {channel_cleanup(ChPid), termination_kind(Reason)} of + case {channel_cleanup(ChPid, State), termination_kind(Reason)} of {undefined, _} -> exit({abnormal_dependent_exit, ChPid, Reason}); {_Channel, controlled} -> - wait_for_channel_termination(N-1, TimerRef); + wait_for_channel_termination(N-1, TimerRef, State); {Channel, uncontrolled} -> log(error, "AMQP connection ~p, channel ~p - " "error while terminating:~n~p~n", [self(), Channel, Reason]), - wait_for_channel_termination(N-1, TimerRef) + wait_for_channel_termination(N-1, TimerRef, State) end; cancel_wait -> exit(channel_termination_timeout) @@ -563,7 +565,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol}, [self(), CS, Channel, Reason]), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - terminate_channels(), + terminate_channels(State), State1 = close_connection(State), ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), State1; @@ -630,14 +632,15 @@ create_channel(Channel, State) -> [ChannelCount, ChannelMax], 'none')} end. -channel_cleanup(ChPid) -> +channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> + State1 = State#v1{channel_count = (ChannelCount - 1)}, case get({ch_pid, ChPid}) of - undefined -> undefined; + undefined -> {undefined, State1}; {Channel, MRef} -> credit_flow:peer_down(ChPid), erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), - Channel + {Channel, State1} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -707,13 +710,12 @@ process_frame(Frame, Channel, State) -> end end. -post_process_frame({method, 'channel.close_ok', _}, ChPid, - State = #v1{channel_count = ChannelCount}) -> - channel_cleanup(ChPid), +post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> + {_, State1} = channel_cleanup(ChPid, State), %% This is not strictly necessary, but more obviously %% correct. Also note that we do not need to call maybe_close/1 %% since we cannot possibly be in the 'closing' state. - control_throttle(State#v1{channel_count = (ChannelCount - 1)}); + control_throttle(State1); post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> maybe_block(State); post_process_frame({content_body, _}, _ChPid, State) -> |
