summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl218
-rw-r--r--src/rabbit_channel_sup.erl6
-rw-r--r--src/rabbit_direct.erl9
-rw-r--r--src/rabbit_reader.erl1
4 files changed, 99 insertions, 135 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2148a435b6..37ac409985 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -55,7 +55,7 @@
-behaviour(gen_server2).
--export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
+-export([start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, deliver_reply/2,
send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
@@ -63,7 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
--export([source/2, update_user_state/2]).
+-export([update_user_state/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
@@ -121,7 +121,8 @@
consumer_prefetch,
%% Message content size limit
max_message_size,
- consumer_timeout
+ consumer_timeout,
+ authz_context
}).
-record(ch, {cfg :: #conf{},
@@ -239,14 +240,14 @@
-spec start_link
(channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), pid()) ->
+ pid(), pid(), any()) ->
rabbit_types:ok_pid_or_error().
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, CollectorPid, Limiter) ->
+ VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, CollectorPid, Limiter], []).
+ User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
@@ -460,15 +461,6 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
--spec source(pid(), any()) -> 'ok' | {error, channel_terminated}.
-
-source(Pid, Source) when is_pid(Pid) ->
- case erlang:is_process_alive(Pid) of
- true -> Pid ! {channel_source, Source},
- ok;
- false -> {error, channel_terminated}
- end.
-
-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.
update_user_state(Pid, UserState) when is_pid(Pid) ->
@@ -481,7 +473,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, LimiterPid]) ->
+ Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
@@ -504,6 +496,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)),
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
+ OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@@ -519,7 +512,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = Prefetch,
max_message_size = MaxMessageSize,
- consumer_timeout = ConsumerTimeout
+ consumer_timeout = ConsumerTimeout,
+ authz_context = OptionalVariables
},
limiter = Limiter,
tx = none,
@@ -874,8 +868,6 @@ handle_info(tick, State0 = #ch{queue_states = QueueStates0}) ->
Return ->
Return
end;
-handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
- noreply(State#ch{cfg = Cfg#conf{source = Source}});
handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{user = User}}).
@@ -1027,11 +1019,11 @@ check_write_permitted(Resource, User, Context) ->
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
+check_write_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
+ check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
+check_read_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
+ check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -1066,23 +1058,11 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
-check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, _ChSrc, Permission) ->
- %% Called from outside the channel by mgmt API
- AmqpParams = [],
- check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
-check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid, ChSrc),
- check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
-check_topic_authorisation(_, _, _, _, _, _) ->
- ok.
-
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
- AmqpParams, RoutingKey, Permission) ->
+ RoutingKey, AuthzContext, Permission) ->
Resource = Name#resource{kind = topic},
- VariableMap = build_topic_variable_map(AmqpParams, VHost, Username),
+ VariableMap = build_topic_variable_map(AuthzContext, VHost, Username),
Context = #{routing_key => RoutingKey,
variable_map => VariableMap},
Cache = case get(topic_permission_cache) of
@@ -1095,35 +1075,27 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end.
+ end;
+check_topic_authorisation(_, _, _, _, _) ->
+ ok.
-get_amqp_params(_ConnPid, rabbit_reader) -> [];
-get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
- Timeout = get_operation_timeout(),
- get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout);
-get_amqp_params(_, _) -> [].
-
-get_amqp_params(ConnPid, false, _Timeout) ->
- %% Connection process is dead
- rabbit_log_channel:debug("file ~p, line ~p - connection process not alive: ~p~n",
- [?FILE, ?LINE, ConnPid]),
- [];
-get_amqp_params(ConnPid, true, Timeout) ->
- rabbit_amqp_connection:amqp_params(ConnPid, Timeout).
-
-build_topic_variable_map(AmqpParams, VHost, Username) ->
- VariableFromAmqpParams = extract_variable_map_from_amqp_params(AmqpParams),
- maps:merge(VariableFromAmqpParams, #{<<"vhost">> => VHost, <<"username">> => Username}).
-
-extract_authz_context(ConnPid, ChSrc) ->
- extract_variable_map_from_amqp_params(get_amqp_params(ConnPid, ChSrc)).
-
-%% use tuple representation of amqp_params to avoid coupling.
-%% get variable map only from amqp_params_direct, not amqp_params_network.
-%% amqp_params_direct are usually used from plugins (e.g. MQTT, STOMP)
-extract_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, _, _, _, _,
- {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}]) ->
+
+build_topic_variable_map(AuthzContext, VHost, Username) when is_map(AuthzContext) ->
+ maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username});
+build_topic_variable_map(AuthzContext, VHost, Username) ->
+ maps:merge(extract_variable_map_from_amqp_params(AuthzContext), #{<<"vhost">> => VHost, <<"username">> => Username}).
+
+%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
+%% Extracts variable map only from amqp_params_direct, not amqp_params_network.
+%% amqp_params_direct records are usually used by plugins (e.g. MQTT, STOMP)
+extract_variable_map_from_amqp_params({amqp_params, {amqp_params_direct, _, _, _, _,
+ {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}) ->
proplists:get_value(variable_map, AdditionalInfo, #{});
+extract_variable_map_from_amqp_params({amqp_params_direct, _, _, _, _,
+ {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}) ->
+ proplists:get_value(variable_map, AdditionalInfo, #{});
+extract_variable_map_from_amqp_params([Value]) ->
+ extract_variable_map_from_amqp_params(Value);
extract_variable_map_from_amqp_params(_) ->
#{}.
@@ -1317,13 +1289,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{cfg = #conf{channel = ChannelNum,
- conn_pid = ConnPid,
- source = ChSrc,
conn_name = ConnName,
virtual_host = VHostPath,
user = #user{username = Username} = User,
trace_state = TraceState,
- max_message_size = MaxMessageSize
+ max_message_size = MaxMessageSize,
+ authz_context = AuthzContext
},
tx = Tx,
confirm_enabled = ConfirmEnabled,
@@ -1331,10 +1302,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_write_permitted(ExchangeName, User, AuthzContext),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
+ check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1387,13 +1358,13 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
conn_pid = ConnPid,
user = User,
virtual_host = VHostPath,
- source = ChSrc
+ authz_context = AuthzContext
},
limiter = Limiter,
next_tag = DeliveryTag,
queue_states = QueueStates0}) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
%% Use the delivery tag as consumer tag for quorum queues
@@ -1474,14 +1445,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
user = User,
virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc},
+ authz_context = AuthzContext},
consumer_mapping = ConsumerMapping
}) ->
case maps:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
@@ -1625,84 +1595,84 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
user = User,
queue_collector_pid = CollectorPid,
conn_pid = ConnPid,
- source = ChSrc}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ authz_context = AuthzContext}}) ->
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- case handle_method(Method, ConnPid, ChSrc, CollectorPid,
+ case handle_method(Method, ConnPid, AuthzContext, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1948,21 +1918,20 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath),
- AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_write_permitted(DestinationName, User, AuthContext),
+ check_write_permitted(DestinationName, User, AuthzContext),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
- check_read_permitted(ExchangeName, User, AuthContext),
+ check_read_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
+ check_read_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2393,7 +2362,6 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
-i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC);
@@ -2473,39 +2441,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
+ _ConnPid, _AuthzContext, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2519,7 +2487,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, ChSrc, CollectorPid, VHostPath,
+ ConnPid, AuthzContext, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2533,7 +2501,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(QueueName, User, AuthzContext),
rabbit_core_metrics:queue_declared(QueueName),
case rabbit_amqqueue:with(
QueueName,
@@ -2555,9 +2523,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
"invalid type '~s' for arg '~s' in ~s",
[Type, DlxKey, rabbit_misc:rs(QueueName)]);
DLX ->
- AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_read_permitted(QueueName, User, AuthContext),
- check_write_permitted(DLX, User, AuthContext),
+ check_read_permitted(QueueName, User, AuthzContext),
+ check_write_permitted(DLX, User, AuthzContext),
ok
end,
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
@@ -2579,7 +2546,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, AuthzContext, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2595,7 +2562,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2609,12 +2576,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ ConnPid, AuthzContext, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
- check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(QueueName, User, AuthzContext),
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
@@ -2638,13 +2605,13 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ _ConnPid, AuthzContext, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
check_not_default_exchange(ExchangeName),
check_exchange_deletion(ExchangeName),
- check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
{error, not_found} ->
ok;
@@ -2654,9 +2621,9 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, ChSrc, _CollectorPid, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end);
@@ -2667,12 +2634,12 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ _ConnPid, AuthzContext, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(ExchangeName, User, AuthzContext),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -2684,9 +2651,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
precondition_failed(
"invalid type '~s' for arg '~s' in ~s",
[Type, AeKey, rabbit_misc:rs(ExchangeName)]);
- AName -> AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_read_permitted(ExchangeName, User, AuthContext),
- check_write_permitted(AName, User, AuthContext),
+ AName -> check_read_permitted(ExchangeName, User, AuthzContext),
+ check_write_permitted(AName, User, AuthzContext),
ok
end,
rabbit_exchange:declare(ExchangeName,
@@ -2701,7 +2667,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a244813d80..7a76ab45ca 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -66,12 +66,12 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
- LimiterPid]},
+ LimiterPid, undefined]},
intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector}) ->
+ User, VHost, Capabilities, Collector, AmqpParams}) ->
{ok, SupPid} = supervisor2:start_link(
?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
@@ -81,7 +81,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
- LimiterPid]},
+ LimiterPid, AmqpParams]},
intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index b84b4d91bb..683be84676 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -17,7 +17,7 @@
-module(rabbit_direct).
-export([boot/0, force_event_refresh/1, list/0, connect/5,
- start_channel/9, disconnect/2]).
+ start_channel/10, disconnect/2]).
-deprecated([{force_event_refresh, 1, eventually}]).
@@ -201,17 +201,16 @@ connect1(User, VHost, Protocol, Pid, Infos) ->
-spec start_channel
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(), pid()) ->
+ rabbit_framing:amqp_table(), pid(), any()) ->
{'ok', pid()}.
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, Collector) ->
+ VHost, Capabilities, Collector, AmqpParams) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector}]),
- _ = rabbit_channel:source(ChannelPid, ?MODULE),
+ User, VHost, Capabilities, Collector, AmqpParams}]),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 116dcf89e6..70ed3246d3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -925,7 +925,6 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
- _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),