summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-12 14:49:51 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-12 14:49:51 +0000
commit647348c7f1e59cd0bc4082f5decbe40b9b8dfd36 (patch)
tree3fa36e00a4f8629a07fc5477ecb14c19d3da29b0 /src
parent34d09736e9231813d8e1b74a325dde2c547cbb62 (diff)
downloadrabbitmq-server-git-647348c7f1e59cd0bc4082f5decbe40b9b8dfd36.tar.gz
Improve book-keeping - now we treat a receiver having gone away as granting us infinite credit, so we can pump out our remaining messages. We now don't leak connections any more.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_flow.erl18
-rw-r--r--src/rabbit_reader.erl3
3 files changed, 19 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c95aaad7e3..31afa47325 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -332,6 +332,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ rabbit_flow:receiver_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
dict:erase(QPid, State3#ch.queue_monitors)});
@@ -1150,31 +1151,19 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (not dict:is_key(QPid, QMons) andalso
- queue_monitor_needed(QPid, State)) of
+ case not dict:is_key(QPid, QMons) of
true -> MRef = erlang:monitor(process, QPid),
State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
false -> State
end.
demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (dict:is_key(QPid, QMons) andalso
- not queue_monitor_needed(QPid, State)) of
+ case dict:is_key(QPid, QMons) of
true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
State#ch{queue_monitors = dict:erase(QPid, QMons)};
false -> State
end.
-queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
- blocking = Blocking,
- unconfirmed_qm = UQM} = State) ->
- StatsEnabled = rabbit_event:stats_level(
- State, #ch.stats_timer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QCons),
- QueueBlocked = sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl
index cf7035f2a1..9759e72624 100644
--- a/src/rabbit_flow.erl
+++ b/src/rabbit_flow.erl
@@ -16,10 +16,10 @@
-module(rabbit_flow).
--define(MAX_CREDIT, 100).
--define(MORE_CREDIT_AT, 50).
+-define(MAX_CREDIT, 200).
+-define(MORE_CREDIT_AT, 150).
--export([ack/1, bump/1, blocked/0, send/1]).
+-export([ack/1, bump/1, blocked/0, send/1, receiver_down/1]).
%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
@@ -51,13 +51,17 @@ blocked() ->
get(credit_blocked, []) =/= [].
send(From) ->
- Credit = get({credit_from, From}, ?MAX_CREDIT) - 1,
+ Credit = get_credit(From) - 1,
case Credit of
0 -> block(From);
_ -> ok
end,
put({credit_from, From}, Credit).
+receiver_down(From) ->
+ unblock(From),
+ put({credit_from, From}, quiescing).
+
%% --------------------------------------------------------------------------
grant(To, Quantity) ->
@@ -85,3 +89,9 @@ get(Key, Default) ->
undefined -> Default;
Value -> Value
end.
+
+get_credit(From) ->
+ case get({credit_from, From}, ?MAX_CREDIT) of
+ quiescing -> 1;
+ Credit -> Credit
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 072f378d29..bc25e8174c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -428,7 +428,8 @@ handle_dependent_exit(ChPid, Reason, State) ->
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
- {Channel, MRef} -> erase({channel, Channel}),
+ {Channel, MRef} -> rabbit_flow:receiver_down(ChPid),
+ erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
Channel