summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@lshift.net>2008-11-14 00:03:39 +0000
committerEmile Joubert <emile@lshift.net>2008-11-14 00:03:39 +0000
commit3e75d42a0d5d3df6f4a68d65af6c3adfe9379c10 (patch)
treec0f00694130a0737df175e6b258d373003777b25
parentbaa5e4804a527a54ef4be26a1b86394c43245af4 (diff)
parent6a613d1278ddb85272c2ba647ed822c1cad10af9 (diff)
downloadrabbitmq-server-git-3e75d42a0d5d3df6f4a68d65af6c3adfe9379c10.tar.gz
Merged bug18381 into bug19684
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl38
-rw-r--r--src/rabbit_amqqueue_process.erl42
-rw-r--r--src/rabbit_control.erl33
-rw-r--r--src/rabbit_exchange.erl34
-rw-r--r--src/rabbit_misc.erl1
6 files changed, 83 insertions, 67 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 706a92af7a..267e018b44 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -66,6 +66,8 @@
-type(node() :: atom()).
-type(socket() :: port()).
-type(thunk(T) :: fun(() -> T)).
+-type(info_key() :: atom()).
+-type(info() :: {info_key(), any()}).
%% this is really an abstract type, but dialyzer does not support them
-type(guid() :: any()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 67b2ab9b7d..c93ae89a3b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,6 @@
-export([notify_sent/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
--export([info_queue_sorted/1, info_queue_sorted/2]).
-import(mnesia).
-import(gen_server).
@@ -56,8 +55,6 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(info_key() :: atom()).
--type(info() :: {info_key(), any()}).
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
@@ -69,13 +66,9 @@
-spec(list/0 :: () -> [amqqueue()]).
-spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]).
-spec(info/1 :: (amqqueue()) -> [info()]).
--spec(info/2 ::
- (amqqueue(), info_key()) -> info();
- (amqqueue(), [info_key()]) -> [info()]).
--spec(info_all/0 :: () -> [{amqqueue(), [info()]}]).
--spec(info_all/1 ::
- (info_key()) -> [{amqqueue(), info()}];
- ([info_key()]) -> [{amqqueue(), [info()]}]).
+-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
+-spec(info_all/0 :: () -> [[info()]]).
+-spec(info_all/1 :: ([info_key()]) -> [[info()]]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -105,8 +98,6 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
--spec(info_queue_sorted/1 :: ([info_key()]) -> [{amqqueue(), [info()]}]).
--spec(info_queue_sorted/2 :: (non_neg_integer(), [info_key()]) -> [{amqqueue(), [info()]}]).
-endif.
@@ -212,15 +203,15 @@ list_vhost_queues(VHostPath) ->
info(#amqqueue{ pid = QPid }) ->
gen_server:call(QPid, info).
-info(#amqqueue{ pid = QPid }, ItemOrItems) ->
- case gen_server:call(QPid, {info, ItemOrItems}) of
+info(#amqqueue{ pid = QPid }, Items) ->
+ case gen_server:call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
-info_all() -> map(fun (Q) -> {Q, info(Q)} end).
+info_all() -> map(fun (Q) -> info(Q) end).
-info_all(ItemOrItems) -> map(fun (Q) -> {Q, info(Q, ItemOrItems)} end).
+info_all(Items) -> map(fun (Q) -> info(Q, Items) end).
stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
@@ -338,18 +329,3 @@ safe_pmap_ok(H, F, L) ->
[] -> ok;
Errors -> {error, Errors}
end.
-
-info_lookup({#amqqueue{}, InfoTupleList}, InfoKey) ->
- case lists:keysearch(InfoKey, 1, InfoTupleList) of
- false -> throw({bad_argument, InfoKey});
- {value, {InfoKey, InfoValue}} -> InfoValue
- end.
-
-%% TODO: avoid sorting if Length is much less than the number of queues
-info_queue_sorted(InfoItems = [SortItem | _]) ->
- lists:sort(fun(A, B) -> info_lookup(A, SortItem) > info_lookup(B, SortItem) end,
- rabbit_amqqueue:info_all(InfoItems)).
-
-info_queue_sorted(Length, InfoItems) ->
- lists:sublist(info_queue_sorted(InfoItems), Length).
-
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b733d1146d..a9addd5293 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -62,7 +62,12 @@
unsent_message_count}).
-define(INFO_KEYS,
- [messages_ready,
+ [name,
+ durable,
+ auto_delete,
+ arguments,
+ pid,
+ messages_ready,
messages_unacknowledged,
messages_uncommitted,
messages,
@@ -471,32 +476,37 @@ purge_message_buffer(QName, MessageBuffer) ->
%% artifically ack them.
persist_acks(none, QName, lists:append(Messages)).
-infos(Items, State) -> [{Item, info(Item, State)} || Item <- Items].
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-info(messages_ready, #q{message_buffer = MessageBuffer}) ->
+i(name, #q{q = #amqqueue{name = Name}}) -> Name;
+i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
+i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
+i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
+i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid;
+i(messages_ready, #q{message_buffer = MessageBuffer}) ->
queue:len(MessageBuffer);
-info(messages_unacknowledged, _) ->
+i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
-info(messages_uncommitted, _) ->
+i(messages_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_messages = Pending} <- all_tx_record()]);
-info(messages, State) ->
- lists:sum([info(Item, State) || Item <- [messages_ready,
+i(messages, State) ->
+ lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged,
messages_uncommitted]]);
-info(acks_uncommitted, _) ->
+i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
-info(consumers, _) ->
+i(consumers, _) ->
lists:sum([length(Consumers) ||
#cr{consumers = Consumers} <- all_ch_record()]);
-info(transactions, _) ->
+i(transactions, _) ->
length(all_tx_record());
-info(memory, _) ->
+i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-info(Item, _) ->
+i(Item, _) ->
throw({bad_argument, Item}).
%---------------------------------------------------------------------------
@@ -504,13 +514,7 @@ info(Item, _) ->
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
-handle_call({info, Item}, _From, State) when is_atom(Item) ->
- try
- reply({ok, {Item, info(Item, State)}}, State)
- catch Error -> reply({error, Error}, State)
- end;
-
-handle_call({info, Items}, _From, State) when is_list(Items) ->
+handle_call({info, Items}, _From, State) ->
try
reply({ok, infos(Items, State)}, State)
catch Error -> reply({error, Error}, State)
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index d7c6d2548d..d6f5ad73fe 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -89,8 +89,7 @@ Available commands:
list_user_vhosts <UserName>
list_vhost_users <VHostPath>
- list_queues [-l <ResultSize>] <InfoItem> [<InfoItem> ...]
-
+ list_queues <InfoItem> [<InfoItem> ...]
<node> should be the name of the master node of the RabbitMQ cluster. It
defaults to the node named \"rabbit\" on the local host. On a host named
@@ -98,10 +97,9 @@ defaults to the node named \"rabbit\" on the local host. On a host named
NODENAME has been set to some non-default value at broker startup time). The
output of hostname -s is usually the correct suffix to use after the \"@\" sign.
-<InfoItem> must be a member of the list [messages_ready,
-messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted,
-consumers, transactions, memory]. At most <ResultSize> queues will be listed,
-in descending order of the first <InfoItem>.
+<InfoItem> must be a member of the list [name, durable, auto_delete, arguments,
+pid, messages_ready, messages_unacknowledged, messages_uncommitted, messages,
+acks_uncommitted, consumers, transactions, memory].
"),
halt(1).
@@ -191,16 +189,21 @@ action(list_vhost_users, Node, Args = [_VHostPath]) ->
action(list_queues, Node, Args) ->
io:format("Listing queues ...~n"),
- Res =
- case Args of
- ["-l", Length | InfoItems] ->
- rpc_call(Node, rabbit_amqqueue, info_queue_sorted,
- [list_to_integer(Length), [list_to_atom(X) || X <- InfoItems]]);
- InfoItems ->
- rpc_call(Node, rabbit_amqqueue, info_queue_sorted,
- [[list_to_atom(X) || X <- InfoItems]])
+ lists:map(
+ fun (ResultRow) ->
+ lists:map(
+ fun(ResultColumn) ->
+ case ResultColumn of
+ {name, #resource{virtual_host = VHostPath, kind = queue, name = Name}} ->
+ io:format("~s@~s ", [Name, VHostPath]);
+ {_, Res} ->
+ io:format("~w ", [Res])
+ end
+ end,
+ ResultRow),
+ io:nl()
end,
- io:format("~n~p~n", [Res]),
+ rpc_call(Node, rabbit_amqqueue, info_all, [[list_to_atom(X) || X <- Args]])),
ok.
display_list(L) when is_list(L) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a8c54438a2..cc73f80c9f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -29,7 +29,8 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
- list_vhost_exchanges/1,
+ list/0, list_vhost_exchanges/1,
+ info/1, info/2, info_all/0, info_all/1,
simple_publish/6, simple_publish/3,
route/2]).
-export([add_binding/4, delete_binding/4]).
@@ -62,7 +63,12 @@
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
+-spec(list/0 :: () -> [exchange()]).
-spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]).
+-spec(info/1 :: (exchange()) -> [info()]).
+-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
+-spec(info_all/0 :: () -> [[info()]]).
+-spec(info_all/1 :: ([info_key()]) -> [[info()]]).
-spec(simple_publish/6 ::
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
@@ -87,6 +93,8 @@
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+
recover() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -154,10 +162,34 @@ lookup_or_die(Name) ->
not_found, "no ~s", [rabbit_misc:rs(Name)])
end.
+list() -> rabbit_misc:dirty_read_all(exchange).
+
+map(F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ lists:map(F, list()).
+
list_vhost_exchanges(VHostPath) ->
mnesia:dirty_match_object(
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
+infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
+
+i(name, #exchange{name = Name}) -> Name;
+i(type, #exchange{type = Type}) -> Type;
+i(durable, #exchange{durable = Durable}) -> Durable;
+i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
+i(arguments, #exchange{arguments = Arguments}) -> Arguments;
+i(Item, _) -> throw({bad_argument, Item}).
+
+info(X = #exchange{}) -> infos(?INFO_KEYS, X).
+
+info(X = #exchange{}, Items) -> infos(Items, X).
+
+info_all() -> map(fun (X) -> info(X) end).
+
+info_all(Items) -> map(fun (X) -> info(X, Items) end).
+
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
ContentTypeBin, BodyBin) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2c134002ca..7638af582d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -369,4 +369,3 @@ append_file(File, _, Suffix) ->
{ok, Data} -> file:write_file([File, Suffix], Data, [append]);
Error -> Error
end.
-