summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-12 22:37:32 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-12 22:37:32 +0000
commit9090d0daea27ffa72eb7a61fe61c23bafae4b462 (patch)
tree897aeabf75b8ab79ed94549bb95372ca31c0afbc /src
parent07611a63fcfdf5087782335160a3bd5d598df75f (diff)
downloadrabbitmq-server-git-9090d0daea27ffa72eb7a61fe61c23bafae4b462.tar.gz
a little bit of refactoring on the reader
- move the 'blocked'-induced heartbeater disabling to the point at which the transition to the 'blocked' state takes place. This makes the correlation between these two events more obvious. It also prevents duplicate invocations of rabbit_heartbeat:pause_monitor/1, which hitherto was accomplished by relying on switch_callback/3 never getting called while 'blocked' - an assumption that could easily become invalid one day. - a little bit of inlining of timeout calculation code in close_connection/1. - more compact case handling in handle_dependent_exit/3 and wait_for_channel_termination/2.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_reader.erl58
1 files changed, 23 insertions, 35 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 045cc969af..ff5eac21f0 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -345,10 +345,6 @@ handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
-switch_callback(State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}, Callback, Length) ->
- ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_len = Length};
switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
@@ -380,28 +376,22 @@ close_connection(State = #v1{queue_collector = Collector,
rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
- TimeoutMillisec =
- 1000 * if TimeoutSec > 0 andalso
- TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
- true -> ?CLOSING_TIMEOUT
- end,
- erlang:send_after(TimeoutMillisec, self(), terminate_connection),
+ erlang:send_after((if TimeoutSec > 0 andalso
+ TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
+ true -> ?CLOSING_TIMEOUT
+ end) * 1000, self(), terminate_connection),
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case termination_kind(Reason) of
- controlled ->
- channel_cleanup(ChPid),
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, uncontrolled} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_Channel, controlled} ->
maybe_close(State);
- uncontrolled ->
- case channel_cleanup(ChPid) of
- undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> rabbit_log:error(
- "connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(
- handle_exception(State, Channel, Reason))
- end
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(ChPid) ->
@@ -436,19 +426,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case channel_cleanup(ChPid) of
- undefined ->
+ case {channel_cleanup(ChPid), termination_kind(Reason)} of
+ {undefined, _} ->
exit({abnormal_dependent_exit, ChPid, Reason});
- Channel ->
- case termination_kind(Reason) of
- controlled ->
- ok;
- uncontrolled ->
- rabbit_log:error(
- "connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason])
- end,
+ {_Channel, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef);
+ {Channel, uncontrolled} ->
+ rabbit_log:error("connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
@@ -525,7 +511,9 @@ post_process_frame({method, MethodName, _}, _ChPid,
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
case State#v1.connection_state of
- blocking -> State#v1{connection_state = blocked};
+ blocking -> ok = rabbit_heartbeat:pause_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = blocked};
_ -> State
end;
false -> State