summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAyanda Dube <ayanda.dube@erlang-solutions.com>2015-09-08 14:31:00 +0100
committerAyanda Dube <ayanda.dube@erlang-solutions.com>2015-10-07 08:46:40 +0100
commit58733568bac91c141911a12c03c653c7d27ce2d0 (patch)
tree41ced76d123406926d1de78fa60c8dbfb7cc64a3 /src
parent4eff6cf0cb5b08ba829b20aa1a95f6f87f9c7c05 (diff)
downloadrabbitmq-server-git-58733568bac91c141911a12c03c653c7d27ce2d0.tar.gz
Adds info_all/4 for dynamic listing of queues.
Adds consumers_all/4 for dynamic listing of consumers References #62
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl30
1 files changed, 28 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 65e4255a73..997252c371 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -24,10 +24,11 @@
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
--export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
+ info_all/4]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
--export([consumers/1, consumers_all/1, consumer_info_keys/0]).
+-export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
@@ -118,6 +119,8 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
+-spec(info_all/4 :: (rabbit_types:vhost(), rabbit_types:info_keys(),
+ reference()) -> 'ok').
-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 :: (rabbit_types:amqqueue())
@@ -128,6 +131,10 @@
(rabbit_types:vhost())
-> [{name(), pid(), rabbit_types:ctag(), boolean(),
non_neg_integer(), rabbit_framing:amqp_table()}]).
+-spec(consumers_all/3 ::
+ (rabbit_types:vhost(), reference(), pid())
+ -> [{reference(), {name(), pid(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table()}}]).
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
@@ -586,6 +593,12 @@ info_all(VHostPath, Items) ->
map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
+info_all(VHostPath, Items, Ref, Pid) ->
+ map(list(VHostPath), fun (Q) -> Pid ! {Ref, info(Q, Items)} end) ++
+ map(list_down(VHostPath), fun (Q) -> Pid ! {Ref, info_down(Q, Items, down)} end),
+ Pid ! {Ref, finished},
+ ok.
+
force_event_refresh(Ref) ->
[gen_server2:cast(Q#amqqueue.pid,
{force_event_refresh, Ref}) || Q <- list()],
@@ -609,6 +622,19 @@ consumers_all(VHostPath) ->
{ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]
end)).
+consumers_all(VHostPath, Ref, Pid) ->
+ ConsumerInfoKeys=consumer_info_keys(),
+ lists:append(
+ map(list(VHostPath),
+ fun (Q) ->
+ Pid ! {Ref, [lists:zip(
+ ConsumerInfoKeys,
+ [Q#amqqueue.name, ChPid, CTag,
+ AckRequired, Prefetch, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, Args}
+ <- consumers(Q)]}
+ end)).
+
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
delete_immediately(QPids) ->