summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda Dube <ayanda.dube@erlang-solutions.com>2019-02-15 17:52:57 +0000
committerAyanda Dube <ayanda.dube@erlang-solutions.com>2019-02-18 11:47:10 +0100
commit2a80aa97ac361b11679af6c755807c68ad8b7b2a (patch)
treecd2c82b0f0aa6b9407fd896ff0d87a37e52cffe3
parentf6d93688a71bb765c91fe2cc3bba6e729d5fae5e (diff)
downloadrabbitmq-server-git-2a80aa97ac361b11679af6c755807c68ad8b7b2a.tar.gz
Introduce originating source to channels, to help avoid
synchronous channel requests back to the connection process
-rw-r--r--src/rabbit_channel.erl21
-rw-r--r--src/rabbit_direct.erl1
-rw-r--r--src/rabbit_reader.erl1
-rw-r--r--test/channel_source_SUITE.erl123
4 files changed, 143 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 036aa9a60c..dcb4befee6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -63,6 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
+-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -448,6 +449,14 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
+-spec source(pid(), any()) -> any().
+
+source(Pid, Source) when is_pid(Pid) ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid ! {channel_source, Source};
+ false -> {error, channel_terminated}
+ end.
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
@@ -805,7 +814,11 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})).
+ noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
+
+handle_info({channel_source, Source}, State) ->
+ put(channel_source, Source),
+ noreply(State).
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -984,7 +997,7 @@ check_topic_authorisation(Resource = #exchange{type = topic},
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
check_topic_authorisation(Resource = #exchange{type = topic},
User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid),
+ AmqpParams = get_amqp_params(ConnPid, get(channel_source)),
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
@@ -1007,7 +1020,8 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
check_topic_authorisation(_, _, _, _, _) ->
ok.
-get_amqp_params(ConnPid) when is_pid(ConnPid) ->
+get_amqp_params(_ConnPid, rabbit_reader) -> [];
+get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
Timeout = get_operation_timeout(),
get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout).
@@ -2246,6 +2260,7 @@ i(garbage_collection, _State) ->
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions;
+i(channel_source, _State = #ch{}) -> get(channel_source);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 50e8f3d2b0..e43bfba90c 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}]),
+ _ = rabbit_channel:source(ChannelPid, ?MODULE),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c0cb9c57d5..6f0b0a5ea5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -924,6 +924,7 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
+ _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl
new file mode 100644
index 0000000000..11f87c7fde
--- /dev/null
+++ b/test/channel_source_SUITE.erl
@@ -0,0 +1,123 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(channel_source_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ network_channel_source_notifications,
+ direct_channel_source_notifications,
+ undefined_channel_source_notifications
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+network_channel_source_notifications(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, network_channel_source_notifications1, [Config]).
+
+network_channel_source_notifications1(Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, _ClientCh} = amqp_connection:open_channel(Conn),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{channel_source, rabbit_reader}] =
+ rabbit_channel:info(ServerCh, [channel_source]),
+ rabbit_channel:source(ServerCh, ?MODULE),
+ [{channel_source, ?MODULE}] =
+ rabbit_channel:info(ServerCh, [channel_source]),
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
+ passed.
+
+direct_channel_source_notifications(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, direct_channel_source_notifications1, [Config]).
+
+direct_channel_source_notifications1(Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config),
+ {ok, _ClientCh} = amqp_connection:open_channel(Conn),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{channel_source, rabbit_direct}] =
+ rabbit_channel:info(ServerCh, [channel_source]),
+ rabbit_channel:source(ServerCh, ?MODULE),
+ [{channel_source, ?MODULE}] =
+ rabbit_channel:info(ServerCh, [channel_source]),
+ amqp_connection:close(Conn),
+ {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
+ passed.
+
+undefined_channel_source_notifications(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, undefined_channel_source_notifications1, [Config]).
+
+undefined_channel_source_notifications1(_Config) ->
+ ExistingChannels = rabbit_channel:list(),
+ {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(),
+ [ServerCh] = rabbit_channel:list() -- ExistingChannels,
+ [{channel_source, undefined}] =
+ rabbit_channel:info(ServerCh, [channel_source]),
+ rabbit_channel:source(ServerCh, ?MODULE),
+ [{channel_source, ?MODULE}] =
+ rabbit_channel:info(ServerCh, [channel_source]),
+ passed.