diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 35 |
1 files changed, 15 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4fe57699dc..d208c1e9e6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -593,11 +593,11 @@ info_all(VHostPath, Items) -> map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). info_all(VHostPath, Items, Ref, AggregatorPid) -> - map(list(VHostPath), fun (Q) -> AggregatorPid ! {Ref, info(Q, Items)} end) ++ - map(list_down(VHostPath), - fun (Q) -> AggregatorPid ! {Ref, info_down(Q, Items, down)} end), - AggregatorPid ! {Ref, finished}, - ok. + rabbit_control_main:emitting_map( + AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list(VHostPath), + continue), + rabbit_control_main:emitting_map( + AggregatorPid, Ref, fun(Q) -> info_down(Q, Items) end, list(VHostPath)). force_event_refresh(Ref) -> [gen_server2:cast(Q#amqqueue.pid, @@ -615,27 +615,22 @@ consumers_all(VHostPath) -> ConsumerInfoKeys = consumer_info_keys(), lists:append( map(list(VHostPath), - fun (Q) -> - [lists:zip( - ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) || - {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)] - end)). + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)). consumers_all(VHostPath, Ref, AggregatorPid) -> ConsumerInfoKeys = consumer_info_keys(), - map(list(VHostPath), - fun (Q) -> AggregatorPid ! - {Ref, get_queue_consumer_info(Q, ConsumerInfoKeys)} - end), - AggregatorPid ! {Ref, finished}, + rabbit_control_main:emitting_map( + AggregatorPid, Ref, + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end, + list(VHostPath)), ok. get_queue_consumer_info(Q, ConsumerInfoKeys) -> - lists:flatten([lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, CTag, - AckRequired, Prefetch, Args]) || - {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]). + lists:flatten( + [lists:zip(ConsumerInfoKeys, + [Q#amqqueue.name, ChPid, CTag, + AckRequired, Prefetch, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]). stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). |
