summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl39
1 files changed, 11 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b2279b3088..989a801155 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -327,19 +327,6 @@ lookup_ch(ChPid) ->
C -> C
end.
-lookup_ch_publisher(ChPid) ->
- case get({ch_publisher, ChPid}) of
- undefined -> not_found;
- C -> C
- end.
-
-ch_record_publisher(ChPid) ->
- Key = {ch_publisher, ChPid},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, ChPid));
- _ -> ok
- end.
-
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
@@ -378,12 +365,6 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-erase_ch_record_publisher(ChPid) ->
- MRef = get({ch_publisher, ChPid}),
- erlang:demonitor(MRef),
- erase({ch_publisher, ChPid}),
- ok.
-
update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
ok = rabbit_limiter:register(Limiter, self()),
update_ch_record(C#cr{consumer_count = 1});
@@ -617,7 +598,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
- handle_ch_publisher_down(DownPid),
+ case get({ch_publisher, DownPid}) of
+ undefined -> ok;
+ MRef -> erlang:demonitor(MRef),
+ erase({ch_publisher, DownPid}),
+ credit_flow:peer_down(DownPid)
+ end,
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -640,13 +626,6 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
end
end.
-handle_ch_publisher_down(DownPid) ->
- case lookup_ch_publisher(DownPid) of
- not_found -> ok;
- _ -> erase_ch_record_publisher(DownPid),
- credit_flow:peer_down(DownPid)
- end.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -1048,7 +1027,11 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
case Flow of
- flow -> ch_record_publisher(Sender),
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
credit_flow:ack(Sender);
noflow -> ok
end,