summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-16 15:59:52 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-16 15:59:52 +0000
commitdbb370029825f5a0f5a50d30c4b76234abba9b95 (patch)
tree782c96072c37573efd8d13cc32e61a2de1a3d936 /src
parentf92fa9559b4119937cc4dfe3ec59f19516dd8d63 (diff)
downloadrabbitmq-server-git-dbb370029825f5a0f5a50d30c4b76234abba9b95.tar.gz
simplify publisher monitoring code
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl39
1 files changed, 11 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b2279b3088..989a801155 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -327,19 +327,6 @@ lookup_ch(ChPid) ->
C -> C
end.
-lookup_ch_publisher(ChPid) ->
- case get({ch_publisher, ChPid}) of
- undefined -> not_found;
- C -> C
- end.
-
-ch_record_publisher(ChPid) ->
- Key = {ch_publisher, ChPid},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, ChPid));
- _ -> ok
- end.
-
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
@@ -378,12 +365,6 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-erase_ch_record_publisher(ChPid) ->
- MRef = get({ch_publisher, ChPid}),
- erlang:demonitor(MRef),
- erase({ch_publisher, ChPid}),
- ok.
-
update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
ok = rabbit_limiter:register(Limiter, self()),
update_ch_record(C#cr{consumer_count = 1});
@@ -617,7 +598,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
- handle_ch_publisher_down(DownPid),
+ case get({ch_publisher, DownPid}) of
+ undefined -> ok;
+ MRef -> erlang:demonitor(MRef),
+ erase({ch_publisher, DownPid}),
+ credit_flow:peer_down(DownPid)
+ end,
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -640,13 +626,6 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
end
end.
-handle_ch_publisher_down(DownPid) ->
- case lookup_ch_publisher(DownPid) of
- not_found -> ok;
- _ -> erase_ch_record_publisher(DownPid),
- credit_flow:peer_down(DownPid)
- end.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -1048,7 +1027,11 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
case Flow of
- flow -> ch_record_publisher(Sender),
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
credit_flow:ack(Sender);
noflow -> ok
end,