diff options
| -rw-r--r-- | src/rabbit_channel.erl | 124 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 29 |
2 files changed, 86 insertions, 67 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b97af6d8ca..f8f099f674 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,8 +35,9 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, - virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, delivering_queues, + virtual_host, most_recently_declared_queue, + queue_names, queue_monitors, consumer_mapping, + blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), @@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), - erase_queue_stats(QPid), - noreply(State4#ch{queue_monitors = pmon:erase( - QPid, State4#ch.queue_monitors)}); + #ch{queue_names = QNames, queue_monitors = QMons} = State4, + case dict:find(QPid, QNames) of + {ok, QName} -> erase_queue_stats(QName); + error -> ok + end, + noreply(State4#ch{queue_names = dict:erase(QPid, QNames), + queue_monitors = pmon:erase(QPid, QMons)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -677,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -689,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - State1 = monitor_delivering_queue(NoAck, QPid, State), + State1 = monitor_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} @@ -728,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), State1 = monitor_delivering_queue( - NoAck, QPid, State#ch{consumer_mapping = CM1}), + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1126,9 +1133,12 @@ consumer_monitor(ConsumerTag, State end. -monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons, - delivering_queues = DQ}) -> - State#ch{queue_monitors = pmon:monitor(QPid, QMons), +monitor_delivering_queue(NoAck, QPid, QName, + State = #ch{queue_names = QNames, + queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_names = dict:store(QPid, QName, QNames), + queue_monitors = pmon:monitor(QPid, QMons), delivering_queues = case NoAck of true -> DQ; false -> sets:add_element(QPid, DQ) @@ -1233,18 +1243,18 @@ reject(Requeue, Acked, Limiter) -> ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, - Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State); + true -> incr_stats([{queue_stats, QName, 1}], redeliver, State); false -> ok end, rabbit_trace:tap_trace_out(Msg, TraceState), @@ -1277,11 +1287,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. -ack(Acked, State) -> +ack(Acked, State = #ch{queue_names = QNames}) -> Incs = fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{queue_stats, QPid, length(MsgIds)} | L] + case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count} | L]; + error -> L + end end, [], Acked), ok = notify_limiter(State#ch.limiter, Acked), incr_stats(Incs, ack, State). @@ -1339,23 +1353,30 @@ notify_limiter(Limiter, Acked) -> deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, - QNames}, State) -> - {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = State#ch{queue_monitors = - pmon:monitor_all(DeliveredQPids, - State#ch.queue_monitors)}, - State2 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State1), + DelQNames}, State = #ch{queue_names = QNames, + queue_monitors = QMons}) -> + Qs = rabbit_amqqueue:lookup(DelQNames), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + {QNames1, QMons1} = + lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, + {QNames0, QMons0}) -> + {dict:store(QPid, QName, QNames0), + pmon:monitor(QPid, QMons0)} + end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1}), incr_stats([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - State2. + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State1), + State1. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}], - return_unroutable, State), + incr_stats([{exchange_stats, XName, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1489,24 +1510,23 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - CoarseStats = infos(?STATISTICS_KEYS, State), + Coarse = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - coarse -> - rabbit_event:notify(channel_stats, Extra ++ CoarseStats); - fine -> - FineStats = - [{channel_queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {channel_exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_queue_exchange_stats, - [{QX, Stats} || - {{queue_exchange_stats, QX}, Stats} <- get()]}], - rabbit_event:notify(channel_stats, - Extra ++ CoarseStats ++ FineStats) + coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse); + fine -> Fine = [{channel_queue_stats, + [{QName, Stats} || + {{queue_stats, QName}, Stats} <- get()]}, + {channel_exchange_stats, + [{XName, Stats} || + {{exchange_stats, XName}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine) end. -erase_queue_stats(QPid) -> - erase({queue_stats, QPid}), +erase_queue_stats(QName) -> + erase({queue_stats, QName}), [erase({queue_exchange_stats, QX}) || - {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), + QName0 =:= QName]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e25843c313..8b2f3b3ad7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1286,8 +1286,7 @@ test_statistics() -> QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, - {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), - QPid = Q#amqqueue.pid, + QRes = rabbit_misc:r(<<"/">>, queue, QName), X = rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), @@ -1311,9 +1310,9 @@ test_statistics() -> length(proplists:get_value( channel_queue_exchange_stats, E)) > 0 end), - [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), - [{{QPid,X},[{publish,1}]}] = + [{{QRes,X},[{publish,1}]}] = proplists:get_value(channel_queue_exchange_stats, Event2), %% Check the stats remove stuff on queue deletion @@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) -> [channel_created, queue_created]), {_Writer, Ch} = test_spawn(), - expect_events(Ch, channel_created), + expect_events(pid, Ch, channel_created), rabbit_channel:shutdown(Ch), {_Writer2, Ch2} = test_spawn(SecondaryNode), - expect_events(Ch2, channel_created), + expect_events(pid, Ch2, channel_created), rabbit_channel:shutdown(Ch2), - {new, #amqqueue { pid = QPid } = Q} = + {new, #amqqueue{name = QName} = Q} = rabbit_amqqueue:declare(test_queue(), false, false, [], none), - expect_events(QPid, queue_created), + expect_events(name, QName, queue_created), rabbit_amqqueue:delete(Q, false, false), rabbit_tests_event_receiver:stop(), passed. -expect_events(Pid, Type) -> - expect_event(Pid, Type), +expect_events(Tag, Key, Type) -> + expect_event(Tag, Key, Type), rabbit:force_event_refresh(), - expect_event(Pid, Type). + expect_event(Tag, Key, Type). -expect_event(Pid, Type) -> +expect_event(Tag, Key, Type) -> receive #event{type = Type, props = Props} -> - case pget(pid, Props) of - Pid -> ok; - _ -> expect_event(Pid, Type) + case pget(Tag, Props) of + Key -> ok; + _ -> expect_event(Tag, Key, Type) end after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. |
