diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-02-04 03:50:20 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-02-04 03:50:20 +0000 |
| commit | 48810590e37f91643874ce7e8fb368d002fc24c8 (patch) | |
| tree | fdea6baa99518df109cf3d7f9d03b1d2ead6dd2d | |
| parent | 82d3065e1303085f5bf590a4f21f561161ed2fd9 (diff) | |
| parent | b3711bfb0cf231d352ef96d9b95f01780d2731a2 (diff) | |
| download | rabbitmq-server-git-48810590e37f91643874ce7e8fb368d002fc24c8.tar.gz | |
merge bug21966 into bug22300
| -rw-r--r-- | docs/rabbitmqctl.1.pod | 5 | ||||
| -rw-r--r-- | src/pg_local.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 49 |
10 files changed, 99 insertions, 45 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 0ebe17dcb0..3f1ec3b4e0 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -396,6 +396,11 @@ id of the Erlang process associated with the channel id of the Erlang process associated with the connection to which the channel belongs +=item number + +the number of the channel, which uniquely identifies it within a +connection + =item user username associated with the channel diff --git a/src/pg_local.erl b/src/pg_local.erl index 7f771f7414..fa41fe46b3 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -35,6 +35,7 @@ -module(pg_local). -export([join/2, leave/2, get_members/1]). +-export([sync/0]). %% intended for testing only; not part of official API -export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, terminate/2]). @@ -50,6 +51,8 @@ -spec(leave/2 :: (name(), pid()) -> 'ok'). -spec(get_members/1 :: (name()) -> [pid()]). +-spec(sync/0 :: () -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -78,6 +81,10 @@ get_members(Name) -> ensure_started(), group_members(Name). +sync() -> + ensure_started(), + gen_server:call(?MODULE, sync). + %%% %%% Callback functions from gen_server %%% @@ -88,6 +95,9 @@ init([]) -> pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]), {ok, #state{}}. +handle_call(sync, _From, S) -> + {reply, ok, S}; + handle_call(Request, From, S) -> error_logger:warning_msg("The pg_local server received an unexpected message:\n" "handle_call(~p, ~p, _)\n", diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9e8e5d6cdb..db7461b09f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -36,7 +36,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). --export([list/1, info/1, info/2, info_all/1, info_all/2]). +-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2]). @@ -69,6 +69,7 @@ -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). -spec(list/1 :: (vhost()) -> [amqqueue()]). +-spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (amqqueue()) -> [info()]). -spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). @@ -222,6 +223,8 @@ list(VHostPath) -> rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). +info_keys() -> rabbit_amqqueue_process:info_keys(). + map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a3b0814cfa..06e68a1b16 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --export([start_link/1]). +-export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). @@ -88,9 +88,10 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). +info_keys() -> ?INFO_KEYS. + %%---------------------------------------------------------------------------- init(Q) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2cea0e3741..0dbcf1158e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,10 +37,11 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). --export([list/0, info/1, info/2, info_all/0, info_all/1, +-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, consumers/1, consumers_all/0]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, @@ -48,13 +49,15 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping}). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(INFO_KEYS, [pid, connection, + number, user, vhost, transactional, @@ -75,6 +78,7 @@ -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(list/0 :: () -> [pid()]). +-spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). -spec(info_all/0 :: () -> [[info()]]). @@ -113,6 +117,8 @@ conserve_memory(Pid, Conserve) -> list() -> pg_local:get_members(rabbit_channels). +info_keys() -> ?INFO_KEYS. + info(Pid) -> gen_server2:pcall(Pid, 9, info, infinity). @@ -158,7 +164,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}}. + consumer_mapping = dict:new()}, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -223,11 +231,11 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; + {stop, Reason, State}. -handle_info(timeout, State) -> +handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {noreply, State, hibernate}. + {hibernate, State}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -245,9 +253,9 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +noreply(NewState) -> {noreply, NewState, hibernate}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1018,12 +1026,12 @@ terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, _) -> self(); -i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; -i(user, #ch{username = Username}) -> Username; -i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{transaction_id = none}) -> false; -i(transactional, #ch{transaction_id = _}) -> true; +i(pid, _) -> self(); +i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; +i(number, #ch{channel = Channel}) -> Channel; +i(user, #ch{username = Username}) -> Username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 232b3c8f43..c8a8d599a5 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -193,10 +193,10 @@ frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address, peer_port and state. -<ChannelInfoItem> must be a member of the list [pid, connection, user, -vhost, transactional, consumer_count, messages_unacknowledged, -prefetch_count]. The default is to display pid, user, transactional, -consumer_count, messages_unacknowledged. +<ChannelInfoItem> must be a member of the list [pid, connection, +number, user, vhost, transactional, consumer_count, +messages_unacknowledged, prefetch_count]. The default is to display +pid, user, transactional, consumer_count, messages_unacknowledged. The output format for \"list_consumers\" is a list of rows containing the channel process id, consumer tag and queue name, in that order. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c72ff7f9f7..33bfe89399 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -35,7 +35,7 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list/1, info/1, info/2, info_all/1, info_all/2, + list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). @@ -68,6 +68,7 @@ -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list/1 :: (vhost()) -> [exchange()]). +-spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (exchange()) -> [info()]). -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). @@ -165,6 +166,8 @@ list(VHostPath) -> rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). +info_keys() -> ?INFO_KEYS. + map(VHostPath, F) -> %% TODO: there is scope for optimisation here, e.g. using a %% cursor, parallelising the function invocation diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 84be7918e9..06e2b40e47 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -32,10 +32,11 @@ -module(rabbit_networking). -export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, - stop_tcp_listener/2, on_node_down/1, active_listeners/0, - node_listeners/1, connections/0, connection_info/1, - connection_info/2, connection_info_all/0, - connection_info_all/1]). + stop_tcp_listener/2, on_node_down/1, active_listeners/0, + node_listeners/1, connections/0, connection_info_keys/0, + connection_info/1, connection_info/2, + connection_info_all/0, connection_info_all/1]). + %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). @@ -70,6 +71,7 @@ -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (erlang_node()) -> [listener()]). -spec(connections/0 :: () -> [connection()]). +-spec(connection_info_keys/0 :: () -> [info_key()]). -spec(connection_info/1 :: (connection()) -> [info()]). -spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]). -spec(connection_info_all/0 :: () -> [[info()]]). @@ -214,6 +216,8 @@ connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. +connection_info_keys() -> rabbit_reader:info_keys(). + connection_info(Pid) -> rabbit_reader:info(Pid). connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 503e2fb4ae..f5bdb98593 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,7 +33,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info/1, info/2]). +-export([start_link/0, info_keys/0, info/1, info/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -129,6 +129,7 @@ -ifdef(use_specs). +-spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). @@ -155,6 +156,8 @@ system_terminate(Reason, _Parent, _Deb, _State) -> system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. +info_keys() -> ?INFO_KEYS. + info(Pid) -> gen_server:call(Pid, info, infinity). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 66bdd4cdc7..d4eb3adeda 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), @@ -183,6 +184,28 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_pg_local() -> + [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]], + check_pg_local(ok, [], []), + check_pg_local(pg_local:join(a, P), [P], []), + check_pg_local(pg_local:join(b, P), [P], [P]), + check_pg_local(pg_local:join(a, P), [P, P], [P]), + check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]), + check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]), + check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]), + check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]), + check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]), + check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), + check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), + [X ! done || X <- [P, Q]], + check_pg_local(ok, [], []), + passed. + +check_pg_local(ok, APids, BPids) -> + ok = pg_local:sync(), + [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) || + {Key, Pids} <- [{a, APids}, {b, BPids}]]. + test_unfold() -> {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), List = lists:seq(2,20,2), @@ -695,18 +718,10 @@ test_server_status() -> false, false, []), %% list queues - ok = info_action( - list_queues, - [name, durable, auto_delete, arguments, pid, - messages_ready, messages_unacknowledged, messages_uncommitted, - messages, acks_uncommitted, consumers, transactions, memory], - true), + ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), %% list exchanges - ok = info_action( - list_exchanges, - [name, type, durable, auto_delete, arguments], - true), + ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true), %% list bindings ok = control_action(list_bindings, []), @@ -721,14 +736,16 @@ test_server_status() -> {ok, C} = gen_tcp:connect(H, P, []), timer:sleep(100), - ok = info_action( - list_connections, - [pid, address, port, peer_address, peer_port, state, - channels, user, vhost, timeout, frame_max, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend], - false), + ok = info_action(list_connections, + rabbit_networking:connection_info_keys(), false), ok = gen_tcp:close(C), + %% list channels + Writer = spawn(fun () -> receive shutdown -> ok end end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), + ok = info_action(list_channels, rabbit_channel:info_keys(), false), + ok = rabbit_channel:shutdown(Ch), + passed. test_hooks() -> |
