diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 15:59:52 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 15:59:52 +0000 |
| commit | dbb370029825f5a0f5a50d30c4b76234abba9b95 (patch) | |
| tree | 782c96072c37573efd8d13cc32e61a2de1a3d936 /src | |
| parent | f92fa9559b4119937cc4dfe3ec59f19516dd8d63 (diff) | |
| download | rabbitmq-server-git-dbb370029825f5a0f5a50d30c4b76234abba9b95.tar.gz | |
simplify publisher monitoring code
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 39 |
1 files changed, 11 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b2279b3088..989a801155 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -327,19 +327,6 @@ lookup_ch(ChPid) -> C -> C end. -lookup_ch_publisher(ChPid) -> - case get({ch_publisher, ChPid}) of - undefined -> not_found; - C -> C - end. - -ch_record_publisher(ChPid) -> - Key = {ch_publisher, ChPid}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, ChPid)); - _ -> ok - end. - ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of @@ -378,12 +365,6 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -erase_ch_record_publisher(ChPid) -> - MRef = get({ch_publisher, ChPid}), - erlang:demonitor(MRef), - erase({ch_publisher, ChPid}), - ok. - update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> ok = rabbit_limiter:register(Limiter, self()), update_ch_record(C#cr{consumer_count = 1}); @@ -617,7 +598,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> - handle_ch_publisher_down(DownPid), + case get({ch_publisher, DownPid}) of + undefined -> ok; + MRef -> erlang:demonitor(MRef), + erase({ch_publisher, DownPid}), + credit_flow:peer_down(DownPid) + end, case lookup_ch(DownPid) of not_found -> {ok, State}; @@ -640,13 +626,6 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> end end. -handle_ch_publisher_down(DownPid) -> - case lookup_ch_publisher(DownPid) of - not_found -> ok; - _ -> erase_ch_record_publisher(DownPid), - credit_flow:peer_down(DownPid) - end. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -1048,7 +1027,11 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. case Flow of - flow -> ch_record_publisher(Sender), + flow -> Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, Sender)); + _ -> ok + end, credit_flow:ack(Sender); noflow -> ok end, |
