diff options
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
6 files changed, 43 insertions, 33 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 3ca276ced1..b531d46218 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1191,6 +1191,10 @@ to which the channel belongs.</para></listitem> </varlistentry> <varlistentry> + <term>name</term> + <listitem><para>Readable name for the channel.</para></listitem> + </varlistentry> + <varlistentry> <term>number</term> <listitem><para>The number of the channel, which uniquely identifies it within a connection.</para></listitem> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a101886f26..a54494b59d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]). +-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -33,7 +33,7 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter, tx_status, next_tag, unacked_message_q, + conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, @@ -56,6 +56,7 @@ -define(CREATION_EVENT_KEYS, [pid, + name, connection, number, user, @@ -71,9 +72,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/10 :: - (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(), - rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), +-spec(start_link/11 :: + (channel_number(), pid(), pid(), pid(), string(), + rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -103,11 +105,11 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, Limiter) -> +start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, + VHost, Capabilities, CollectorPid, Limiter) -> gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, - VHost, Capabilities, CollectorPid, Limiter], []). + ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, + User, VHost, Capabilities, CollectorPid, Limiter], []). do(Pid, Method) -> do(Pid, Method, none). @@ -175,7 +177,7 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, +init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, Limiter]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), @@ -185,6 +187,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, reader_pid = ReaderPid, writer_pid = WriterPid, conn_pid = ConnPid, + conn_name = ConnName, limiter = Limiter, tx_status = none, next_tag = 1, @@ -1509,6 +1512,9 @@ i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> rabbit_limiter:is_blocked(Limiter); +i(name, #ch{conn_name = ConnName, + channel = Channel}) -> + list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index dc262b4948..bcb838513b 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -32,10 +32,10 @@ -type(start_link_args() :: {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), pid(), rabbit_types:protocol(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), + non_neg_integer(), pid(), string(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()} | - {'direct', rabbit_channel:channel_number(), pid(), + {'direct', rabbit_channel:channel_number(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}). @@ -45,8 +45,8 @@ %%---------------------------------------------------------------------------- -start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, - Capabilities, Collector}) -> +start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, + VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, {tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}), @@ -56,14 +56,14 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, ReaderPid, Protocol, - User, VHost, Capabilities, Collector, + [Channel, ReaderPid, WriterPid, ReaderPid, ConnName, + Protocol, User, VHost, Capabilities, Collector, rabbit_limiter:make_token(LimiterPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, - Capabilities, Collector}) -> +start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, + User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, direct), [LimiterPid] = supervisor2:find_child(SupPid, limiter), {ok, ChannelPid} = @@ -71,7 +71,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, SupPid, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, - Protocol, User, VHost, Capabilities, Collector, + ConnName, Protocol, User, VHost, Capabilities, Collector, rabbit_limiter:make_token(LimiterPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index e2928cae7e..a471d2826e 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -17,7 +17,7 @@ -module(rabbit_direct). -export([boot/0, force_event_refresh/0, list/0, connect/5, - start_channel/8, disconnect/2]). + start_channel/9, disconnect/2]). %% Internal -export([list_local/0]). @@ -36,10 +36,10 @@ rabbit_event:event_props()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/8 :: - (rabbit_channel:channel_number(), pid(), pid(), rabbit_types:protocol(), - rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid()) -> {'ok', pid()}). +-spec(start_channel/9 :: + (rabbit_channel:channel_number(), pid(), pid(), string(), + rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}). -spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok'). @@ -92,13 +92,13 @@ connect(Username, VHost, Protocol, Pid, Infos) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost, - Capabilities, Collector) -> +start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, + VHost, Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, ConnPid, Protocol, User, VHost, - Capabilities, Collector}]), + [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, + User, VHost, Capabilities, Collector}]), {ok, ChannelPid}. disconnect(Pid, Infos) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index aa962ae3c2..add1304316 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -907,8 +907,8 @@ create_channel(Channel, State) -> capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, - VHost, Capabilities, Collector}), + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), + Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 433ed9cb59..29e0428de2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1079,7 +1079,7 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link( - 1, self(), Writer, self(), rabbit_framing_amqp_0_9_1, + 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), rabbit_limiter:make_token(self())), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], @@ -1150,7 +1150,7 @@ test_spawn() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), {ok, Ch} = rabbit_channel:start_link( - 1, Me, Writer, Me, rabbit_framing_amqp_0_9_1, + 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], Me, rabbit_limiter:make_token(self())), ok = rabbit_channel:do(Ch, #'channel.open'{}), |
