summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2013-12-12 15:10:36 +0400
committerMichael Klishin <michael@rabbitmq.com>2013-12-12 15:10:36 +0400
commitf9b06b44acebf15b6726f9a17d0137178310a3d1 (patch)
treee85cad6dd3bf904af66d0a98ec575f975632254a /src
parentc2801fded13614c1358699fc4d7dceb02ace9f5f (diff)
downloadrabbitmq-server-git-f9b06b44acebf15b6726f9a17d0137178310a3d1.tar.gz
Decrement channel_count in channel_cleanup
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_reader.erl54
1 files changed, 28 insertions, 26 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 23c78d0118..782900a9f9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -328,10 +328,10 @@ handle_other({conserve_resources, Source, Conserve},
end,
Throttle1 = Throttle#throttle{alarmed_by = CR1},
control_throttle(State#v1{throttle = Throttle1});
-handle_other({channel_closing, ChPid}, State = #v1{channel_count = ChannelCount}) ->
+handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- maybe_close(control_throttle(State#v1{channel_count = (ChannelCount - 1)}));
+ {_, State1} = channel_cleanup(ChPid, State),
+ maybe_close(control_throttle(State1));
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -488,27 +488,29 @@ close_connection(State = #v1{queue_collector = Collector,
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, controlled} -> State;
- {undefined, uncontrolled} -> exit({abnormal_dependent_exit,
- ChPid, Reason});
- {_Channel, controlled} -> maybe_close(control_throttle(State));
- {Channel, uncontrolled} -> State1 = handle_exception(
- State, Channel, Reason),
- maybe_close(control_throttle(State1))
+ case {channel_cleanup(ChPid, State), termination_kind(Reason)} of
+ {{undefined, State1}, controlled} ->
+ State1;
+ {{undefined, _}, uncontrolled} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {{_, State1}, controlled} ->
+ maybe_close(control_throttle(State1));
+ {{Channel, State1}, uncontrolled} ->
+ State2 = handle_exception(State1, Channel, Reason),
+ maybe_close(control_throttle(State2))
end.
-terminate_channels() ->
+terminate_channels(State) ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
- wait_for_channel_termination(NChannels, TimerRef);
+ wait_for_channel_termination(NChannels, TimerRef, State);
true -> ok
end.
-wait_for_channel_termination(0, TimerRef) ->
+wait_for_channel_termination(0, TimerRef, _State) ->
case erlang:cancel_timer(TimerRef) of
false -> receive
cancel_wait -> ok
@@ -516,20 +518,20 @@ wait_for_channel_termination(0, TimerRef) ->
_ -> ok
end;
-wait_for_channel_termination(N, TimerRef) ->
+wait_for_channel_termination(N, TimerRef, State) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ case {channel_cleanup(ChPid, State), termination_kind(Reason)} of
{undefined, _} ->
exit({abnormal_dependent_exit, ChPid, Reason});
{_Channel, controlled} ->
- wait_for_channel_termination(N-1, TimerRef);
+ wait_for_channel_termination(N-1, TimerRef, State);
{Channel, uncontrolled} ->
log(error,
"AMQP connection ~p, channel ~p - "
"error while terminating:~n~p~n",
[self(), Channel, Reason]),
- wait_for_channel_termination(N-1, TimerRef)
+ wait_for_channel_termination(N-1, TimerRef, State)
end;
cancel_wait ->
exit(channel_termination_timeout)
@@ -563,7 +565,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
[self(), CS, Channel, Reason]),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
+ terminate_channels(State),
State1 = close_connection(State),
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
State1;
@@ -630,14 +632,15 @@ create_channel(Channel, State) ->
[ChannelCount, ChannelMax], 'none')}
end.
-channel_cleanup(ChPid) ->
+channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
+ State1 = State#v1{channel_count = (ChannelCount - 1)},
case get({ch_pid, ChPid}) of
- undefined -> undefined;
+ undefined -> {undefined, State1};
{Channel, MRef} -> credit_flow:peer_down(ChPid),
erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
- Channel
+ {Channel, State1}
end.
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
@@ -707,13 +710,12 @@ process_frame(Frame, Channel, State) ->
end
end.
-post_process_frame({method, 'channel.close_ok', _}, ChPid,
- State = #v1{channel_count = ChannelCount}) ->
- channel_cleanup(ChPid),
+post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
+ {_, State1} = channel_cleanup(ChPid, State),
%% This is not strictly necessary, but more obviously
%% correct. Also note that we do not need to call maybe_close/1
%% since we cannot possibly be in the 'closing' state.
- control_throttle(State#v1{channel_count = (ChannelCount - 1)});
+ control_throttle(State1);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->