diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-07 12:01:46 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-07 12:01:46 +0100 |
| commit | 0b2bc19e987644d69c49398c722fb896cb335cd8 (patch) | |
| tree | f68db015e8b4da8c445d2b272c42ac0cbcf5115b | |
| parent | 7329e6b266cd9f1a5b86cb7d049a04becb365dcc (diff) | |
| download | rabbitmq-server-git-0b2bc19e987644d69c49398c722fb896cb335cd8.tar.gz | |
Support querying slaves for their status
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 23 |
4 files changed, 54 insertions, 10 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 717645220e..8ad476b25f 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -860,6 +860,14 @@ <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> </varlistentry> + <varlistentry> + <term>mirror_nodes</term> + <listitem><para>If the queue is mirrored, the nodes upon which mirrors will be present if the nodes are part of the current cluster.</para></listitem> + </varlistentry> + <varlistentry> + <term>slaves</term> + <listitem><para>If the queue is mirrored, this gives the status of slaves.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>queueinfoitem</command>s are specified then queue name and depth are diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e388ccf203..cb8a485e58 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -74,8 +74,8 @@ messages, consumers, memory, - backing_queue_status, - slave_pids + slaves, + backing_queue_status ]). -define(CREATION_EVENT_KEYS, @@ -802,14 +802,26 @@ i(consumers, State) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> - BQ:status(BQS); -i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), - SPids; i(mirror_nodes, #q{q = #amqqueue{name = Name}}) -> {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name), - MNodes; + case MNodes of + undefined -> ''; + _ -> MNodes + end; +i(slaves, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes, + slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), + case MNodes of + undefined -> + ''; + _ -> + {Results, _Bad} = + delegate:invoke( + SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + [Result || {_Pid, Result} <- Results] + end; +i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:status(BQS); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6eb1aaba9a..548ad7fa9a 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -419,6 +419,11 @@ format_info_item([T | _] = Value) "[" ++ lists:nthtail(2, lists:append( [", " ++ format_info_item(E) || E <- Value])) ++ "]"; +format_info_item(Value) when is_tuple(Value) -> + List = tuple_to_list(Value), + "{" ++ + lists:nthtail(2, lists:append( + [", " ++ format_info_item(E) || E <- List])) ++ "}"; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a4a40a8c94..bae68cbed2 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -33,7 +33,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2]). +-export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, @@ -47,6 +47,13 @@ -include("rabbit.hrl"). -include("gm_specs.hrl"). +-define(STATISTICS_KEYS, + [pid, + is_synchronised + ]). + +-define(INFO_KEYS, ?STATISTICS_KEYS). + -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(DEATH_TIMEOUT, 20000). %% 20 seconds @@ -75,6 +82,9 @@ start_link(Q) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +info(QPid) -> + gen_server2:call(QPid, info, infinity). + init([#amqqueue { name = QueueName } = Q]) -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -181,7 +191,10 @@ handle_call({run_backing_queue, Mod, Fun}, _From, State) -> handle_call({commit, _Txn, _ChPid}, _From, State) -> %% We don't support transactions in mirror queues - reply(ok, State). + reply(ok, State); + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -337,6 +350,12 @@ inform_deaths(SPid, Deaths) -> %% Others %% --------------------------------------------------------------------------- +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _State) -> self(); +i(is_synchronised, State) -> State #state.synchronised; +i(Item, _State) -> throw({bad_argument, Item}). + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, |
