summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-02-19 15:05:20 +0300
committerGitHub <noreply@github.com>2019-02-19 15:05:20 +0300
commitc5df441e8130d33683082e165b3ff993f1580ee0 (patch)
tree872556de9386ab4148291663cd303ce3a0032ade /src
parentd680193af3d43b04e15ce66abe1ec895f2562c77 (diff)
parentf2a01fda8a2079939604a7201659039ad2e501a0 (diff)
downloadrabbitmq-server-git-c5df441e8130d33683082e165b3ff993f1580ee0.tar.gz
Merge pull request #1886 from Ayanda-D/channel-source
Avoid synchronous channel request to connection process
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl121
-rw-r--r--src/rabbit_direct.erl1
-rw-r--r--src/rabbit_reader.erl1
3 files changed, 75 insertions, 48 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 036aa9a60c..8f5f159f88 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -63,6 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
+-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -78,7 +79,7 @@
-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
--export([handle_method/5]).
+-export([handle_method/6]).
-record(ch, {
%% starting | running | flow | closing
@@ -97,6 +98,9 @@
%% same as reader's name, see #v1.name
%% in rabbit_reader
conn_name,
+ %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
+ %% or any other channel creating/spawning entity
+ source,
%% limiter pid, see rabbit_limiter
limiter,
%% none | {Msgs, Acks} | committing | failed |
@@ -448,6 +452,14 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
+-spec source(pid(), any()) -> any().
+
+source(Pid, Source) when is_pid(Pid) ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid ! {channel_source, Source};
+ false -> {error, channel_terminated}
+ end.
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
@@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})).
+ noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
+
+handle_info({channel_source, Source}, State = #ch{}) ->
+ noreply(State#ch{source = Source}).
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -938,11 +953,11 @@ check_write_permitted(Resource, User) ->
check_read_permitted(Resource, User) ->
check_resource_access(User, Resource, read).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
+check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
+check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -978,14 +993,17 @@ check_internal_exchange(_) ->
ok.
check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, Permission) ->
+ User, none, RoutingKey, _ChSrc, Permission) ->
%% Called from outside the channel by mgmt API
AmqpParams = [],
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid),
+ User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
+ AmqpParams = get_amqp_params(ConnPid, ChSrc),
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
+check_topic_authorisation(_, _, _, _, _, _) ->
+ ok.
+
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
AmqpParams, RoutingKey, Permission) ->
@@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end;
-check_topic_authorisation(_, _, _, _, _) ->
- ok.
+ end.
-get_amqp_params(ConnPid) when is_pid(ConnPid) ->
+get_amqp_params(_ConnPid, rabbit_reader) -> [];
+get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
Timeout = get_operation_timeout(),
get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout).
@@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid,
+ source = ChSrc,
max_message_size = MaxMessageSize}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
+ check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
user = User,
queue_collector_pid = CollectorPid,
- conn_pid = ConnPid}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ conn_pid = ConnPid,
+ source = ChSrc}) ->
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- case handle_method(Method, ConnPid, CollectorPid,
+ case handle_method(Method, ConnPid, ChSrc, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid,
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
@@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, User),
- ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
- case ExchangeLookup of
+ case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
- %% no-op
- ExchangeLookup;
+ ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
- ExchangeLookup
+ check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(source, #ch{source = ChSrc}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _CollectorPid, VHost, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, CollectorPid, VHostPath,
+ ConnPid, ChSrc, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, _CollectorPid, VHostPath,
+ ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
@@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
@@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, _CollectorPid, VHostPath, User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User),
rabbit_amqqueue:with_exclusive_access_or_die(
@@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
@@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 50e8f3d2b0..e43bfba90c 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}]),
+ _ = rabbit_channel:source(ChannelPid, ?MODULE),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c0cb9c57d5..6f0b0a5ea5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -924,6 +924,7 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
+ _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),