summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl56
1 files changed, 55 insertions, 1 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cf5d1ab019..9899964709 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,7 +37,7 @@
-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
--export([list/0]).
+-export([list/0, info/1, info/2, info_all/0, info_all/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
@@ -51,6 +51,15 @@
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(INFO_KEYS,
+ [pid,
+ connection,
+ user,
+ vhost,
+ transactional,
+ consumer_count,
+ messages_unacknowledged]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -64,6 +73,10 @@
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
+-spec(info/1 :: (pid()) -> [info()]).
+-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(info_all/0 :: () -> [[info()]]).
+-spec(info_all/1 :: ([info_key()]) -> [[info()]]).
-endif.
@@ -96,6 +109,21 @@ conserve_memory(Pid, Conserve) ->
list() ->
pg_local:get_members(rabbit_channels).
+info(Pid) ->
+ gen_server2:pcall(Pid, 9, info, infinity).
+
+info(Pid, Items) ->
+ case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_all() ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()).
+
+info_all(Items) ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
@@ -118,6 +146,15 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new()}}.
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Items}, _From, State) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -190,6 +227,8 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
+reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
return_ok(State, true, _Msg) -> {noreply, State};
@@ -958,3 +997,18 @@ terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
pg_local:leave(rabbit_channels, self()),
rabbit_writer:shutdown(WriterPid),
rabbit_limiter:shutdown(LimiterPid).
+
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _) -> self();
+i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
+i(user, #ch{username = Username}) -> Username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = none}) -> false;
+i(transactional, #ch{transaction_id = _}) -> true;
+i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
+ dict:size(ConsumerMapping);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
+ queue:len(UAMQ);
+i(Item, _) ->
+ throw({bad_argument, Item}).