diff options
| author | Emile Joubert <emile@lshift.net> | 2008-11-14 00:03:39 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@lshift.net> | 2008-11-14 00:03:39 +0000 |
| commit | 3e75d42a0d5d3df6f4a68d65af6c3adfe9379c10 (patch) | |
| tree | c0f00694130a0737df175e6b258d373003777b25 | |
| parent | baa5e4804a527a54ef4be26a1b86394c43245af4 (diff) | |
| parent | 6a613d1278ddb85272c2ba647ed822c1cad10af9 (diff) | |
| download | rabbitmq-server-git-3e75d42a0d5d3df6f4a68d65af6c3adfe9379c10.tar.gz | |
Merged bug18381 into bug19684
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 1 |
6 files changed, 83 insertions, 67 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 706a92af7a..267e018b44 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -66,6 +66,8 @@ -type(node() :: atom()). -type(socket() :: port()). -type(thunk(T) :: fun(() -> T)). +-type(info_key() :: atom()). +-type(info() :: {info_key(), any()}). %% this is really an abstract type, but dialyzer does not support them -type(guid() :: any()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 67b2ab9b7d..c93ae89a3b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,6 @@ -export([notify_sent/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). --export([info_queue_sorted/1, info_queue_sorted/2]). -import(mnesia). -import(gen_server). @@ -56,8 +55,6 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(info_key() :: atom()). --type(info() :: {info_key(), any()}). -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). @@ -69,13 +66,9 @@ -spec(list/0 :: () -> [amqqueue()]). -spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]). -spec(info/1 :: (amqqueue()) -> [info()]). --spec(info/2 :: - (amqqueue(), info_key()) -> info(); - (amqqueue(), [info_key()]) -> [info()]). --spec(info_all/0 :: () -> [{amqqueue(), [info()]}]). --spec(info_all/1 :: - (info_key()) -> [{amqqueue(), info()}]; - ([info_key()]) -> [{amqqueue(), [info()]}]). +-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). +-spec(info_all/0 :: () -> [[info()]]). +-spec(info_all/1 :: ([info_key()]) -> [[info()]]). -spec(stat/1 :: (amqqueue()) -> qstats()). -spec(stat_all/0 :: () -> [qstats()]). -spec(delete/3 :: @@ -105,8 +98,6 @@ -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). --spec(info_queue_sorted/1 :: ([info_key()]) -> [{amqqueue(), [info()]}]). --spec(info_queue_sorted/2 :: (non_neg_integer(), [info_key()]) -> [{amqqueue(), [info()]}]). -endif. @@ -212,15 +203,15 @@ list_vhost_queues(VHostPath) -> info(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, info). -info(#amqqueue{ pid = QPid }, ItemOrItems) -> - case gen_server:call(QPid, {info, ItemOrItems}) of +info(#amqqueue{ pid = QPid }, Items) -> + case gen_server:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. -info_all() -> map(fun (Q) -> {Q, info(Q)} end). +info_all() -> map(fun (Q) -> info(Q) end). -info_all(ItemOrItems) -> map(fun (Q) -> {Q, info(Q, ItemOrItems)} end). +info_all(Items) -> map(fun (Q) -> info(Q, Items) end). stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). @@ -338,18 +329,3 @@ safe_pmap_ok(H, F, L) -> [] -> ok; Errors -> {error, Errors} end. - -info_lookup({#amqqueue{}, InfoTupleList}, InfoKey) -> - case lists:keysearch(InfoKey, 1, InfoTupleList) of - false -> throw({bad_argument, InfoKey}); - {value, {InfoKey, InfoValue}} -> InfoValue - end. - -%% TODO: avoid sorting if Length is much less than the number of queues -info_queue_sorted(InfoItems = [SortItem | _]) -> - lists:sort(fun(A, B) -> info_lookup(A, SortItem) > info_lookup(B, SortItem) end, - rabbit_amqqueue:info_all(InfoItems)). - -info_queue_sorted(Length, InfoItems) -> - lists:sublist(info_queue_sorted(InfoItems), Length). - diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b733d1146d..a9addd5293 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -62,7 +62,12 @@ unsent_message_count}). -define(INFO_KEYS, - [messages_ready, + [name, + durable, + auto_delete, + arguments, + pid, + messages_ready, messages_unacknowledged, messages_uncommitted, messages, @@ -471,32 +476,37 @@ purge_message_buffer(QName, MessageBuffer) -> %% artifically ack them. persist_acks(none, QName, lists:append(Messages)). -infos(Items, State) -> [{Item, info(Item, State)} || Item <- Items]. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -info(messages_ready, #q{message_buffer = MessageBuffer}) -> +i(name, #q{q = #amqqueue{name = Name}}) -> Name; +i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; +i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; +i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; +i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid; +i(messages_ready, #q{message_buffer = MessageBuffer}) -> queue:len(MessageBuffer); -info(messages_unacknowledged, _) -> +i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); -info(messages_uncommitted, _) -> +i(messages_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_messages = Pending} <- all_tx_record()]); -info(messages, State) -> - lists:sum([info(Item, State) || Item <- [messages_ready, +i(messages, State) -> + lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged, messages_uncommitted]]); -info(acks_uncommitted, _) -> +i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); -info(consumers, _) -> +i(consumers, _) -> lists:sum([length(Consumers) || #cr{consumers = Consumers} <- all_ch_record()]); -info(transactions, _) -> +i(transactions, _) -> length(all_tx_record()); -info(memory, _) -> +i(memory, _) -> {memory, M} = process_info(self(), memory), M; -info(Item, _) -> +i(Item, _) -> throw({bad_argument, Item}). %--------------------------------------------------------------------------- @@ -504,13 +514,7 @@ info(Item, _) -> handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); -handle_call({info, Item}, _From, State) when is_atom(Item) -> - try - reply({ok, {Item, info(Item, State)}}, State) - catch Error -> reply({error, Error}, State) - end; - -handle_call({info, Items}, _From, State) when is_list(Items) -> +handle_call({info, Items}, _From, State) -> try reply({ok, infos(Items, State)}, State) catch Error -> reply({error, Error}, State) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index d7c6d2548d..d6f5ad73fe 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -89,8 +89,7 @@ Available commands: list_user_vhosts <UserName> list_vhost_users <VHostPath> - list_queues [-l <ResultSize>] <InfoItem> [<InfoItem> ...] - + list_queues <InfoItem> [<InfoItem> ...] <node> should be the name of the master node of the RabbitMQ cluster. It defaults to the node named \"rabbit\" on the local host. On a host named @@ -98,10 +97,9 @@ defaults to the node named \"rabbit\" on the local host. On a host named NODENAME has been set to some non-default value at broker startup time). The output of hostname -s is usually the correct suffix to use after the \"@\" sign. -<InfoItem> must be a member of the list [messages_ready, -messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, -consumers, transactions, memory]. At most <ResultSize> queues will be listed, -in descending order of the first <InfoItem>. +<InfoItem> must be a member of the list [name, durable, auto_delete, arguments, +pid, messages_ready, messages_unacknowledged, messages_uncommitted, messages, +acks_uncommitted, consumers, transactions, memory]. "), halt(1). @@ -191,16 +189,21 @@ action(list_vhost_users, Node, Args = [_VHostPath]) -> action(list_queues, Node, Args) -> io:format("Listing queues ...~n"), - Res = - case Args of - ["-l", Length | InfoItems] -> - rpc_call(Node, rabbit_amqqueue, info_queue_sorted, - [list_to_integer(Length), [list_to_atom(X) || X <- InfoItems]]); - InfoItems -> - rpc_call(Node, rabbit_amqqueue, info_queue_sorted, - [[list_to_atom(X) || X <- InfoItems]]) + lists:map( + fun (ResultRow) -> + lists:map( + fun(ResultColumn) -> + case ResultColumn of + {name, #resource{virtual_host = VHostPath, kind = queue, name = Name}} -> + io:format("~s@~s ", [Name, VHostPath]); + {_, Res} -> + io:format("~w ", [Res]) + end + end, + ResultRow), + io:nl() end, - io:format("~n~p~n", [Res]), + rpc_call(Node, rabbit_amqqueue, info_all, [[list_to_atom(X) || X <- Args]])), ok. display_list(L) when is_list(L) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a8c54438a2..cc73f80c9f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -29,7 +29,8 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list_vhost_exchanges/1, + list/0, list_vhost_exchanges/1, + info/1, info/2, info_all/0, info_all/1, simple_publish/6, simple_publish/3, route/2]). -export([add_binding/4, delete_binding/4]). @@ -62,7 +63,12 @@ -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). +-spec(list/0 :: () -> [exchange()]). -spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). +-spec(info/1 :: (exchange()) -> [info()]). +-spec(info/2 :: (exchange(), [info_key()]) -> [info()]). +-spec(info_all/0 :: () -> [[info()]]). +-spec(info_all/1 :: ([info_key()]) -> [[info()]]). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). @@ -87,6 +93,8 @@ %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. + recover() -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -154,10 +162,34 @@ lookup_or_die(Name) -> not_found, "no ~s", [rabbit_misc:rs(Name)]) end. +list() -> rabbit_misc:dirty_read_all(exchange). + +map(F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list()). + list_vhost_exchanges(VHostPath) -> mnesia:dirty_match_object( #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). +infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. + +i(name, #exchange{name = Name}) -> Name; +i(type, #exchange{type = Type}) -> Type; +i(durable, #exchange{durable = Durable}) -> Durable; +i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; +i(arguments, #exchange{arguments = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(X = #exchange{}) -> infos(?INFO_KEYS, X). + +info(X = #exchange{}, Items) -> infos(Items, X). + +info_all() -> map(fun (X) -> info(X) end). + +info_all(Items) -> map(fun (X) -> info(X, Items) end). + %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2c134002ca..7638af582d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -369,4 +369,3 @@ append_file(File, _, Suffix) -> {ok, Data} -> file:write_file([File, Suffix], Data, [append]); Error -> Error end. - |
