diff options
| -rw-r--r-- | src/rabbit_channel.erl | 21 |
1 files changed, 9 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cac622f856..0a9643cf4a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = sets:new(), + queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -334,8 +334,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), credit_flow:peer_down(QPid), erase_queue_stats(QPid), - noreply(State3#ch{queue_monitors = - sets:del_element(QPid, State3#ch.queue_monitors)}); + noreply(State3#ch{queue_monitors = pmon:erase( + QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -1128,6 +1128,7 @@ handle_method(_MethodRecord, _Content, _State) -> consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + queue_monitors = QMons, queue_consumers = QCons, capabilities = Capabilities}) -> case rabbit_misc:table_lookup( @@ -1140,18 +1141,12 @@ consumer_monitor(ConsumerTag, end, gb_sets:singleton(ConsumerTag), QCons), - monitor_queue(QPid, State#ch{queue_consumers = QCons1}); + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}; _ -> State end. -monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case sets:is_element(QPid, QMons) of - false -> erlang:monitor(process, QPid), - State#ch{queue_monitors = sets:add_element(QPid, QMons)}; - true -> State - end. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1370,7 +1365,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QNames}, State) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State1 = State#ch{queue_monitors = + pmon:monitor_all(DeliveredQPids, + State#ch.queue_monitors)}, State2 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | |
