diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 56 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 15 |
4 files changed, 155 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 56d2c35d94..6f846dafe7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -27,13 +27,15 @@ -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, +-export([lookup/1, with/2, with_or_die/2, list/0, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). +-export([info/1, info/2, info_all/0, info_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). +-export([info_queue_sorted/1, info_queue_sorted/2]). -import(mnesia). -import(gen_server). @@ -54,6 +56,9 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). +-type(info_key() :: atom()). +-type(info() :: {info_key(), any()}). + -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> @@ -61,7 +66,16 @@ -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). +-spec(list/0 :: () -> [amqqueue()]). -spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]). +-spec(info/1 :: (amqqueue()) -> [info()]). +-spec(info/2 :: + (amqqueue(), info_key()) -> info(); + (amqqueue(), [info_key()]) -> [info()]). +-spec(info_all/0 :: () -> [{amqqueue(), [info()]}]). +-spec(info_all/1 :: + (info_key()) -> [{amqqueue(), info()}]; + ([info_key()]) -> [{amqqueue(), [info()]}]). -spec(stat/1 :: (amqqueue()) -> qstats()). -spec(stat_all/0 :: () -> [qstats()]). -spec(delete/3 :: @@ -91,6 +105,8 @@ -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). +-spec(info_queue_sorted/2 :: (info_key(), non_neg_integer()) -> [{amqqueue(), info()}]). +-spec(info_queue_sorted/1 :: (info_key()) -> [{amqqueue(), info()}]). -endif. @@ -178,10 +194,34 @@ with_or_die(Name, F) -> not_found, "no ~s", [rabbit_misc:rs(Name)]) end). +list() -> rabbit_misc:dirty_read_all(amqqueue). + +map(F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + Ref = make_ref(), + lists:filter(fun (R) -> R =/= Ref end, + [rabbit_misc:with_exit_handler( + fun () -> Ref end, + fun () -> F(Q) end) || Q <- list()]). + list_vhost_queues(VHostPath) -> mnesia:dirty_match_object( #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). +info(#amqqueue{ pid = QPid }) -> + gen_server:call(QPid, info). + +info(#amqqueue{ pid = QPid }, ItemOrItems) -> + case gen_server:call(QPid, {info, ItemOrItems}) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_all() -> map(fun (Q) -> {Q, info(Q)} end). + +info_all(ItemOrItems) -> map(fun (Q) -> {Q, info(Q, ItemOrItems)} end). + stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). stat_all() -> @@ -298,3 +338,18 @@ safe_pmap_ok(H, F, L) -> [] -> ok; Errors -> {error, Errors} end. + + +info_lookup({#amqqueue{}, InfoTupleList}, InfoKey) -> + case lists:keysearch(InfoKey, 1, InfoTupleList) of + false -> {error, not_found}; + {value, {InfoKey, InfoValue}} -> InfoValue + end. + +%% TODO: avoid sorting if Length is much less than the number of queues +info_queue_sorted(InfoKey) -> + lists:sort(fun(A, B) -> info_lookup(A, InfoKey) > info_lookup(B, InfoKey) end, + rabbit_amqqueue:info_all()). +info_queue_sorted(InfoKey, Length) -> + lists:sublist(info_queue_sorted(InfoKey), Length). + diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e687df846a..b733d1146d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -61,6 +61,16 @@ is_overload_protection_active, unsent_message_count}). +-define(INFO_KEYS, + [messages_ready, + messages_unacknowledged, + messages_uncommitted, + messages, + acks_uncommitted, + consumers, + transactions, + memory]). + %%---------------------------------------------------------------------------- start_link(Q) -> @@ -407,6 +417,9 @@ store_tx(Txn, Tx) -> erase_tx(Txn) -> erase({txn, Txn}). +all_tx_record() -> + [T || {{txn, _}, T} <- get()]. + all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. @@ -458,8 +471,51 @@ purge_message_buffer(QName, MessageBuffer) -> %% artifically ack them. persist_acks(none, QName, lists:append(Messages)). +infos(Items, State) -> [{Item, info(Item, State)} || Item <- Items]. + +info(messages_ready, #q{message_buffer = MessageBuffer}) -> + queue:len(MessageBuffer); +info(messages_unacknowledged, _) -> + lists:sum([dict:size(UAM) || + #cr{unacked_messages = UAM} <- all_ch_record()]); +info(messages_uncommitted, _) -> + lists:sum([length(Pending) || + #tx{pending_messages = Pending} <- all_tx_record()]); +info(messages, State) -> + lists:sum([info(Item, State) || Item <- [messages_ready, + messages_unacknowledged, + messages_uncommitted]]); +info(acks_uncommitted, _) -> + lists:sum([length(Pending) || + #tx{pending_acks = Pending} <- all_tx_record()]); +info(consumers, _) -> + lists:sum([length(Consumers) || + #cr{consumers = Consumers} <- all_ch_record()]); +info(transactions, _) -> + length(all_tx_record()); +info(memory, _) -> + {memory, M} = process_info(self(), memory), + M; +info(Item, _) -> + throw({bad_argument, Item}). + %--------------------------------------------------------------------------- +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Item}, _From, State) when is_atom(Item) -> + try + reply({ok, {Item, info(Item, State)}}, State) + catch Error -> reply({error, Error}, State) + end; + +handle_call({info, Items}, _From, State) when is_list(Items) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; + handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% Synchronous, "immediate" delivery mode %% diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index bc588279b4..6589c384fa 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -89,6 +89,12 @@ Available commands: list_user_vhosts <UserName> list_vhost_users <VHostPath> + set_memory_threshold <Percentage> + get_memory_consumption + + list_queues + list_deepest_queues + <node> should be the name of the master node of the RabbitMQ cluster. It defaults to the node named \"rabbit\" on the local host. On a host named \"server.example.com\", the master node will usually be rabbit@server (unless @@ -179,7 +185,28 @@ action(list_user_vhosts, Node, Args = [_Username]) -> action(list_vhost_users, Node, Args = [_VHostPath]) -> io:format("Listing users for vhosts ~p...", Args), - display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})). + display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})); + +action(set_memory_threshold, Node, Args) -> + io:format("% Setting memory threshold to ~p ...~n", Args), + rpc_call(Node, rabbit_misc, set_memory_threshold, Args); + +action(get_memory_consumption, Node, []) -> + io:format("% Getting memory consumption ...~n", []), + Res = rpc_call(Node, rabbit_misc, get_memory_consumption, []), + io:format("~p~n", [Res]); + +action(list_queues, Node, []) -> + io:format("Listing all queues ..."), + Res = rpc_call(Node, rabbit_amqqueue, info_all, []), + io:format("~p~n", [Res]), + ok; + +action(list_deepest_queues, Node, Args = [Depth]) -> + io:format("Listing ~p deepest queues ...~n", Args), + Res = rpc_call(Node, rabbit_amqqueue, info_queue_sorted, [messages, list_to_integer(Depth)]), + io:format("~p~n", [Res]), + ok. display_list(L) when is_list(L) -> lists:foreach(fun (I) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 7638af582d..a807c6e919 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -43,6 +43,7 @@ -export([guid/0, string_guid/1, binstring_guid/1]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2]). +-export([set_memory_threshold/1, get_memory_consumption/0]). -import(mnesia). -import(lists). @@ -99,6 +100,8 @@ -spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(get_memory_consumption/0 :: () -> ({non_neg_integer(), non_neg_integer()})). +-spec(set_memory_threshold/1 :: (string()) -> 'ok' | {'error', any()}). -endif. %%---------------------------------------------------------------------------- @@ -369,3 +372,15 @@ append_file(File, _, Suffix) -> {ok, Data} -> file:write_file([File, Suffix], Data, [append]); Error -> Error end. + +set_memory_threshold(Level) -> + case Res = string:to_float(Level) of + {error, _} -> Res; + {_Percentage, [_]} -> {error, unexpected_suffix}; + {Percentage, _} -> memsup:set_sysmem_high_watermark(Percentage) + end. + +get_memory_consumption() -> + {Total, Allocated, _Worst} = memsup:get_memory_data(), + {Total, Allocated}. + |
