diff options
| -rw-r--r-- | src/rabbit_channel.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | test/channel_source_SUITE.erl | 123 |
4 files changed, 143 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 036aa9a60c..dcb4befee6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,6 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). +-export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -448,6 +449,14 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). +-spec source(pid(), any()) -> any(). + +source(Pid, Source) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> Pid ! {channel_source, Source}; + false -> {error, channel_terminated} + end. + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, @@ -805,7 +814,11 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})). + noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); + +handle_info({channel_source, Source}, State) -> + put(channel_source, Source), + noreply(State). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -984,7 +997,7 @@ check_topic_authorisation(Resource = #exchange{type = topic}, check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(Resource = #exchange{type = topic}, User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid), + AmqpParams = get_amqp_params(ConnPid, get(channel_source)), check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, @@ -1007,7 +1020,8 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost check_topic_authorisation(_, _, _, _, _) -> ok. -get_amqp_params(ConnPid) when is_pid(ConnPid) -> +get_amqp_params(_ConnPid, rabbit_reader) -> []; +get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> Timeout = get_operation_timeout(), get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout). @@ -2246,6 +2260,7 @@ i(garbage_collection, _State) -> i(reductions, _State) -> {reductions, Reductions} = erlang:process_info(self(), reductions), Reductions; +i(channel_source, _State = #ch{}) -> get(channel_source); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 50e8f3d2b0..e43bfba90c 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}]), + _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c0cb9c57d5..6f0b0a5ea5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -924,6 +924,7 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), + _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl new file mode 100644 index 0000000000..11f87c7fde --- /dev/null +++ b/test/channel_source_SUITE.erl @@ -0,0 +1,123 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(channel_source_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + network_channel_source_notifications, + direct_channel_source_notifications, + undefined_channel_source_notifications + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +network_channel_source_notifications(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, network_channel_source_notifications1, [Config]). + +network_channel_source_notifications1(Config) -> + ExistingChannels = rabbit_channel:list(), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, _ClientCh} = amqp_connection:open_channel(Conn), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{channel_source, rabbit_reader}] = + rabbit_channel:info(ServerCh, [channel_source]), + rabbit_channel:source(ServerCh, ?MODULE), + [{channel_source, ?MODULE}] = + rabbit_channel:info(ServerCh, [channel_source]), + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), + passed. + +direct_channel_source_notifications(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, direct_channel_source_notifications1, [Config]). + +direct_channel_source_notifications1(Config) -> + ExistingChannels = rabbit_channel:list(), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), + {ok, _ClientCh} = amqp_connection:open_channel(Conn), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{channel_source, rabbit_direct}] = + rabbit_channel:info(ServerCh, [channel_source]), + rabbit_channel:source(ServerCh, ?MODULE), + [{channel_source, ?MODULE}] = + rabbit_channel:info(ServerCh, [channel_source]), + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), + passed. + +undefined_channel_source_notifications(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, undefined_channel_source_notifications1, [Config]). + +undefined_channel_source_notifications1(_Config) -> + ExistingChannels = rabbit_channel:list(), + {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{channel_source, undefined}] = + rabbit_channel:info(ServerCh, [channel_source]), + rabbit_channel:source(ServerCh, ?MODULE), + [{channel_source, ?MODULE}] = + rabbit_channel:info(ServerCh, [channel_source]), + passed. |
