diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-19 15:05:20 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-19 15:05:20 +0300 |
| commit | c5df441e8130d33683082e165b3ff993f1580ee0 (patch) | |
| tree | 872556de9386ab4148291663cd303ce3a0032ade /src | |
| parent | d680193af3d43b04e15ce66abe1ec895f2562c77 (diff) | |
| parent | f2a01fda8a2079939604a7201659039ad2e501a0 (diff) | |
| download | rabbitmq-server-git-c5df441e8130d33683082e165b3ff993f1580ee0.tar.gz | |
Merge pull request #1886 from Ayanda-D/channel-source
Avoid synchronous channel request to connection process
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 |
3 files changed, 75 insertions, 48 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 036aa9a60c..8f5f159f88 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,6 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). +-export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -78,7 +79,7 @@ -export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor --export([handle_method/5]). +-export([handle_method/6]). -record(ch, { %% starting | running | flow | closing @@ -97,6 +98,9 @@ %% same as reader's name, see #v1.name %% in rabbit_reader conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, %% limiter pid, see rabbit_limiter limiter, %% none | {Msgs, Acks} | committing | failed | @@ -448,6 +452,14 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). +-spec source(pid(), any()) -> any(). + +source(Pid, Source) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> Pid ! {channel_source, Source}; + false -> {error, channel_terminated} + end. + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, @@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})). + noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); + +handle_info({channel_source, Source}, State = #ch{}) -> + noreply(State#ch{source = Source}). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -938,11 +953,11 @@ check_write_permitted(Resource, User) -> check_read_permitted(Resource, User) -> check_resource_access(User, Resource, read). -check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write). +check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write). -check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read). +check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -978,14 +993,17 @@ check_internal_exchange(_) -> ok. check_topic_authorisation(Resource = #exchange{type = topic}, - User, none, RoutingKey, Permission) -> + User, none, RoutingKey, _ChSrc, Permission) -> %% Called from outside the channel by mgmt API AmqpParams = [], check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(Resource = #exchange{type = topic}, - User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid), + User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) -> + AmqpParams = get_amqp_params(ConnPid, ChSrc), check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); +check_topic_authorisation(_, _, _, _, _, _) -> + ok. + check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, AmqpParams, RoutingKey, Permission) -> @@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost User, Resource, Permission, Context), CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) - end; -check_topic_authorisation(_, _, _, _, _) -> - ok. + end. -get_amqp_params(ConnPid) when is_pid(ConnPid) -> +get_amqp_params(_ConnPid, rabbit_reader) -> []; +get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> Timeout = get_operation_timeout(), get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout). @@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, conn_name = ConnName, delivery_flow = Flow, conn_pid = ConnPid, + source = ChSrc, max_message_size = MaxMessageSize}) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), + check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, user = User, queue_collector_pid = CollectorPid, - conn_pid = ConnPid}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + conn_pid = ConnPid, + source = ChSrc}) -> + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> {ok, QueueName, MessageCount, ConsumerCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> {ok, PurgedMessageCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - case handle_method(Method, ConnPid, CollectorPid, + case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> return_ok(State, NoWait, @@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, - RoutingKey, Arguments, VHostPath, ConnPid, + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, #user{username = Username} = User) -> ExchangeNameBin = strip_cr_lf(SourceNameBin0), DestinationNameBin = strip_cr_lf(DestinationNameBin0), @@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, User), - ExchangeLookup = rabbit_exchange:lookup(ExchangeName), - case ExchangeLookup of + case rabbit_exchange:lookup(ExchangeName) of {error, not_found} -> - %% no-op - ExchangeLookup; + ok; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), - ExchangeLookup + check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc) end, case Fun(#binding{source = ExchangeName, destination = DestinationName, @@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch); i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(source, #ch{source = ChSrc}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", _/binary>> = QueueNameBin}, - _ConnPid, _CollectorPid, VHost, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHost, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of @@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, - ConnPid, CollectorPid, VHostPath, + ConnPid, ChSrc, CollectorPid, VHostPath, #user{username = Username} = User) -> Owner = case ExclusiveDeclare of true -> ConnPid; @@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, ConnPid, CollectorPid, VHostPath, + handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath, User); {absent, Q, Reason} -> rabbit_amqqueue:absent(Q, Reason); @@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, passive = true}, - ConnPid, _CollectorPid, VHostPath, _User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), Fun = fun (Q0) -> @@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty}, - ConnPid, _CollectorPid, VHostPath, + ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), @@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, end; handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), @@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ok end; handle_method(#'queue.purge'{queue = QueueNameBin}, - ConnPid, _CollectorPid, VHostPath, User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, User) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), rabbit_amqqueue:with_exclusive_access_or_die( @@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, auto_delete = AutoDelete, internal = Internal, arguments = Args}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, #user{username = Username} = User) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), @@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Internal, Args); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true}, - _ConnPid, _CollectorPid, VHostPath, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 50e8f3d2b0..e43bfba90c 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}]), + _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c0cb9c57d5..6f0b0a5ea5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -924,6 +924,7 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), + _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), |
