diff options
| -rw-r--r-- | src/rabbit_channel.erl | 183 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 8 |
3 files changed, 84 insertions, 113 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2148a435b6..ef4ba4db5b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -55,7 +55,7 @@ -behaviour(gen_server2). --export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). +-export([start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, deliver_reply/2, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, @@ -121,7 +121,8 @@ consumer_prefetch, %% Message content size limit max_message_size, - consumer_timeout + consumer_timeout, + authz_context }). -record(ch, {cfg :: #conf{}, @@ -239,14 +240,14 @@ -spec start_link (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), pid()) -> + pid(), pid(), any()) -> rabbit_types:ok_pid_or_error(). start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, - VHost, Capabilities, CollectorPid, Limiter) -> + VHost, Capabilities, CollectorPid, Limiter, AmqpParams) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, CollectorPid, Limiter], []). + User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []). -spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. @@ -481,7 +482,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, - Capabilities, CollectorPid, LimiterPid]) -> + Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), @@ -504,6 +505,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)), MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), + AuthzContext = extract_variable_map_from_amqp_params(AmqpParams), State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -519,7 +521,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, trace_state = rabbit_trace:init(VHost), consumer_prefetch = Prefetch, max_message_size = MaxMessageSize, - consumer_timeout = ConsumerTimeout + consumer_timeout = ConsumerTimeout, + authz_context = AuthzContext }, limiter = Limiter, tx = none, @@ -1027,11 +1030,11 @@ check_write_permitted(Resource, User, Context) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). -check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write). +check_write_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) -> + check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, write). -check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> - check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read). +check_read_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) -> + check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -1066,23 +1069,11 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. -check_topic_authorisation(Resource = #exchange{type = topic}, - User, none, RoutingKey, _ChSrc, Permission) -> - %% Called from outside the channel by mgmt API - AmqpParams = [], - check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); -check_topic_authorisation(Resource = #exchange{type = topic}, - User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid, ChSrc), - check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); -check_topic_authorisation(_, _, _, _, _, _) -> - ok. - check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, - AmqpParams, RoutingKey, Permission) -> + RoutingKey, AuthzContext, Permission) -> Resource = Name#resource{kind = topic}, - VariableMap = build_topic_variable_map(AmqpParams, VHost, Username), + VariableMap = build_topic_variable_map(AuthzContext, VHost, Username), Context = #{routing_key => RoutingKey, variable_map => VariableMap}, Cache = case get(topic_permission_cache) of @@ -1095,28 +1086,13 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost User, Resource, Permission, Context), CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) - end. - -get_amqp_params(_ConnPid, rabbit_reader) -> []; -get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> - Timeout = get_operation_timeout(), - get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout); -get_amqp_params(_, _) -> []. - -get_amqp_params(ConnPid, false, _Timeout) -> - %% Connection process is dead - rabbit_log_channel:debug("file ~p, line ~p - connection process not alive: ~p~n", - [?FILE, ?LINE, ConnPid]), - []; -get_amqp_params(ConnPid, true, Timeout) -> - rabbit_amqp_connection:amqp_params(ConnPid, Timeout). + end; +check_topic_authorisation(_, _, _, _, _) -> + ok. -build_topic_variable_map(AmqpParams, VHost, Username) -> - VariableFromAmqpParams = extract_variable_map_from_amqp_params(AmqpParams), - maps:merge(VariableFromAmqpParams, #{<<"vhost">> => VHost, <<"username">> => Username}). -extract_authz_context(ConnPid, ChSrc) -> - extract_variable_map_from_amqp_params(get_amqp_params(ConnPid, ChSrc)). +build_topic_variable_map(AuthzContext, VHost, Username) -> + maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username}). %% use tuple representation of amqp_params to avoid coupling. %% get variable map only from amqp_params_direct, not amqp_params_network. @@ -1317,13 +1293,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, Content, State = #ch{cfg = #conf{channel = ChannelNum, - conn_pid = ConnPid, - source = ChSrc, conn_name = ConnName, virtual_host = VHostPath, user = #user{username = Username} = User, trace_state = TraceState, - max_message_size = MaxMessageSize + max_message_size = MaxMessageSize, + authz_context = AuthzContext }, tx = Tx, confirm_enabled = ConfirmEnabled, @@ -1331,10 +1306,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, }) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_write_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)), + check_write_permitted(ExchangeName, User, AuthzContext), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc), + check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1387,13 +1362,13 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, conn_pid = ConnPid, user = User, virtual_host = VHostPath, - source = ChSrc + authz_context = AuthzContext }, limiter = Limiter, next_tag = DeliveryTag, queue_states = QueueStates0}) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), - check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_read_permitted(QueueName, User, AuthzContext), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, %% Use the delivery tag as consumer tag for quorum queues @@ -1474,14 +1449,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch, user = User, virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc}, + authz_context = AuthzContext}, consumer_mapping = ConsumerMapping }) -> case maps:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), - check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_read_permitted(QueueName, User, AuthzContext), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), @@ -1625,84 +1599,84 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method, user = User, queue_collector_pid = CollectorPid, conn_pid = ConnPid, - source = ChSrc}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + authz_context = AuthzContext}}) -> + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{virtual_host = VHostPath, conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, queue_collector_pid = CollectorPid, user = User}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{virtual_host = VHostPath, conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, queue_collector_pid = CollectorPid, user = User}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{virtual_host = VHostPath, conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, queue_collector_pid = CollectorPid, user = User}}) -> {ok, QueueName, MessageCount, ConsumerCount} = - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, virtual_host = VHostPath, queue_collector_pid = CollectorPid, user = User}}) -> {ok, PurgedMessageCount} = - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}}) -> - handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), + handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{cfg = #conf{conn_pid = ConnPid, - source = ChSrc, + authz_context = AuthzContext, user = User, queue_collector_pid = CollectorPid, virtual_host = VHostPath}}) -> - case handle_method(Method, ConnPid, ChSrc, CollectorPid, + case handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> return_ok(State, NoWait, @@ -1948,21 +1922,20 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, #user{username = Username} = User) -> ExchangeNameBin = strip_cr_lf(SourceNameBin0), DestinationNameBin = strip_cr_lf(DestinationNameBin0), DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath), - AuthContext = extract_authz_context(ConnPid, ChSrc), - check_write_permitted(DestinationName, User, AuthContext), + check_write_permitted(DestinationName, User, AuthzContext), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], - check_read_permitted(ExchangeName, User, AuthContext), + check_read_permitted(ExchangeName, User, AuthzContext), case rabbit_exchange:lookup(ExchangeName) of {error, not_found} -> ok; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc) + check_read_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext) end, case Fun(#binding{source = ExchangeName, destination = DestinationName, @@ -2473,39 +2446,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); handle_method(#'exchange.unbind'{destination = DestinationNameBin, source = SourceNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, SourceNameBin, exchange, DestinationNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:remove/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, - ConnPid, ChSrc, _CollectorId, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorId, VHostPath, User) -> binding_action(fun rabbit_binding:add/3, ExchangeNameBin, queue, QueueNameBin, - RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User); + RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User); %% Note that all declares to these are effectively passive. If it %% exists it by definition has one consumer. handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", _/binary>> = QueueNameBin}, - _ConnPid, _ChSrc, _CollectorPid, VHost, _User) -> + _ConnPid, _AuthzContext, _CollectorPid, VHost, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of @@ -2519,7 +2492,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, - ConnPid, ChSrc, CollectorPid, VHostPath, + ConnPid, AuthzContext, CollectorPid, VHostPath, #user{username = Username} = User) -> Owner = case ExclusiveDeclare of true -> ConnPid; @@ -2533,7 +2506,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(QueueName, User, AuthzContext), rabbit_core_metrics:queue_declared(QueueName), case rabbit_amqqueue:with( QueueName, @@ -2555,9 +2528,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin, "invalid type '~s' for arg '~s' in ~s", [Type, DlxKey, rabbit_misc:rs(QueueName)]); DLX -> - AuthContext = extract_authz_context(ConnPid, ChSrc), - check_read_permitted(QueueName, User, AuthContext), - check_write_permitted(DLX, User, AuthContext), + check_read_permitted(QueueName, User, AuthzContext), + check_write_permitted(DLX, User, AuthzContext), ok end, case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, @@ -2579,7 +2551,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath, + handle_method(Declare, ConnPid, AuthzContext, CollectorPid, VHostPath, User); {absent, Q, Reason} -> rabbit_amqqueue:absent(Q, Reason); @@ -2595,7 +2567,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, passive = true}, - ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> + ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), Fun = fun (Q0) -> @@ -2609,12 +2581,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty}, - ConnPid, ChSrc, _CollectorPid, VHostPath, + ConnPid, AuthzContext, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), - check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(QueueName, User, AuthzContext), case rabbit_amqqueue:with( QueueName, fun (Q) -> @@ -2638,13 +2610,13 @@ handle_method(#'queue.delete'{queue = QueueNameBin, end; handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused}, - ConnPid, ChSrc, _CollectorPid, VHostPath, + _ConnPid, AuthzContext, _CollectorPid, VHostPath, User = #user{username = Username}) -> StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), check_not_default_exchange(ExchangeName), check_exchange_deletion(ExchangeName), - check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(ExchangeName, User, AuthzContext), case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of {error, not_found} -> ok; @@ -2654,9 +2626,9 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ok end; handle_method(#'queue.purge'{queue = QueueNameBin}, - ConnPid, ChSrc, _CollectorPid, VHostPath, User) -> + ConnPid, AuthzContext, _CollectorPid, VHostPath, User) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), - check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)), + check_read_permitted(QueueName, User, AuthzContext), rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:purge(Q) end); @@ -2667,12 +2639,12 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, auto_delete = AutoDelete, internal = Internal, arguments = Args}, - ConnPid, ChSrc, _CollectorPid, VHostPath, + _ConnPid, AuthzContext, _CollectorPid, VHostPath, #user{username = Username} = User) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), - check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)), + check_configure_permitted(ExchangeName, User, AuthzContext), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -2684,9 +2656,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, precondition_failed( "invalid type '~s' for arg '~s' in ~s", [Type, AeKey, rabbit_misc:rs(ExchangeName)]); - AName -> AuthContext = extract_authz_context(ConnPid, ChSrc), - check_read_permitted(ExchangeName, User, AuthContext), - check_write_permitted(AName, User, AuthContext), + AName -> check_read_permitted(ExchangeName, User, AuthzContext), + check_write_permitted(AName, User, AuthzContext), ok end, rabbit_exchange:declare(ExchangeName, @@ -2701,7 +2672,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Internal, Args); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true}, - _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) -> + _ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index a244813d80..7a76ab45ca 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -66,12 +66,12 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - LimiterPid]}, + LimiterPid, undefined]}, intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, Collector}) -> + User, VHost, Capabilities, Collector, AmqpParams}) -> {ok, SupPid} = supervisor2:start_link( ?MODULE, {direct, {ConnName, Channel}}), [LimiterPid] = supervisor2:find_child(SupPid, limiter), @@ -81,7 +81,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - LimiterPid]}, + LimiterPid, AmqpParams]}, intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index b84b4d91bb..814b2783e6 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -17,7 +17,7 @@ -module(rabbit_direct). -export([boot/0, force_event_refresh/1, list/0, connect/5, - start_channel/9, disconnect/2]). + start_channel/10, disconnect/2]). -deprecated([{force_event_refresh, 1, eventually}]). @@ -201,16 +201,16 @@ connect1(User, VHost, Protocol, Pid, Infos) -> -spec start_channel (rabbit_channel:channel_number(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), - rabbit_framing:amqp_table(), pid()) -> + rabbit_framing:amqp_table(), pid(), any()) -> {'ok', pid()}. start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, - VHost, Capabilities, Collector) -> + VHost, Capabilities, Collector, AmqpParams) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, Collector}]), + User, VHost, Capabilities, Collector, AmqpParams}]), _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. |
