diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-19 15:05:20 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-19 15:05:20 +0300 |
| commit | c5df441e8130d33683082e165b3ff993f1580ee0 (patch) | |
| tree | 872556de9386ab4148291663cd303ce3a0032ade | |
| parent | d680193af3d43b04e15ce66abe1ec895f2562c77 (diff) | |
| parent | f2a01fda8a2079939604a7201659039ad2e501a0 (diff) | |
| download | rabbitmq-server-git-c5df441e8130d33683082e165b3ff993f1580ee0.tar.gz | |
Merge pull request #1886 from Ayanda-D/channel-source
Avoid synchronous channel request to connection process
| -rw-r--r-- | src/rabbit_channel.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | test/channel_source_SUITE.erl | 142 |
4 files changed, 217 insertions, 48 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 036aa9a60c..8f5f159f88 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,6 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). +-export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -78,7 +79,7 @@ -export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor --export([handle_method/5]). +-export([handle_method/6]). -record(ch, { %% starting | running | flow | closing @@ -97,6 +98,9 @@ %% same as reader's name, see #v1.name %% in rabbit_reader conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, %% limiter pid, see rabbit_limiter limiter, %% none | {Msgs, Acks} | committing | failed | @@ -448,6 +452,14 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). +-spec source(pid(), any()) -> any(). + +source(Pid, Source) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> Pid ! {channel_source, Source}; + false -> {error, channel_terminated} + end. + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, @@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})). + noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); + +handle_info({channel_source, Source}, State = #ch{}) -> + noreply(State#ch{source = Source}). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -938,11 +953,11 @@ check_write_permitted(Resource, User) -> check_read_permitted(Resource, User) -> check_resource_access(User, Resource, read). -check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write). +check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write). -check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read). +check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -978,14 +993,17 @@ check_internal_exchange(_) -> ok. check_topic_authorisation(Resource = #exchange{type = topic}, - User, none, RoutingKey, Permission) -> + User, none, RoutingKey, _ChSrc, Permission) -> %% Called from outside the channel by mgmt API AmqpParams = [], check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(Resource = #exchange{type = topic}, - User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid), + User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) -> + AmqpParams = get_amqp_params(ConnPid, ChSrc), check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); +check_topic_authorisation(_, _, _, _, _, _) -> + ok. + check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, AmqpParams, RoutingKey, Permission) -> @@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost User, Resource, Permission, Context), CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) - end; -check_topic_authorisation(_, _, _, _, _) -> - ok. + end. -get_amqp_params(ConnPid) when is_pid(ConnPid) -> +get_amqp_params(_ConnPid, rabbit_reader) -> []; +get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> Timeout = get_operation_timeout(), get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout). @@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, conn_name = ConnName, delivery_flow = Flow, conn_pid = ConnPid, + source = ChSrc, max_message_size = MaxMessageSize}) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), + check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, user = User, queue_collector_pid = CollectorPid, - conn_pid = ConnPid}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + conn_pid = ConnPid, + source = ChSrc}) -> + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> {ok, QueueName, MessageCount, ConsumerCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> {ok, PurgedMessageCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - case handle_method(Method, ConnPid, CollectorPid, + case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> return_ok(State, NoWait, @@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, - RoutingKey, Arguments, VHostPath, ConnPid, + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, #user{username = Username} = User) -> ExchangeNameBin = strip_cr_lf(SourceNameBin0), DestinationNameBin = strip_cr_lf(DestinationNameBin0), @@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, User), - ExchangeLookup = rabbit_exchange:lookup(ExchangeName), - case ExchangeLookup of + case rabbit_exchange:lookup(ExchangeName) of {error, not_found} -> - %% no-op - ExchangeLookup; + ok; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), - ExchangeLookup + check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc) end, case Fun(#binding{source = ExchangeName, destination = DestinationName, @@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch); i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(source, #ch{source = ChSrc}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", _/binary>> = QueueNameBin}, - _ConnPid, _CollectorPid, VHost, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHost, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of @@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, - ConnPid, CollectorPid, VHostPath, + ConnPid, ChSrc, CollectorPid, VHostPath, #user{username = Username} = User) -> Owner = case ExclusiveDeclare of true -> ConnPid; @@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, ConnPid, CollectorPid, VHostPath, + handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath, User); {absent, Q, Reason} -> rabbit_amqqueue:absent(Q, Reason); @@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, passive = true}, - ConnPid, _CollectorPid, VHostPath, _User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), Fun = fun (Q0) -> @@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty}, - ConnPid, _CollectorPid, VHostPath, + ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), @@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, end; handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), @@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ok end; handle_method(#'queue.purge'{queue = QueueNameBin}, - ConnPid, _CollectorPid, VHostPath, User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, User) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), rabbit_amqqueue:with_exclusive_access_or_die( @@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, auto_delete = AutoDelete, internal = Internal, arguments = Args}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, #user{username = Username} = User) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), @@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Internal, Args); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true}, - _ConnPid, _CollectorPid, VHostPath, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 50e8f3d2b0..e43bfba90c 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}]), + _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c0cb9c57d5..6f0b0a5ea5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -924,6 +924,7 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), + _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl new file mode 100644 index 0000000000..56b287e913 --- /dev/null +++ b/test/channel_source_SUITE.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(channel_source_SUITE). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + network_rabbit_reader_channel_source, + network_arbitrary_channel_source, + direct_channel_source, + undefined_channel_source + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +network_rabbit_reader_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, network_rabbit_reader_channel_source1, [Config]). + +network_rabbit_reader_channel_source1(Config) -> + ExistingChannels = rabbit_channel:list(), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, ClientCh} = amqp_connection:open_channel(Conn), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + amqp_channel:close(ClientCh), + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), + passed. + +network_arbitrary_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, network_arbitrary_channel_source1, [Config]). + +network_arbitrary_channel_source1(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end), + {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id), + {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id), + {ok, Ch} = rabbit_channel:start_link( + 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1, + rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [], + Collector, Limiter), + _ = rabbit_channel:source(Ch, ?MODULE), + [{amqp_params, #amqp_params_network{username = <<"guest">>, + password = <<"guest">>, host = "localhost", virtual_host = <<"/">>}}] = + rabbit_amqp_connection:amqp_params(Conn, 1000), + [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]), + [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]], + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(Ch, ?MODULE), + passed. + +direct_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, direct_channel_source1, [Config]). + +direct_channel_source1(Config) -> + ExistingChannels = rabbit_channel:list(), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), + {ok, ClientCh} = amqp_connection:open_channel(Conn), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + amqp_channel:close(ClientCh), + amqp_connection:close(Conn), + {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), + passed. + +undefined_channel_source(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, undefined_channel_source1, [Config]). + +undefined_channel_source1(_Config) -> + ExistingChannels = rabbit_channel:list(), + {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(), + [ServerCh] = rabbit_channel:list() -- ExistingChannels, + [{source, undefined}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + passed. |
