diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 14:49:51 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 14:49:51 +0000 |
| commit | 647348c7f1e59cd0bc4082f5decbe40b9b8dfd36 (patch) | |
| tree | 3fa36e00a4f8629a07fc5477ecb14c19d3da29b0 /src | |
| parent | 34d09736e9231813d8e1b74a325dde2c547cbb62 (diff) | |
| download | rabbitmq-server-git-647348c7f1e59cd0bc4082f5decbe40b9b8dfd36.tar.gz | |
Improve book-keeping - now we treat a receiver having gone away as granting us infinite credit, so we can pump out our remaining messages. We now don't leak connections any more.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_flow.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 3 |
3 files changed, 19 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c95aaad7e3..31afa47325 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -332,6 +332,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + rabbit_flow:receiver_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = dict:erase(QPid, State3#ch.queue_monitors)}); @@ -1150,31 +1151,19 @@ consumer_monitor(ConsumerTag, end. monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (not dict:is_key(QPid, QMons) andalso - queue_monitor_needed(QPid, State)) of + case not dict:is_key(QPid, QMons) of true -> MRef = erlang:monitor(process, QPid), State#ch{queue_monitors = dict:store(QPid, MRef, QMons)}; false -> State end. demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (dict:is_key(QPid, QMons) andalso - not queue_monitor_needed(QPid, State)) of + case dict:is_key(QPid, QMons) of true -> true = erlang:demonitor(dict:fetch(QPid, QMons)), State#ch{queue_monitors = dict:erase(QPid, QMons)}; false -> State end. -queue_monitor_needed(QPid, #ch{queue_consumers = QCons, - blocking = Blocking, - unconfirmed_qm = UQM} = State) -> - StatsEnabled = rabbit_event:stats_level( - State, #ch.stats_timer) =:= fine, - ConsumerMonitored = dict:is_key(QPid, QCons), - QueueBlocked = sets:is_element(QPid, Blocking), - ConfirmMonitored = gb_trees:is_defined(QPid, UQM), - StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); diff --git a/src/rabbit_flow.erl b/src/rabbit_flow.erl index cf7035f2a1..9759e72624 100644 --- a/src/rabbit_flow.erl +++ b/src/rabbit_flow.erl @@ -16,10 +16,10 @@ -module(rabbit_flow). --define(MAX_CREDIT, 100). --define(MORE_CREDIT_AT, 50). +-define(MAX_CREDIT, 200). +-define(MORE_CREDIT_AT, 150). --export([ack/1, bump/1, blocked/0, send/1]). +-export([ack/1, bump/1, blocked/0, send/1, receiver_down/1]). %% There are two "flows" here; of messages and of credit, going in %% opposite directions. The variable names "From" and "To" refer to @@ -51,13 +51,17 @@ blocked() -> get(credit_blocked, []) =/= []. send(From) -> - Credit = get({credit_from, From}, ?MAX_CREDIT) - 1, + Credit = get_credit(From) - 1, case Credit of 0 -> block(From); _ -> ok end, put({credit_from, From}, Credit). +receiver_down(From) -> + unblock(From), + put({credit_from, From}, quiescing). + %% -------------------------------------------------------------------------- grant(To, Quantity) -> @@ -85,3 +89,9 @@ get(Key, Default) -> undefined -> Default; Value -> Value end. + +get_credit(From) -> + case get({credit_from, From}, ?MAX_CREDIT) of + quiescing -> 1; + Credit -> Credit + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 072f378d29..bc25e8174c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -428,7 +428,8 @@ handle_dependent_exit(ChPid, Reason, State) -> channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of undefined -> undefined; - {Channel, MRef} -> erase({channel, Channel}), + {Channel, MRef} -> rabbit_flow:receiver_down(ChPid), + erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), Channel |
