diff options
| author | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2019-02-18 11:32:13 +0100 |
|---|---|---|
| committer | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2019-02-18 11:47:10 +0100 |
| commit | f71ace363e55e9aca922c00831b49d420217b439 (patch) | |
| tree | a532b59d8b5e58564e5bdbeeabb3950a1178ef7d | |
| parent | 2a80aa97ac361b11679af6c755807c68ad8b7b2a (diff) | |
| download | rabbitmq-server-git-f71ace363e55e9aca922c00831b49d420217b439.tar.gz | |
store and retrieve channel source from state
| -rw-r--r-- | src/rabbit_channel.erl | 110 | ||||
| -rw-r--r-- | test/channel_source_SUITE.erl | 55 |
2 files changed, 85 insertions, 80 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dcb4befee6..8f5f159f88 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -79,7 +79,7 @@ -export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor --export([handle_method/5]). +-export([handle_method/6]). -record(ch, { %% starting | running | flow | closing @@ -98,6 +98,9 @@ %% same as reader's name, see #v1.name %% in rabbit_reader conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, %% limiter pid, see rabbit_limiter limiter, %% none | {Msgs, Acks} | committing | failed | @@ -816,9 +819,8 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> end, QueueStates0), noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); -handle_info({channel_source, Source}, State) -> - put(channel_source, Source), - noreply(State). +handle_info({channel_source, Source}, State = #ch{}) -> + noreply(State#ch{source = Source}). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -951,11 +953,11 @@ check_write_permitted(Resource, User) -> check_read_permitted(Resource, User) -> check_resource_access(User, Resource, read). -check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write). +check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write). -check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read). +check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -991,14 +993,17 @@ check_internal_exchange(_) -> ok. check_topic_authorisation(Resource = #exchange{type = topic}, - User, none, RoutingKey, Permission) -> + User, none, RoutingKey, _ChSrc, Permission) -> %% Called from outside the channel by mgmt API AmqpParams = [], check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(Resource = #exchange{type = topic}, - User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid, get(channel_source)), + User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) -> + AmqpParams = get_amqp_params(ConnPid, ChSrc), check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); +check_topic_authorisation(_, _, _, _, _, _) -> + ok. + check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, AmqpParams, RoutingKey, Permission) -> @@ -1016,9 +1021,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost User, Resource, Permission, Context), CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) - end; -check_topic_authorisation(_, _, _, _, _) -> - ok. + end. get_amqp_params(_ConnPid, rabbit_reader) -> []; get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> @@ -1241,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, conn_name = ConnName, delivery_flow = Flow, conn_pid = ConnPid, + source = ChSrc, max_message_size = MaxMessageSize}) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), + check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1530,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, user = User, queue_collector_pid = CollectorPid, - conn_pid = ConnPid}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + conn_pid = ConnPid, + source = ChSrc}) -> + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, + source = ChSrc, queue_collector_pid = CollectorPid, user = User}) -> {ok, QueueName, MessageCount, ConsumerCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}) -> {ok, PurgedMessageCount} = - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, + source = ChSrc, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}) -> - case handle_method(Method, ConnPid, CollectorPid, + case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> return_ok(State, NoWait, @@ -1825,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, - RoutingKey, Arguments, VHostPath, ConnPid, + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, #user{username = Username} = User) -> ExchangeNameBin = strip_cr_lf(SourceNameBin0), DestinationNameBin = strip_cr_lf(DestinationNameBin0), @@ -1834,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, User), - ExchangeLookup = rabbit_exchange:lookup(ExchangeName), - case ExchangeLookup of + case rabbit_exchange:lookup(ExchangeName) of {error, not_found} -> - %% no-op - ExchangeLookup; + ok; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), - ExchangeLookup + check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc) end, case Fun(#binding{source = ExchangeName, destination = DestinationName, @@ -2240,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch); i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(source, #ch{source = ChSrc}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2260,7 +2271,6 @@ i(garbage_collection, _State) -> i(reductions, _State) -> {reductions, Reductions} = erlang:process_info(self(), reductions), Reductions; -i(channel_source, _State = #ch{}) -> get(channel_source); i(Item, _) -> throw({bad_argument, Item}). @@ -2313,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, _CollectorId, VHostPath, User) -> + ConnPid, ChSrc, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, User); + RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", _/binary>> = QueueNameBin}, - _ConnPid, _CollectorPid, VHost, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHost, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of @@ -2359,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, - ConnPid, CollectorPid, VHostPath, + ConnPid, ChSrc, CollectorPid, VHostPath, #user{username = Username} = User) -> Owner = case ExclusiveDeclare of true -> ConnPid; @@ -2418,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, ConnPid, CollectorPid, VHostPath, + handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath, User); {absent, Q, Reason} -> rabbit_amqqueue:absent(Q, Reason); @@ -2434,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, passive = true}, - ConnPid, _CollectorPid, VHostPath, _User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), Fun = fun (Q0) -> @@ -2448,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty}, - ConnPid, _CollectorPid, VHostPath, + ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), @@ -2477,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, end; handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), @@ -2493,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ok end; handle_method(#'queue.purge'{queue = QueueNameBin}, - ConnPid, _CollectorPid, VHostPath, User) -> + ConnPid, _ChSrc, _CollectorPid, VHostPath, User) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), rabbit_amqqueue:with_exclusive_access_or_die( @@ -2506,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, auto_delete = AutoDelete, internal = Internal, arguments = Args}, - _ConnPid, _CollectorPid, VHostPath, + _ConnPid, _ChSrc, _CollectorPid, VHostPath, #user{username = Username} = User) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), @@ -2539,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Internal, Args); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true}, - _ConnPid, _CollectorPid, VHostPath, _User) -> + _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl index 11f87c7fde..a18c2474b7 100644 --- a/test/channel_source_SUITE.erl +++ b/test/channel_source_SUITE.erl @@ -16,7 +16,6 @@ -module(channel_source_SUITE). --include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -compile(export_all). @@ -29,9 +28,9 @@ all() -> groups() -> [ {non_parallel_tests, [], [ - network_channel_source_notifications, - direct_channel_source_notifications, - undefined_channel_source_notifications + network_rabbit_reader_channel_source, + direct_channel_source, + undefined_channel_source ]} ]. @@ -71,53 +70,49 @@ end_per_testcase(Testcase, Config) -> %% Testcases. %% ------------------------------------------------------------------- -network_channel_source_notifications(Config) -> +network_rabbit_reader_channel_source(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, network_channel_source_notifications1, [Config]). + ?MODULE, network_rabbit_reader_channel_source1, [Config]). -network_channel_source_notifications1(Config) -> +network_rabbit_reader_channel_source1(Config) -> ExistingChannels = rabbit_channel:list(), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - {ok, _ClientCh} = amqp_connection:open_channel(Conn), + {ok, ClientCh} = amqp_connection:open_channel(Conn), [ServerCh] = rabbit_channel:list() -- ExistingChannels, - [{channel_source, rabbit_reader}] = - rabbit_channel:info(ServerCh, [channel_source]), - rabbit_channel:source(ServerCh, ?MODULE), - [{channel_source, ?MODULE}] = - rabbit_channel:info(ServerCh, [channel_source]), + [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + amqp_channel:close(ClientCh), amqp_connection:close(Conn), {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), passed. -direct_channel_source_notifications(Config) -> +direct_channel_source(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, direct_channel_source_notifications1, [Config]). + ?MODULE, direct_channel_source1, [Config]). -direct_channel_source_notifications1(Config) -> +direct_channel_source1(Config) -> ExistingChannels = rabbit_channel:list(), Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), - {ok, _ClientCh} = amqp_connection:open_channel(Conn), + {ok, ClientCh} = amqp_connection:open_channel(Conn), [ServerCh] = rabbit_channel:list() -- ExistingChannels, - [{channel_source, rabbit_direct}] = - rabbit_channel:info(ServerCh, [channel_source]), - rabbit_channel:source(ServerCh, ?MODULE), - [{channel_source, ?MODULE}] = - rabbit_channel:info(ServerCh, [channel_source]), + [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), + amqp_channel:close(ClientCh), amqp_connection:close(Conn), {error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE), passed. -undefined_channel_source_notifications(Config) -> +undefined_channel_source(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, undefined_channel_source_notifications1, [Config]). + ?MODULE, undefined_channel_source1, [Config]). -undefined_channel_source_notifications1(_Config) -> +undefined_channel_source1(_Config) -> ExistingChannels = rabbit_channel:list(), {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(), [ServerCh] = rabbit_channel:list() -- ExistingChannels, - [{channel_source, undefined}] = - rabbit_channel:info(ServerCh, [channel_source]), - rabbit_channel:source(ServerCh, ?MODULE), - [{channel_source, ?MODULE}] = - rabbit_channel:info(ServerCh, [channel_source]), + [{source, undefined}] = rabbit_channel:info(ServerCh, [source]), + _ = rabbit_channel:source(ServerCh, ?MODULE), + [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), passed. |
