summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl21
1 files changed, 9 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cac622f856..0a9643cf4a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = sets:new(),
+ queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -334,8 +334,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
- noreply(State3#ch{queue_monitors =
- sets:del_element(QPid, State3#ch.queue_monitors)});
+ noreply(State3#ch{queue_monitors = pmon:erase(
+ QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -1128,6 +1128,7 @@ handle_method(_MethodRecord, _Content, _State) ->
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
+ queue_monitors = QMons,
queue_consumers = QCons,
capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
@@ -1140,18 +1141,12 @@ consumer_monitor(ConsumerTag,
end,
gb_sets:singleton(ConsumerTag),
QCons),
- monitor_queue(QPid, State#ch{queue_consumers = QCons1});
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1};
_ ->
State
end.
-monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case sets:is_element(QPid, QMons) of
- false -> erlang:monitor(process, QPid),
- State#ch{queue_monitors = sets:add_element(QPid, QMons)};
- true -> State
- end.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1370,7 +1365,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State1 = State#ch{queue_monitors =
+ pmon:monitor_all(DeliveredQPids,
+ State#ch.queue_monitors)},
State2 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |