summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigory Starinkin <starinkin@gmail.com>2019-11-20 11:36:19 +0000
committerGrigory Starinkin <starinkin@gmail.com>2019-11-20 11:36:19 +0000
commitb24efa5ee27d88ec5b547f4b2d297651320b9734 (patch)
tree34a8058cc93021944b28598c938d85c316c3ef49
parent880965aa67009dc3fb8dc161b8b1b6df6a71ed9d (diff)
downloadrabbitmq-server-git-b24efa5ee27d88ec5b547f4b2d297651320b9734.tar.gz
avoid synchronous call to a connection process from a channel process
performance optimisation
-rw-r--r--src/rabbit_channel.erl183
-rw-r--r--src/rabbit_channel_sup.erl6
-rw-r--r--src/rabbit_direct.erl8
3 files changed, 84 insertions, 113 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2148a435b6..ef4ba4db5b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -55,7 +55,7 @@
-behaviour(gen_server2).
--export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
+-export([start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, deliver_reply/2,
send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
@@ -121,7 +121,8 @@
consumer_prefetch,
%% Message content size limit
max_message_size,
- consumer_timeout
+ consumer_timeout,
+ authz_context
}).
-record(ch, {cfg :: #conf{},
@@ -239,14 +240,14 @@
-spec start_link
(channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), pid()) ->
+ pid(), pid(), any()) ->
rabbit_types:ok_pid_or_error().
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, CollectorPid, Limiter) ->
+ VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, CollectorPid, Limiter], []).
+ User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
@@ -481,7 +482,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, LimiterPid]) ->
+ Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
@@ -504,6 +505,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)),
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
+ AuthzContext = extract_variable_map_from_amqp_params(AmqpParams),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@@ -519,7 +521,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = Prefetch,
max_message_size = MaxMessageSize,
- consumer_timeout = ConsumerTimeout
+ consumer_timeout = ConsumerTimeout,
+ authz_context = AuthzContext
},
limiter = Limiter,
tx = none,
@@ -1027,11 +1030,11 @@ check_write_permitted(Resource, User, Context) ->
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
+check_write_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
+ check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
+check_read_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
+ check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -1066,23 +1069,11 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
-check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, _ChSrc, Permission) ->
- %% Called from outside the channel by mgmt API
- AmqpParams = [],
- check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
-check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid, ChSrc),
- check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
-check_topic_authorisation(_, _, _, _, _, _) ->
- ok.
-
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
- AmqpParams, RoutingKey, Permission) ->
+ RoutingKey, AuthzContext, Permission) ->
Resource = Name#resource{kind = topic},
- VariableMap = build_topic_variable_map(AmqpParams, VHost, Username),
+ VariableMap = build_topic_variable_map(AuthzContext, VHost, Username),
Context = #{routing_key => RoutingKey,
variable_map => VariableMap},
Cache = case get(topic_permission_cache) of
@@ -1095,28 +1086,13 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end.
-
-get_amqp_params(_ConnPid, rabbit_reader) -> [];
-get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
- Timeout = get_operation_timeout(),
- get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout);
-get_amqp_params(_, _) -> [].
-
-get_amqp_params(ConnPid, false, _Timeout) ->
- %% Connection process is dead
- rabbit_log_channel:debug("file ~p, line ~p - connection process not alive: ~p~n",
- [?FILE, ?LINE, ConnPid]),
- [];
-get_amqp_params(ConnPid, true, Timeout) ->
- rabbit_amqp_connection:amqp_params(ConnPid, Timeout).
+ end;
+check_topic_authorisation(_, _, _, _, _) ->
+ ok.
-build_topic_variable_map(AmqpParams, VHost, Username) ->
- VariableFromAmqpParams = extract_variable_map_from_amqp_params(AmqpParams),
- maps:merge(VariableFromAmqpParams, #{<<"vhost">> => VHost, <<"username">> => Username}).
-extract_authz_context(ConnPid, ChSrc) ->
- extract_variable_map_from_amqp_params(get_amqp_params(ConnPid, ChSrc)).
+build_topic_variable_map(AuthzContext, VHost, Username) ->
+ maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username}).
%% use tuple representation of amqp_params to avoid coupling.
%% get variable map only from amqp_params_direct, not amqp_params_network.
@@ -1317,13 +1293,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{cfg = #conf{channel = ChannelNum,
- conn_pid = ConnPid,
- source = ChSrc,
conn_name = ConnName,
virtual_host = VHostPath,
user = #user{username = Username} = User,
trace_state = TraceState,
- max_message_size = MaxMessageSize
+ max_message_size = MaxMessageSize,
+ authz_context = AuthzContext
},
tx = Tx,
confirm_enabled = ConfirmEnabled,
@@ -1331,10 +1306,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_write_permitted(ExchangeName, User, AuthzContext),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
+ check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1387,13 +1362,13 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
conn_pid = ConnPid,
user = User,
virtual_host = VHostPath,
- source = ChSrc
+ authz_context = AuthzContext
},
limiter = Limiter,
next_tag = DeliveryTag,
queue_states = QueueStates0}) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
%% Use the delivery tag as consumer tag for quorum queues
@@ -1474,14 +1449,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
user = User,
virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc},
+ authz_context = AuthzContext},
consumer_mapping = ConsumerMapping
}) ->
case maps:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
@@ -1625,84 +1599,84 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
user = User,
queue_collector_pid = CollectorPid,
conn_pid = ConnPid,
- source = ChSrc}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ authz_context = AuthzContext}}) ->
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- case handle_method(Method, ConnPid, ChSrc, CollectorPid,
+ case handle_method(Method, ConnPid, AuthzContext, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1948,21 +1922,20 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath),
- AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_write_permitted(DestinationName, User, AuthContext),
+ check_write_permitted(DestinationName, User, AuthzContext),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
- check_read_permitted(ExchangeName, User, AuthContext),
+ check_read_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
+ check_read_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2473,39 +2446,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
+ _ConnPid, _AuthzContext, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2519,7 +2492,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, ChSrc, CollectorPid, VHostPath,
+ ConnPid, AuthzContext, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2533,7 +2506,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(QueueName, User, AuthzContext),
rabbit_core_metrics:queue_declared(QueueName),
case rabbit_amqqueue:with(
QueueName,
@@ -2555,9 +2528,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
"invalid type '~s' for arg '~s' in ~s",
[Type, DlxKey, rabbit_misc:rs(QueueName)]);
DLX ->
- AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_read_permitted(QueueName, User, AuthContext),
- check_write_permitted(DLX, User, AuthContext),
+ check_read_permitted(QueueName, User, AuthzContext),
+ check_write_permitted(DLX, User, AuthzContext),
ok
end,
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
@@ -2579,7 +2551,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, AuthzContext, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2595,7 +2567,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2609,12 +2581,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ ConnPid, AuthzContext, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
- check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(QueueName, User, AuthzContext),
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
@@ -2638,13 +2610,13 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ _ConnPid, AuthzContext, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
check_not_default_exchange(ExchangeName),
check_exchange_deletion(ExchangeName),
- check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
{error, not_found} ->
ok;
@@ -2654,9 +2626,9 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, ChSrc, _CollectorPid, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end);
@@ -2667,12 +2639,12 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ _ConnPid, AuthzContext, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(ExchangeName, User, AuthzContext),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -2684,9 +2656,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
precondition_failed(
"invalid type '~s' for arg '~s' in ~s",
[Type, AeKey, rabbit_misc:rs(ExchangeName)]);
- AName -> AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_read_permitted(ExchangeName, User, AuthContext),
- check_write_permitted(AName, User, AuthContext),
+ AName -> check_read_permitted(ExchangeName, User, AuthzContext),
+ check_write_permitted(AName, User, AuthzContext),
ok
end,
rabbit_exchange:declare(ExchangeName,
@@ -2701,7 +2672,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a244813d80..7a76ab45ca 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -66,12 +66,12 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
- LimiterPid]},
+ LimiterPid, undefined]},
intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector}) ->
+ User, VHost, Capabilities, Collector, AmqpParams}) ->
{ok, SupPid} = supervisor2:start_link(
?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
@@ -81,7 +81,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
- LimiterPid]},
+ LimiterPid, AmqpParams]},
intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index b84b4d91bb..814b2783e6 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -17,7 +17,7 @@
-module(rabbit_direct).
-export([boot/0, force_event_refresh/1, list/0, connect/5,
- start_channel/9, disconnect/2]).
+ start_channel/10, disconnect/2]).
-deprecated([{force_event_refresh, 1, eventually}]).
@@ -201,16 +201,16 @@ connect1(User, VHost, Protocol, Pid, Infos) ->
-spec start_channel
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(), pid()) ->
+ rabbit_framing:amqp_table(), pid(), any()) ->
{'ok', pid()}.
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, Collector) ->
+ VHost, Capabilities, Collector, AmqpParams) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector}]),
+ User, VHost, Capabilities, Collector, AmqpParams}]),
_ = rabbit_channel:source(ChannelPid, ?MODULE),
{ok, ChannelPid}.