summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-04 03:50:20 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-04 03:50:20 +0000
commit48810590e37f91643874ce7e8fb368d002fc24c8 (patch)
treefdea6baa99518df109cf3d7f9d03b1d2ead6dd2d /src
parent82d3065e1303085f5bf590a4f21f561161ed2fd9 (diff)
parentb3711bfb0cf231d352ef96d9b95f01780d2731a2 (diff)
downloadrabbitmq-server-git-48810590e37f91643874ce7e8fb368d002fc24c8.tar.gz
merge bug21966 into bug22300
Diffstat (limited to 'src')
-rw-r--r--src/pg_local.erl10
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_channel.erl38
-rw-r--r--src/rabbit_control.erl8
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_networking.erl12
-rw-r--r--src/rabbit_reader.erl5
-rw-r--r--src/rabbit_tests.erl49
9 files changed, 94 insertions, 45 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl
index 7f771f7414..fa41fe46b3 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -35,6 +35,7 @@
-module(pg_local).
-export([join/2, leave/2, get_members/1]).
+-export([sync/0]). %% intended for testing only; not part of official API
-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
terminate/2]).
@@ -50,6 +51,8 @@
-spec(leave/2 :: (name(), pid()) -> 'ok').
-spec(get_members/1 :: (name()) -> [pid()]).
+-spec(sync/0 :: () -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -78,6 +81,10 @@ get_members(Name) ->
ensure_started(),
group_members(Name).
+sync() ->
+ ensure_started(),
+ gen_server:call(?MODULE, sync).
+
%%%
%%% Callback functions from gen_server
%%%
@@ -88,6 +95,9 @@ init([]) ->
pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
{ok, #state{}}.
+handle_call(sync, _From, S) ->
+ {reply, ok, S};
+
handle_call(Request, From, S) ->
error_logger:warning_msg("The pg_local server received an unexpected message:\n"
"handle_call(~p, ~p, _)\n",
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9e8e5d6cdb..db7461b09f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -36,7 +36,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
--export([list/1, info/1, info/2, info_all/1, info_all/2]).
+-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
@@ -69,6 +69,7 @@
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
-spec(list/1 :: (vhost()) -> [amqqueue()]).
+-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (amqqueue()) -> [info()]).
-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
@@ -222,6 +223,8 @@ list(VHostPath) ->
rabbit_queue,
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
+info_keys() -> rabbit_amqqueue_process:info_keys().
+
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a3b0814cfa..06e68a1b16 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,7 +39,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--export([start_link/1]).
+-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
@@ -88,9 +88,10 @@
%%----------------------------------------------------------------------------
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
+info_keys() -> ?INFO_KEYS.
+
%%----------------------------------------------------------------------------
init(Q) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2cea0e3741..0dbcf1158e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,10 +37,11 @@
-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
--export([list/0, info/1, info/2, info_all/0, info_all/1,
+-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
consumers/1, consumers_all/0]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3,
+ handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
@@ -48,13 +49,15 @@
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(INFO_KEYS,
[pid,
connection,
+ number,
user,
vhost,
transactional,
@@ -75,6 +78,7 @@
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
+-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
-spec(info_all/0 :: () -> [[info()]]).
@@ -113,6 +117,8 @@ conserve_memory(Pid, Conserve) ->
list() ->
pg_local:get_members(rabbit_channels).
+info_keys() -> ?INFO_KEYS.
+
info(Pid) ->
gen_server2:pcall(Pid, 9, info, infinity).
@@ -158,7 +164,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}}.
+ consumer_mapping = dict:new()},
+ hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -223,11 +231,11 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
+ {stop, Reason, State}.
-handle_info(timeout, State) ->
+handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {noreply, State, hibernate}.
+ {hibernate, State}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -245,9 +253,9 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+noreply(NewState) -> {noreply, NewState, hibernate}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -1018,12 +1026,12 @@ terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, _) -> self();
-i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
-i(user, #ch{username = Username}) -> Username;
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{transaction_id = none}) -> false;
-i(transactional, #ch{transaction_id = _}) -> true;
+i(pid, _) -> self();
+i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{username = Username}) -> Username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 232b3c8f43..c8a8d599a5 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -193,10 +193,10 @@ frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend]. The default is to display user, peer_address, peer_port
and state.
-<ChannelInfoItem> must be a member of the list [pid, connection, user,
-vhost, transactional, consumer_count, messages_unacknowledged,
-prefetch_count]. The default is to display pid, user, transactional,
-consumer_count, messages_unacknowledged.
+<ChannelInfoItem> must be a member of the list [pid, connection,
+number, user, vhost, transactional, consumer_count,
+messages_unacknowledged, prefetch_count]. The default is to display
+pid, user, transactional, consumer_count, messages_unacknowledged.
The output format for \"list_consumers\" is a list of rows containing
the channel process id, consumer tag and queue name, in that order.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c72ff7f9f7..33bfe89399 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -35,7 +35,7 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
- list/1, info/1, info/2, info_all/1, info_all/2,
+ list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
@@ -68,6 +68,7 @@
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list/1 :: (vhost()) -> [exchange()]).
+-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (exchange()) -> [info()]).
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
@@ -165,6 +166,8 @@ list(VHostPath) ->
rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
+info_keys() -> ?INFO_KEYS.
+
map(VHostPath, F) ->
%% TODO: there is scope for optimisation here, e.g. using a
%% cursor, parallelising the function invocation
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 84be7918e9..06e2b40e47 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -32,10 +32,11 @@
-module(rabbit_networking).
-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
- stop_tcp_listener/2, on_node_down/1, active_listeners/0,
- node_listeners/1, connections/0, connection_info/1,
- connection_info/2, connection_info_all/0,
- connection_info_all/1]).
+ stop_tcp_listener/2, on_node_down/1, active_listeners/0,
+ node_listeners/1, connections/0, connection_info_keys/0,
+ connection_info/1, connection_info/2,
+ connection_info_all/0, connection_info_all/1]).
+
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
@@ -70,6 +71,7 @@
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
-spec(connections/0 :: () -> [connection()]).
+-spec(connection_info_keys/0 :: () -> [info_key()]).
-spec(connection_info/1 :: (connection()) -> [info()]).
-spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
-spec(connection_info_all/0 :: () -> [[info()]]).
@@ -214,6 +216,8 @@ connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].
+connection_info_keys() -> rabbit_reader:info_keys().
+
connection_info(Pid) -> rabbit_reader:info(Pid).
connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 503e2fb4ae..f5bdb98593 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info/1, info/2]).
+-export([start_link/0, info_keys/0, info/1, info/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -129,6 +129,7 @@
-ifdef(use_specs).
+-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
@@ -155,6 +156,8 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+info_keys() -> ?INFO_KEYS.
+
info(Pid) ->
gen_server:call(Pid, info, infinity).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 66bdd4cdc7..d4eb3adeda 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_priority_queue(),
+ passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
@@ -183,6 +184,28 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
+test_pg_local() ->
+ [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
+ check_pg_local(ok, [], []),
+ check_pg_local(pg_local:join(a, P), [P], []),
+ check_pg_local(pg_local:join(b, P), [P], [P]),
+ check_pg_local(pg_local:join(a, P), [P, P], [P]),
+ check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
+ check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
+ check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
+ check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
+ check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
+ [X ! done || X <- [P, Q]],
+ check_pg_local(ok, [], []),
+ passed.
+
+check_pg_local(ok, APids, BPids) ->
+ ok = pg_local:sync(),
+ [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
+ {Key, Pids} <- [{a, APids}, {b, BPids}]].
+
test_unfold() ->
{[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
List = lists:seq(2,20,2),
@@ -695,18 +718,10 @@ test_server_status() ->
false, false, []),
%% list queues
- ok = info_action(
- list_queues,
- [name, durable, auto_delete, arguments, pid,
- messages_ready, messages_unacknowledged, messages_uncommitted,
- messages, acks_uncommitted, consumers, transactions, memory],
- true),
+ ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
%% list exchanges
- ok = info_action(
- list_exchanges,
- [name, type, durable, auto_delete, arguments],
- true),
+ ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true),
%% list bindings
ok = control_action(list_bindings, []),
@@ -721,14 +736,16 @@ test_server_status() ->
{ok, C} = gen_tcp:connect(H, P, []),
timer:sleep(100),
- ok = info_action(
- list_connections,
- [pid, address, port, peer_address, peer_port, state,
- channels, user, vhost, timeout, frame_max,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend],
- false),
+ ok = info_action(list_connections,
+ rabbit_networking:connection_info_keys(), false),
ok = gen_tcp:close(C),
+ %% list channels
+ Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ ok = info_action(list_channels, rabbit_channel:info_keys(), false),
+ ok = rabbit_channel:shutdown(Ch),
+
passed.
test_hooks() ->