summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@lshift.net>2008-11-09 23:32:13 +0000
committerEmile Joubert <emile@lshift.net>2008-11-09 23:32:13 +0000
commitbba6ec51b373650ce89038379a6fcb159b4f0723 (patch)
treed5d2aa923bce504f6e02c1200d9cbb6346c502cf
parentca3b1520af6f2d7a0502d550bb174493b0f7ccf9 (diff)
parent252620e5a067cdae5984757b6b395bad66e747b7 (diff)
downloadrabbitmq-server-git-bba6ec51b373650ce89038379a6fcb159b4f0723.tar.gz
Merged bug18381 into bug19684
-rw-r--r--src/rabbit_amqqueue.erl57
-rw-r--r--src/rabbit_amqqueue_process.erl56
-rw-r--r--src/rabbit_control.erl29
-rw-r--r--src/rabbit_misc.erl15
4 files changed, 155 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 56d2c35d94..6f846dafe7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -27,13 +27,15 @@
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
+-export([lookup/1, with/2, with_or_die/2, list/0, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
+-export([info/1, info/2, info_all/0, info_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
+-export([info_queue_sorted/1, info_queue_sorted/2]).
-import(mnesia).
-import(gen_server).
@@ -54,6 +56,9 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+-type(info_key() :: atom()).
+-type(info() :: {info_key(), any()}).
+
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
@@ -61,7 +66,16 @@
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
+-spec(list/0 :: () -> [amqqueue()]).
-spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]).
+-spec(info/1 :: (amqqueue()) -> [info()]).
+-spec(info/2 ::
+ (amqqueue(), info_key()) -> info();
+ (amqqueue(), [info_key()]) -> [info()]).
+-spec(info_all/0 :: () -> [{amqqueue(), [info()]}]).
+-spec(info_all/1 ::
+ (info_key()) -> [{amqqueue(), info()}];
+ ([info_key()]) -> [{amqqueue(), [info()]}]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -91,6 +105,8 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
+-spec(info_queue_sorted/2 :: (info_key(), non_neg_integer()) -> [{amqqueue(), info()}]).
+-spec(info_queue_sorted/1 :: (info_key()) -> [{amqqueue(), info()}]).
-endif.
@@ -178,10 +194,34 @@ with_or_die(Name, F) ->
not_found, "no ~s", [rabbit_misc:rs(Name)])
end).
+list() -> rabbit_misc:dirty_read_all(amqqueue).
+
+map(F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ Ref = make_ref(),
+ lists:filter(fun (R) -> R =/= Ref end,
+ [rabbit_misc:with_exit_handler(
+ fun () -> Ref end,
+ fun () -> F(Q) end) || Q <- list()]).
+
list_vhost_queues(VHostPath) ->
mnesia:dirty_match_object(
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
+info(#amqqueue{ pid = QPid }) ->
+ gen_server:call(QPid, info).
+
+info(#amqqueue{ pid = QPid }, ItemOrItems) ->
+ case gen_server:call(QPid, {info, ItemOrItems}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_all() -> map(fun (Q) -> {Q, info(Q)} end).
+
+info_all(ItemOrItems) -> map(fun (Q) -> {Q, info(Q, ItemOrItems)} end).
+
stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
stat_all() ->
@@ -298,3 +338,18 @@ safe_pmap_ok(H, F, L) ->
[] -> ok;
Errors -> {error, Errors}
end.
+
+
+info_lookup({#amqqueue{}, InfoTupleList}, InfoKey) ->
+ case lists:keysearch(InfoKey, 1, InfoTupleList) of
+ false -> {error, not_found};
+ {value, {InfoKey, InfoValue}} -> InfoValue
+ end.
+
+%% TODO: avoid sorting if Length is much less than the number of queues
+info_queue_sorted(InfoKey) ->
+ lists:sort(fun(A, B) -> info_lookup(A, InfoKey) > info_lookup(B, InfoKey) end,
+ rabbit_amqqueue:info_all()).
+info_queue_sorted(InfoKey, Length) ->
+ lists:sublist(info_queue_sorted(InfoKey), Length).
+
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e687df846a..b733d1146d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -61,6 +61,16 @@
is_overload_protection_active,
unsent_message_count}).
+-define(INFO_KEYS,
+ [messages_ready,
+ messages_unacknowledged,
+ messages_uncommitted,
+ messages,
+ acks_uncommitted,
+ consumers,
+ transactions,
+ memory]).
+
%%----------------------------------------------------------------------------
start_link(Q) ->
@@ -407,6 +417,9 @@ store_tx(Txn, Tx) ->
erase_tx(Txn) ->
erase({txn, Txn}).
+all_tx_record() ->
+ [T || {{txn, _}, T} <- get()].
+
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
@@ -458,8 +471,51 @@ purge_message_buffer(QName, MessageBuffer) ->
%% artifically ack them.
persist_acks(none, QName, lists:append(Messages)).
+infos(Items, State) -> [{Item, info(Item, State)} || Item <- Items].
+
+info(messages_ready, #q{message_buffer = MessageBuffer}) ->
+ queue:len(MessageBuffer);
+info(messages_unacknowledged, _) ->
+ lists:sum([dict:size(UAM) ||
+ #cr{unacked_messages = UAM} <- all_ch_record()]);
+info(messages_uncommitted, _) ->
+ lists:sum([length(Pending) ||
+ #tx{pending_messages = Pending} <- all_tx_record()]);
+info(messages, State) ->
+ lists:sum([info(Item, State) || Item <- [messages_ready,
+ messages_unacknowledged,
+ messages_uncommitted]]);
+info(acks_uncommitted, _) ->
+ lists:sum([length(Pending) ||
+ #tx{pending_acks = Pending} <- all_tx_record()]);
+info(consumers, _) ->
+ lists:sum([length(Consumers) ||
+ #cr{consumers = Consumers} <- all_ch_record()]);
+info(transactions, _) ->
+ length(all_tx_record());
+info(memory, _) ->
+ {memory, M} = process_info(self(), memory),
+ M;
+info(Item, _) ->
+ throw({bad_argument, Item}).
+
%---------------------------------------------------------------------------
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Item}, _From, State) when is_atom(Item) ->
+ try
+ reply({ok, {Item, info(Item, State)}}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
+
+handle_call({info, Items}, _From, State) when is_list(Items) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
+
handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index bc588279b4..6589c384fa 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -89,6 +89,12 @@ Available commands:
list_user_vhosts <UserName>
list_vhost_users <VHostPath>
+ set_memory_threshold <Percentage>
+ get_memory_consumption
+
+ list_queues
+ list_deepest_queues
+
<node> should be the name of the master node of the RabbitMQ cluster. It
defaults to the node named \"rabbit\" on the local host. On a host named
\"server.example.com\", the master node will usually be rabbit@server (unless
@@ -179,7 +185,28 @@ action(list_user_vhosts, Node, Args = [_Username]) ->
action(list_vhost_users, Node, Args = [_VHostPath]) ->
io:format("Listing users for vhosts ~p...", Args),
- display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})).
+ display_list(call(Node, {rabbit_access_control, list_vhost_users, Args}));
+
+action(set_memory_threshold, Node, Args) ->
+ io:format("% Setting memory threshold to ~p ...~n", Args),
+ rpc_call(Node, rabbit_misc, set_memory_threshold, Args);
+
+action(get_memory_consumption, Node, []) ->
+ io:format("% Getting memory consumption ...~n", []),
+ Res = rpc_call(Node, rabbit_misc, get_memory_consumption, []),
+ io:format("~p~n", [Res]);
+
+action(list_queues, Node, []) ->
+ io:format("Listing all queues ..."),
+ Res = rpc_call(Node, rabbit_amqqueue, info_all, []),
+ io:format("~p~n", [Res]),
+ ok;
+
+action(list_deepest_queues, Node, Args = [Depth]) ->
+ io:format("Listing ~p deepest queues ...~n", Args),
+ Res = rpc_call(Node, rabbit_amqqueue, info_queue_sorted, [messages, list_to_integer(Depth)]),
+ io:format("~p~n", [Res]),
+ ok.
display_list(L) when is_list(L) ->
lists:foreach(fun (I) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 7638af582d..a807c6e919 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -43,6 +43,7 @@
-export([guid/0, string_guid/1, binstring_guid/1]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2]).
+-export([set_memory_threshold/1, get_memory_consumption/0]).
-import(mnesia).
-import(lists).
@@ -99,6 +100,8 @@
-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
+-spec(get_memory_consumption/0 :: () -> ({non_neg_integer(), non_neg_integer()})).
+-spec(set_memory_threshold/1 :: (string()) -> 'ok' | {'error', any()}).
-endif.
%%----------------------------------------------------------------------------
@@ -369,3 +372,15 @@ append_file(File, _, Suffix) ->
{ok, Data} -> file:write_file([File, Suffix], Data, [append]);
Error -> Error
end.
+
+set_memory_threshold(Level) ->
+ case Res = string:to_float(Level) of
+ {error, _} -> Res;
+ {_Percentage, [_]} -> {error, unexpected_suffix};
+ {Percentage, _} -> memsup:set_sysmem_high_watermark(Percentage)
+ end.
+
+get_memory_consumption() ->
+ {Total, Allocated, _Worst} = memsup:get_memory_data(),
+ {Total, Allocated}.
+