summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-02-19 15:05:20 +0300
committerGitHub <noreply@github.com>2019-02-19 15:05:20 +0300
commitc5df441e8130d33683082e165b3ff993f1580ee0 (patch)
tree872556de9386ab4148291663cd303ce3a0032ade
parentd680193af3d43b04e15ce66abe1ec895f2562c77 (diff)
parentf2a01fda8a2079939604a7201659039ad2e501a0 (diff)
downloadrabbitmq-server-git-c5df441e8130d33683082e165b3ff993f1580ee0.tar.gz
Merge pull request #1886 from Ayanda-D/channel-source
Avoid synchronous channel request to connection process
-rw-r--r--src/rabbit_channel.erl121
-rw-r--r--src/rabbit_direct.erl1
-rw-r--r--src/rabbit_reader.erl1
-rw-r--r--test/channel_source_SUITE.erl142
4 files changed, 217 insertions, 48 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 036aa9a60c..8f5f159f88 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -63,6 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
+-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -78,7 +79,7 @@
-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
--export([handle_method/5]).
+-export([handle_method/6]).
-record(ch, {
%% starting | running | flow | closing
@@ -97,6 +98,9 @@
%% same as reader's name, see #v1.name
%% in rabbit_reader
conn_name,
+ %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
+ %% or any other channel creating/spawning entity
+ source,
%% limiter pid, see rabbit_limiter
limiter,
%% none | {Msgs, Acks} | committing | failed |
@@ -448,6 +452,14 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
+-spec source(pid(), any()) -> any().
+
+source(Pid, Source) when is_pid(Pid) ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid ! {channel_source, Source};
+ false -> {error, channel_terminated}
+ end.
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
@@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})).
+ noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
+
+handle_info({channel_source, Source}, State = #ch{}) ->
+ noreply(State#ch{source = Source}).
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -938,11 +953,11 @@ check_write_permitted(Resource, User) ->
check_read_permitted(Resource, User) ->
check_resource_access(User, Resource, read).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
+check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
+check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -978,14 +993,17 @@ check_internal_exchange(_) ->
ok.
check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, Permission) ->
+ User, none, RoutingKey, _ChSrc, Permission) ->
%% Called from outside the channel by mgmt API
AmqpParams = [],
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid),
+ User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
+ AmqpParams = get_amqp_params(ConnPid, ChSrc),
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
+check_topic_authorisation(_, _, _, _, _, _) ->
+ ok.
+
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
AmqpParams, RoutingKey, Permission) ->
@@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end;
-check_topic_authorisation(_, _, _, _, _) ->
- ok.
+ end.
-get_amqp_params(ConnPid) when is_pid(ConnPid) ->
+get_amqp_params(_ConnPid, rabbit_reader) -> [];
+get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
Timeout = get_operation_timeout(),
get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout).
@@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid,
+ source = ChSrc,
max_message_size = MaxMessageSize}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
+ check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
user = User,
queue_collector_pid = CollectorPid,
- conn_pid = ConnPid}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ conn_pid = ConnPid,
+ source = ChSrc}) ->
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- case handle_method(Method, ConnPid, CollectorPid,
+ case handle_method(Method, ConnPid, ChSrc, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid,
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
@@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, User),
- ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
- case ExchangeLookup of
+ case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
- %% no-op
- ExchangeLookup;
+ ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
- ExchangeLookup
+ check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(source, #ch{source = ChSrc}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _CollectorPid, VHost, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, CollectorPid, VHostPath,
+ ConnPid, ChSrc, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, _CollectorPid, VHostPath,
+ ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
@@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
@@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, _CollectorPid, VHostPath, User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User),
rabbit_amqqueue:with_exclusive_access_or_die(
@@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
@@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 50e8f3d2b0..e43bfba90c 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}]),
+ _ = rabbit_channel:source(ChannelPid, ?MODULE),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c0cb9c57d5..6f0b0a5ea5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -924,6 +924,7 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
+ _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl
new file mode 100644
index 0000000000..56b287e913
--- /dev/null
+++ b/test/channel_source_SUITE.erl
@@ -0,0 +1,142 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(channel_source_SUITE).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ network_rabbit_reader_channel_source,
+ network_arbitrary_channel_source,
+ direct_channel_source,
+ undefined_channel_source
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+network_rabbit_reader_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, network_rabbit_reader_channel_source1, [Config]).
+
+network_rabbit_reader_channel_source1(Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, ClientCh} = amqp_connection:open_channel(Conn),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ amqp_channel:close(ClientCh),
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
+ passed.
+
+network_arbitrary_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, network_arbitrary_channel_source1, [Config]).
+
+network_arbitrary_channel_source1(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end),
+ {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id),
+ {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id),
+ {ok, Ch} = rabbit_channel:start_link(
+ 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1,
+ rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [],
+ Collector, Limiter),
+ _ = rabbit_channel:source(Ch, ?MODULE),
+ [{amqp_params, #amqp_params_network{username = <<"guest">>,
+ password = <<"guest">>, host = "localhost", virtual_host = <<"/">>}}] =
+ rabbit_amqp_connection:amqp_params(Conn, 1000),
+ [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]),
+ [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]],
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(Ch, ?MODULE),
+ passed.
+
+direct_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, direct_channel_source1, [Config]).
+
+direct_channel_source1(Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config),
+ {ok, ClientCh} = amqp_connection:open_channel(Conn),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ amqp_channel:close(ClientCh),
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
+ passed.
+
+undefined_channel_source(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, undefined_channel_source1, [Config]).
+
+undefined_channel_source1(_Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{source, undefined}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ passed.