diff options
| -rw-r--r-- | src/rabbit_channel.erl | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cf5d1ab019..9899964709 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,7 +37,7 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). --export([list/0]). +-export([list/0, info/1, info/2, info_all/0, info_all/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). @@ -51,6 +51,15 @@ -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(INFO_KEYS, + [pid, + connection, + user, + vhost, + transactional, + consumer_count, + messages_unacknowledged]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -64,6 +73,10 @@ -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(list/0 :: () -> [pid()]). +-spec(info/1 :: (pid()) -> [info()]). +-spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(info_all/0 :: () -> [[info()]]). +-spec(info_all/1 :: ([info_key()]) -> [[info()]]). -endif. @@ -96,6 +109,21 @@ conserve_memory(Pid, Conserve) -> list() -> pg_local:get_members(rabbit_channels). +info(Pid) -> + gen_server2:pcall(Pid, 9, info, infinity). + +info(Pid, Items) -> + case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_all() -> + rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). + +info_all(Items) -> + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost]) -> @@ -118,6 +146,15 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> most_recently_declared_queue = <<>>, consumer_mapping = dict:new()}}. +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Items}, _From, State) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; + handle_call(_Request, _From, State) -> noreply(State). @@ -190,6 +227,8 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- +reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. + noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. return_ok(State, true, _Msg) -> {noreply, State}; @@ -958,3 +997,18 @@ terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), rabbit_writer:shutdown(WriterPid), rabbit_limiter:shutdown(LimiterPid). + +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _) -> self(); +i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; +i(user, #ch{username = Username}) -> Username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = none}) -> false; +i(transactional, #ch{transaction_id = _}) -> true; +i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> + dict:size(ConsumerMapping); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> + queue:len(UAMQ); +i(Item, _) -> + throw({bad_argument, Item}). |
