diff options
| -rw-r--r-- | src/rabbit_channel.erl | 218 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | test/channel_source_SUITE.erl | 164 |
5 files changed, 99 insertions, 299 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2148a435b6..37ac409985 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -55,7 +55,7 @@ -behaviour(gen_server2). --export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). +-export([start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, deliver_reply/2, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, @@ -63,7 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). --export([source/2, update_user_state/2]). +-export([update_user_state/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, @@ -121,7 +121,8 @@ consumer_prefetch, %% Message content size limit max_message_size, - consumer_timeout + consumer_timeout, + authz_context }). -record(ch, {cfg :: #conf{}, @@ -239,14 +240,14 @@ -spec start_link (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), pid()) -> + pid(), pid(), any()) -> rabbit_types:ok_pid_or_error(). start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, - VHost, Capabilities, CollectorPid, Limiter) -> + VHost, Capabilities, CollectorPid, Limiter, AmqpParams) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, CollectorPid, Limiter], []). + User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []). -spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. @@ -460,15 +461,6 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). --spec source(pid(), any()) -> 'ok' | {error, channel_terminated}. - -source(Pid, Source) when is_pid(Pid) -> - case erlang:is_process_alive(Pid) of - true -> Pid ! {channel_source, Source}, - ok; - false -> {error, channel_terminated} - end. - -spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. update_user_state(Pid, UserState) when is_pid(Pid) -> @@ -481,7 +473,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, - Capabilities, CollectorPid, LimiterPid]) -> + Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), @@ -504,6 +496,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)), MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), + OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams), State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -519,7 +512,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, trace_state = rabbit_trace:init(VHost), consumer_prefetch = Prefetch, max_message_size = MaxMessageSize, - consumer_timeout = ConsumerTimeout + consumer_timeout = ConsumerTimeout, + authz_context = OptionalVariables }, limiter = Limiter, tx = none, @@ -874,8 +868,6 @@ handle_info(tick, State0 = #ch{queue_states = QueueStates0}) -> Return -> Return end; -handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> - noreply(State#ch{cfg = Cfg#conf{source = Source}}); handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) -> noreply(State#ch{cfg = Cfg#conf{user = User}}). @@ -1027,11 +1019,11 @@ check_write_permitted(Resource, User, Context) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). -check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write). +check_write_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) -> + check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, write). -check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read). +check_read_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) -> + check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -1066,23 +1058,11 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. -check_topic_authorisation(Resource = #exchange{type = topic}, - User, none, RoutingKey, _ChSrc, Permission) -> - %% Called from outside the channel by mgmt API - AmqpParams = [], - check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); -check_topic_authorisation(Resource = #exchange{type = topic}, - User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid, ChSrc), - check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); -check_topic_authorisation(_, _, _, _, _, _) -> - ok. - check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, - AmqpParams, RoutingKey, Permission) -> + RoutingKey, AuthzContext, Permission) -> Resource = Name#resource{kind = topic}, - VariableMap = build_topic_variable_map(AmqpParams, VHost, Username), + VariableMap = build_topic_variable_map(AuthzContext, VHost, Username), Context = #{routing_key => RoutingKey, variable_map => VariableMap}, Cache = case get(topic_permission_cache) of @@ -1095,35 +1075,27 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost User, Resource, Permission, Context), CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) - end. + end; +check_topic_authorisation(_, _, _, _, _) -> + ok. -get_amqp_params(_ConnPid, rabbit_reader) -> []; -get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> - Timeout = get_operation_timeout(), - get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout); -get_amqp_params(_, _) -> []. - -get_amqp_params(ConnPid, false, _Timeout) -> - %% Connection process is dead - rabbit_log_channel:debug("file ~p, line ~p - connection process not alive: ~p~n", - [?FILE, ?LINE, ConnPid]), - []; -get_amqp_params(ConnPid, true, Timeout) -> - rabbit_amqp_connection:amqp_params(ConnPid, Timeout). - -build_topic_variable_map(AmqpParams, VHost, Username) -> - VariableFromAmqpParams = extract_variable_map_from_amqp_params(AmqpParams), - maps:merge(VariableFromAmqpParams, #{<<"vhost">> => VHost, <<"username">> => Username}). - -extract_authz_context(ConnPid, ChSrc) -> - extract_variable_map_from_amqp_params(get_amqp_params(ConnPid, ChSrc)). - -%% use tuple representation of amqp_params to avoid coupling. -%% get variable map only from amqp_params_direct, not amqp_params_network. -%% amqp_params_direct are usually used from plugins (e.g. MQTT, STOMP) -extract_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, _, _, _, _, - {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}]) -> + +build_topic_variable_map(AuthzContext, VHost, Username) when is_map(AuthzContext) -> + maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username}); +build_topic_variable_map(AuthzContext, VHost, Username) -> + maps:merge(extract_variable_map_from_amqp_params(AuthzContext), #{<<"vhost">> => VHost, <<"username">> => Username}). + +%% Use tuple representation of amqp_params to avoid a dependency on amqp_client. +%% Extracts variable map only from amqp_params_direct, not amqp_params_network. +%% amqp_params_direct records are usually used by plugins (e.g. MQTT, STOMP) +extract_variable_map_from_amqp_params({amqp_params, {amqp_params_direct, _, _, _, _, + {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}) -> proplists:get_value(variable_map, AdditionalInfo, #{}); +extract_variable_map_from_amqp_params({amqp_params_direct, _, _, _, _, + {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}) -> + proplists:get_value(variable_map, AdditionalInfo, #{}); +extract_variable_map_from_amqp_params([Value]) -> + extract_variable_map_from_amqp_params(Value); extract_variable_map_from_amqp_params(_) -> #{}. @@ -1317,13 +1289,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, Content, State = #ch{cfg = #conf{channel = ChannelNum, - conn_pid = ConnPid, - source = ChSrc, conn_name = ConnName, virtual_host = VHostPath, user = #user{username = Username} = User, trace_state = TraceState, - max_message_size = MaxMessageSize + max_message_size = MaxMessageSize, + authz_context = AuthzContext }, tx = Tx, confirm_enabled = ConfirmEnabled, @@ -1331,10 +1302,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, }) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_write_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)), + check_write_permitted(ExchangeName, User, AuthzContext), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc), + check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1387,13 +1358,13 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, conn_pid = ConnPid, user = User, virtual_host = VHostPath, - source = ChSrc + authz_context = AuthzContext }, limiter = Limiter, next_tag = DeliveryTag, queue_states = QueueStates0}) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), - check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_read_permitted(QueueName, User, AuthzContext), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, %% Use the delivery tag as consumer tag for quorum queues @@ -1474,14 +1445,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch, user = User, virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc}, + authz_context = AuthzContext}, consumer_mapping = ConsumerMapping }) -> case maps:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), - check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_read_permitted(QueueName, User, AuthzContext), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), @@ -1625,84 +1595,84 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method, user = User, queue_collector_pid = CollectorPid, conn_pid = ConnPid, - source = ChSrc}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + authz_context = AuthzContext}}) -> + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{virtual_host = VHostPath, conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, queue_collector_pid = CollectorPid, user = User}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{virtual_host = VHostPath, conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, queue_collector_pid = CollectorPid, user = User}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{virtual_host = VHostPath, conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, queue_collector_pid = CollectorPid, user = User}}) -> {ok, QueueName, MessageCount, ConsumerCount} = - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}}) -> {ok, PurgedMessageCount} = - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}}) -> - case handle_method(Method, ConnPid, ChSrc, CollectorPid, + case handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> return_ok(State, NoWait, @@ -1948,21 +1918,20 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, #user{username = Username} = User) -> ExchangeNameBin = strip_cr_lf(SourceNameBin0), DestinationNameBin = strip_cr_lf(DestinationNameBin0), DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath), - AuthContext = extract_authz_context(ConnPid, ChSrc), - check_write_permitted(DestinationName, User, AuthContext), + check_write_permitted(DestinationName, User, AuthzContext), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], - check_read_permitted(ExchangeName, User, AuthContext), + check_read_permitted(ExchangeName, User, AuthzContext), case rabbit_exchange:lookup(ExchangeName) of {error, not_found} -> ok; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc) + check_read_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext) end, case Fun(#binding{source = ExchangeName, destination = DestinationName, @@ -2393,7 +2362,6 @@ i(user_who_performed_action, Ch) -> i(user, Ch); i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; -i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC); @@ -2473,39 +2441,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", _/binary>> = QueueNameBin}, - _ConnPid, _ChSrc, _CollectorPid, VHost, _User) -> + _ConnPid, _AuthzContext, _CollectorPid, VHost, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of @@ -2519,7 +2487,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, - ConnPid, ChSrc, CollectorPid, VHostPath, + ConnPid, AuthzContext, CollectorPid, VHostPath, #user{username = Username} = User) -> Owner = case ExclusiveDeclare of true -> ConnPid; @@ -2533,7 +2501,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(QueueName, User, AuthzContext), rabbit_core_metrics:queue_declared(QueueName), case rabbit_amqqueue:with( QueueName, @@ -2555,9 +2523,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin, "invalid type '~s' for arg '~s' in ~s", [Type, DlxKey, rabbit_misc:rs(QueueName)]); DLX -> - AuthContext = extract_authz_context(ConnPid, ChSrc), - check_read_permitted(QueueName, User, AuthContext), - check_write_permitted(DLX, User, AuthContext), + check_read_permitted(QueueName, User, AuthzContext), + check_write_permitted(DLX, User, AuthzContext), ok end, case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, @@ -2579,7 +2546,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath, + handle_method(Declare, ConnPid, AuthzContext, CollectorPid, VHostPath, User); {absent, Q, Reason} -> rabbit_amqqueue:absent(Q, Reason); @@ -2595,7 +2562,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, passive = true}, - ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> + ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), Fun = fun (Q0) -> @@ -2609,12 +2576,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty}, - ConnPid, ChSrc, _CollectorPid, VHostPath, + ConnPid, AuthzContext, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), - check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(QueueName, User, AuthzContext), case rabbit_amqqueue:with( QueueName, fun (Q) -> @@ -2638,13 +2605,13 @@ handle_method(#'queue.delete'{queue = QueueNameBin, end; handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, - ConnPid, ChSrc, _CollectorPid, VHostPath, + _ConnPid, AuthzContext, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), check_not_default_exchange(ExchangeName), check_exchange_deletion(ExchangeName), - check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(ExchangeName, User, AuthzContext), case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of {error, not_found} -> ok; @@ -2654,9 +2621,9 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ok end; handle_method(#'queue.purge'{queue = QueueNameBin}, - ConnPid, ChSrc, _CollectorPid, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorPid, VHostPath, User) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), - check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_read_permitted(QueueName, User, AuthzContext), rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:purge(Q) end); @@ -2667,12 +2634,12 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, auto_delete = AutoDelete, internal = Internal, arguments = Args}, - ConnPid, ChSrc, _CollectorPid, VHostPath, + _ConnPid, AuthzContext, _CollectorPid, VHostPath, #user{username = Username} = User) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), - check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(ExchangeName, User, AuthzContext), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -2684,9 +2651,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, precondition_failed( "invalid type '~s' for arg '~s' in ~s", [Type, AeKey, rabbit_misc:rs(ExchangeName)]); - AName -> AuthContext = extract_authz_context(ConnPid, ChSrc), - check_read_permitted(ExchangeName, User, AuthContext), - check_write_permitted(AName, User, AuthContext), + AName -> check_read_permitted(ExchangeName, User, AuthzContext), + check_write_permitted(AName, User, AuthzContext), ok end, rabbit_exchange:declare(ExchangeName, @@ -2701,7 +2667,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Internal, Args); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true}, - _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> + _ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index a244813d80..7a76ab45ca 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -66,12 +66,12 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - LimiterPid]}, + LimiterPid, undefined]}, intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, Collector}) -> + User, VHost, Capabilities, Collector, AmqpParams}) -> {ok, SupPid} = supervisor2:start_link( ?MODULE, {direct, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), @@ -81,7 +81,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - LimiterPid]}, + LimiterPid, AmqpParams]}, intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index b84b4d91bb..683be84676 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -17,7 +17,7 @@ -module(rabbit_direct). -export([boot/0, force_event_refresh/1, list/0, connect/5, - start_channel/9, disconnect/2]). + start_channel/10, disconnect/2]). -deprecated([{force_event_refresh, 1, eventually}]). @@ -201,17 +201,16 @@ connect1(User, VHost, Protocol, Pid, Infos) -> -spec start_channel (rabbit_channel:channel_number(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), - rabbit_framing:amqp_table(), pid()) -> + rabbit_framing:amqp_table(), pid(), any()) -> {'ok', pid()}. start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, - VHost, Capabilities, Collector) -> + VHost, Capabilities, Collector, AmqpParams) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, Collector}]), - _ = rabbit_channel:source(ChannelPid, ?MODULE), + User, VHost, Capabilities, Collector, AmqpParams}]), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 116dcf89e6..70ed3246d3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -925,7 +925,6 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), - _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl deleted file mode 100644 index 3247ffa997..0000000000 --- a/test/channel_source_SUITE.erl +++ /dev/null @@ -1,164 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at https://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. -%% - --module(channel_source_SUITE). - --include_lib("amqp_client/include/amqp_client.hrl"). --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). - -all() -> - [ - {group, non_parallel_tests} - ]. - -groups() -> - [ - {non_parallel_tests, [], [ - network_rabbit_reader_channel_source, - network_arbitrary_channel_source, - direct_channel_source, - undefined_channel_source - ]} - ]. - -%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. -%% ------------------------------------------------------------------- - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - -init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Testcase} - ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - -end_per_testcase(Testcase, Config) -> - Config1 = rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()), - rabbit_ct_helpers:testcase_finished(Config1, Testcase). - -%% ------------------------------------------------------------------- -%% Testcases. -%% ------------------------------------------------------------------- - -network_rabbit_reader_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, network_rabbit_reader_channel_source1, [Config]). - -network_rabbit_reader_channel_source1(Config) -> - ExistingChannels = rabbit_channel:list(), - Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - {ok, ClientCh} = amqp_connection:open_channel(Conn), - [ServerCh] = rabbit_channel:list() -- ExistingChannels, - [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]), - _ = rabbit_channel:source(ServerCh, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), - amqp_channel:close(ClientCh), - amqp_connection:close(Conn), - wait_for_channel_termination(ServerCh, 60), - passed. - -network_arbitrary_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, network_arbitrary_channel_source1, [Config]). - -network_arbitrary_channel_source1(Config) -> - Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end), - {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id), - {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id), - {ok, Ch} = rabbit_channel:start_link( - 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1, - rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [], - Collector, Limiter), - _ = rabbit_channel:source(Ch, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]), - [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]], - amqp_connection:close(Conn), - wait_for_channel_termination(Ch, 60), - passed. - -direct_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, direct_channel_source1, [Config]). - -direct_channel_source1(Config) -> - ExistingChannels = rabbit_channel:list(), - Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), - {ok, ClientCh} = amqp_connection:open_channel(Conn), - [ServerCh] = rabbit_channel:list() -- ExistingChannels, - [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]), - _ = rabbit_channel:source(ServerCh, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), - amqp_channel:close(ClientCh), - amqp_connection:close(Conn), - wait_for_channel_termination(ServerCh, 60), - passed. - -wait_for_channel_termination(Ch, 0) -> - ?assertEqual( - {error, channel_terminated}, - rabbit_channel:source(Ch, ?MODULE)); -wait_for_channel_termination(Ch, Attempts) -> - case rabbit_channel:source(Ch, ?MODULE) of - {error, channel_terminated} -> - ok; - _ -> - timer:sleep(1000), - wait_for_channel_termination(Ch, Attempts - 1) - end. - -undefined_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, undefined_channel_source1, [Config]). - -undefined_channel_source1(_Config) -> - ExistingChannels = rabbit_channel:list(), - {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(), - wait_for_server_channel(ExistingChannels, ServerCh, 60), - [{source, undefined}] = rabbit_channel:info(ServerCh, [source]), - _ = rabbit_channel:source(ServerCh, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), - passed. - -wait_for_server_channel(ExistingChannels, ServerCh, 0) -> - ?assertEqual([ServerCh], rabbit_channel:list() -- ExistingChannels); -wait_for_server_channel(ExistingChannels, ServerCh, Attempts) -> - case rabbit_channel:list() -- ExistingChannels of - [ServerCh] -> - ok; - _ -> - timer:sleep(1000), - wait_for_server_channel(ExistingChannels, ServerCh, Attempts - 1) - end. |
