summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-11-26 02:28:07 +0300
committerGitHub <noreply@github.com>2019-11-26 02:28:07 +0300
commitd130694df550305b128aeb2dfcd050f8f0522a37 (patch)
tree5f4614dee1ab37d1575d1b7f3fa32d497b155b13
parent79da341df49a8fc54229b930bd0e8d52f4209f64 (diff)
parent57ce02dc138b1ffa0f6964f53ea5b22c6613137d (diff)
downloadrabbitmq-server-git-d130694df550305b128aeb2dfcd050f8f0522a37.tar.gz
Merge pull request #2172 from rabbitmq/velimir-avoid-get-amqp-params-call
A continuation to 2169
-rw-r--r--src/rabbit_channel.erl218
-rw-r--r--src/rabbit_channel_sup.erl6
-rw-r--r--src/rabbit_direct.erl9
-rw-r--r--src/rabbit_reader.erl1
-rw-r--r--test/channel_source_SUITE.erl164
5 files changed, 99 insertions, 299 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2148a435b6..37ac409985 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -55,7 +55,7 @@
-behaviour(gen_server2).
--export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
+-export([start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, deliver_reply/2,
send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
@@ -63,7 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
--export([source/2, update_user_state/2]).
+-export([update_user_state/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
@@ -121,7 +121,8 @@
consumer_prefetch,
%% Message content size limit
max_message_size,
- consumer_timeout
+ consumer_timeout,
+ authz_context
}).
-record(ch, {cfg :: #conf{},
@@ -239,14 +240,14 @@
-spec start_link
(channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), pid()) ->
+ pid(), pid(), any()) ->
rabbit_types:ok_pid_or_error().
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, CollectorPid, Limiter) ->
+ VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, CollectorPid, Limiter], []).
+ User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
@@ -460,15 +461,6 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
--spec source(pid(), any()) -> 'ok' | {error, channel_terminated}.
-
-source(Pid, Source) when is_pid(Pid) ->
- case erlang:is_process_alive(Pid) of
- true -> Pid ! {channel_source, Source},
- ok;
- false -> {error, channel_terminated}
- end.
-
-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.
update_user_state(Pid, UserState) when is_pid(Pid) ->
@@ -481,7 +473,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, LimiterPid]) ->
+ Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
@@ -504,6 +496,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)),
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
+ OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@@ -519,7 +512,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = Prefetch,
max_message_size = MaxMessageSize,
- consumer_timeout = ConsumerTimeout
+ consumer_timeout = ConsumerTimeout,
+ authz_context = OptionalVariables
},
limiter = Limiter,
tx = none,
@@ -874,8 +868,6 @@ handle_info(tick, State0 = #ch{queue_states = QueueStates0}) ->
Return ->
Return
end;
-handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
- noreply(State#ch{cfg = Cfg#conf{source = Source}});
handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{user = User}}).
@@ -1027,11 +1019,11 @@ check_write_permitted(Resource, User, Context) ->
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
+check_write_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
+ check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
+check_read_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
+ check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -1066,23 +1058,11 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
-check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, _ChSrc, Permission) ->
- %% Called from outside the channel by mgmt API
- AmqpParams = [],
- check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
-check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid, ChSrc),
- check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
-check_topic_authorisation(_, _, _, _, _, _) ->
- ok.
-
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
- AmqpParams, RoutingKey, Permission) ->
+ RoutingKey, AuthzContext, Permission) ->
Resource = Name#resource{kind = topic},
- VariableMap = build_topic_variable_map(AmqpParams, VHost, Username),
+ VariableMap = build_topic_variable_map(AuthzContext, VHost, Username),
Context = #{routing_key => RoutingKey,
variable_map => VariableMap},
Cache = case get(topic_permission_cache) of
@@ -1095,35 +1075,27 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end.
+ end;
+check_topic_authorisation(_, _, _, _, _) ->
+ ok.
-get_amqp_params(_ConnPid, rabbit_reader) -> [];
-get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
- Timeout = get_operation_timeout(),
- get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout);
-get_amqp_params(_, _) -> [].
-
-get_amqp_params(ConnPid, false, _Timeout) ->
- %% Connection process is dead
- rabbit_log_channel:debug("file ~p, line ~p - connection process not alive: ~p~n",
- [?FILE, ?LINE, ConnPid]),
- [];
-get_amqp_params(ConnPid, true, Timeout) ->
- rabbit_amqp_connection:amqp_params(ConnPid, Timeout).
-
-build_topic_variable_map(AmqpParams, VHost, Username) ->
- VariableFromAmqpParams = extract_variable_map_from_amqp_params(AmqpParams),
- maps:merge(VariableFromAmqpParams, #{<<"vhost">> => VHost, <<"username">> => Username}).
-
-extract_authz_context(ConnPid, ChSrc) ->
- extract_variable_map_from_amqp_params(get_amqp_params(ConnPid, ChSrc)).
-
-%% use tuple representation of amqp_params to avoid coupling.
-%% get variable map only from amqp_params_direct, not amqp_params_network.
-%% amqp_params_direct are usually used from plugins (e.g. MQTT, STOMP)
-extract_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, _, _, _, _,
- {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}]) ->
+
+build_topic_variable_map(AuthzContext, VHost, Username) when is_map(AuthzContext) ->
+ maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username});
+build_topic_variable_map(AuthzContext, VHost, Username) ->
+ maps:merge(extract_variable_map_from_amqp_params(AuthzContext), #{<<"vhost">> => VHost, <<"username">> => Username}).
+
+%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
+%% Extracts variable map only from amqp_params_direct, not amqp_params_network.
+%% amqp_params_direct records are usually used by plugins (e.g. MQTT, STOMP)
+extract_variable_map_from_amqp_params({amqp_params, {amqp_params_direct, _, _, _, _,
+ {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}) ->
proplists:get_value(variable_map, AdditionalInfo, #{});
+extract_variable_map_from_amqp_params({amqp_params_direct, _, _, _, _,
+ {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}) ->
+ proplists:get_value(variable_map, AdditionalInfo, #{});
+extract_variable_map_from_amqp_params([Value]) ->
+ extract_variable_map_from_amqp_params(Value);
extract_variable_map_from_amqp_params(_) ->
#{}.
@@ -1317,13 +1289,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{cfg = #conf{channel = ChannelNum,
- conn_pid = ConnPid,
- source = ChSrc,
conn_name = ConnName,
virtual_host = VHostPath,
user = #user{username = Username} = User,
trace_state = TraceState,
- max_message_size = MaxMessageSize
+ max_message_size = MaxMessageSize,
+ authz_context = AuthzContext
},
tx = Tx,
confirm_enabled = ConfirmEnabled,
@@ -1331,10 +1302,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_write_permitted(ExchangeName, User, AuthzContext),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
+ check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1387,13 +1358,13 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
conn_pid = ConnPid,
user = User,
virtual_host = VHostPath,
- source = ChSrc
+ authz_context = AuthzContext
},
limiter = Limiter,
next_tag = DeliveryTag,
queue_states = QueueStates0}) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
%% Use the delivery tag as consumer tag for quorum queues
@@ -1474,14 +1445,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
user = User,
virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc},
+ authz_context = AuthzContext},
consumer_mapping = ConsumerMapping
}) ->
case maps:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
@@ -1625,84 +1595,84 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
user = User,
queue_collector_pid = CollectorPid,
conn_pid = ConnPid,
- source = ChSrc}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ authz_context = AuthzContext}}) ->
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{virtual_host = VHostPath,
conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
queue_collector_pid = CollectorPid,
user = User}}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, AuthzContext, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{cfg = #conf{conn_pid = ConnPid,
- source = ChSrc,
+ authz_context = AuthzContext,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}}) ->
- case handle_method(Method, ConnPid, ChSrc, CollectorPid,
+ case handle_method(Method, ConnPid, AuthzContext, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1948,21 +1918,20 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath),
- AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_write_permitted(DestinationName, User, AuthContext),
+ check_write_permitted(DestinationName, User, AuthzContext),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
- check_read_permitted(ExchangeName, User, AuthContext),
+ check_read_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
+ check_read_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2393,7 +2362,6 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
-i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC);
@@ -2473,39 +2441,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, AuthzContext, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
+ _ConnPid, _AuthzContext, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2519,7 +2487,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, ChSrc, CollectorPid, VHostPath,
+ ConnPid, AuthzContext, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2533,7 +2501,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(QueueName, User, AuthzContext),
rabbit_core_metrics:queue_declared(QueueName),
case rabbit_amqqueue:with(
QueueName,
@@ -2555,9 +2523,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
"invalid type '~s' for arg '~s' in ~s",
[Type, DlxKey, rabbit_misc:rs(QueueName)]);
DLX ->
- AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_read_permitted(QueueName, User, AuthContext),
- check_write_permitted(DLX, User, AuthContext),
+ check_read_permitted(QueueName, User, AuthzContext),
+ check_write_permitted(DLX, User, AuthzContext),
ok
end,
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
@@ -2579,7 +2546,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, AuthzContext, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2595,7 +2562,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2609,12 +2576,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ ConnPid, AuthzContext, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
- check_configure_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(QueueName, User, AuthzContext),
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
@@ -2638,13 +2605,13 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ _ConnPid, AuthzContext, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
check_not_default_exchange(ExchangeName),
check_exchange_deletion(ExchangeName),
- check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(ExchangeName, User, AuthzContext),
case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
{error, not_found} ->
ok;
@@ -2654,9 +2621,9 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, ChSrc, _CollectorPid, VHostPath, User) ->
+ ConnPid, AuthzContext, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
- check_read_permitted(QueueName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_read_permitted(QueueName, User, AuthzContext),
rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end);
@@ -2667,12 +2634,12 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- ConnPid, ChSrc, _CollectorPid, VHostPath,
+ _ConnPid, AuthzContext, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, User, extract_authz_context(ConnPid, ChSrc)),
+ check_configure_permitted(ExchangeName, User, AuthzContext),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -2684,9 +2651,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
precondition_failed(
"invalid type '~s' for arg '~s' in ~s",
[Type, AeKey, rabbit_misc:rs(ExchangeName)]);
- AName -> AuthContext = extract_authz_context(ConnPid, ChSrc),
- check_read_permitted(ExchangeName, User, AuthContext),
- check_write_permitted(AName, User, AuthContext),
+ AName -> check_read_permitted(ExchangeName, User, AuthzContext),
+ check_write_permitted(AName, User, AuthzContext),
ok
end,
rabbit_exchange:declare(ExchangeName,
@@ -2701,7 +2667,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _AuthzContext, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a244813d80..7a76ab45ca 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -66,12 +66,12 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
- LimiterPid]},
+ LimiterPid, undefined]},
intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector}) ->
+ User, VHost, Capabilities, Collector, AmqpParams}) ->
{ok, SupPid} = supervisor2:start_link(
?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
@@ -81,7 +81,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
- LimiterPid]},
+ LimiterPid, AmqpParams]},
intrinsic, ?FAIR_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index b84b4d91bb..683be84676 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -17,7 +17,7 @@
-module(rabbit_direct).
-export([boot/0, force_event_refresh/1, list/0, connect/5,
- start_channel/9, disconnect/2]).
+ start_channel/10, disconnect/2]).
-deprecated([{force_event_refresh, 1, eventually}]).
@@ -201,17 +201,16 @@ connect1(User, VHost, Protocol, Pid, Infos) ->
-spec start_channel
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(), pid()) ->
+ rabbit_framing:amqp_table(), pid(), any()) ->
{'ok', pid()}.
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, Collector) ->
+ VHost, Capabilities, Collector, AmqpParams) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector}]),
- _ = rabbit_channel:source(ChannelPid, ?MODULE),
+ User, VHost, Capabilities, Collector, AmqpParams}]),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 116dcf89e6..70ed3246d3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -925,7 +925,6 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
- _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl
deleted file mode 100644
index 3247ffa997..0000000000
--- a/test/channel_source_SUITE.erl
+++ /dev/null
@@ -1,164 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(channel_source_SUITE).
-
--include_lib("amqp_client/include/amqp_client.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--compile(export_all).
-
-all() ->
- [
- {group, non_parallel_tests}
- ].
-
-groups() ->
- [
- {non_parallel_tests, [], [
- network_rabbit_reader_channel_source,
- network_arbitrary_channel_source,
- direct_channel_source,
- undefined_channel_source
- ]}
- ].
-
-%% -------------------------------------------------------------------
-%% Testsuite setup/teardown.
-%% -------------------------------------------------------------------
-
-init_per_suite(Config) ->
- rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
-
-end_per_suite(Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config).
-
-init_per_group(_, Config) ->
- Config.
-
-end_per_group(_, Config) ->
- Config.
-
-init_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_started(Config, Testcase),
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, Testcase}
- ]),
- rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
-
-end_per_testcase(Testcase, Config) ->
- Config1 = rabbit_ct_helpers:run_steps(Config,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps()),
- rabbit_ct_helpers:testcase_finished(Config1, Testcase).
-
-%% -------------------------------------------------------------------
-%% Testcases.
-%% -------------------------------------------------------------------
-
-network_rabbit_reader_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, network_rabbit_reader_channel_source1, [Config]).
-
-network_rabbit_reader_channel_source1(Config) ->
- ExistingChannels = rabbit_channel:list(),
- Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
- {ok, ClientCh} = amqp_connection:open_channel(Conn),
- [ServerCh] = rabbit_channel:list() -- ExistingChannels,
- [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]),
- _ = rabbit_channel:source(ServerCh, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
- amqp_channel:close(ClientCh),
- amqp_connection:close(Conn),
- wait_for_channel_termination(ServerCh, 60),
- passed.
-
-network_arbitrary_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, network_arbitrary_channel_source1, [Config]).
-
-network_arbitrary_channel_source1(Config) ->
- Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
- Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end),
- {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id),
- {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id),
- {ok, Ch} = rabbit_channel:start_link(
- 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1,
- rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [],
- Collector, Limiter),
- _ = rabbit_channel:source(Ch, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]),
- [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]],
- amqp_connection:close(Conn),
- wait_for_channel_termination(Ch, 60),
- passed.
-
-direct_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, direct_channel_source1, [Config]).
-
-direct_channel_source1(Config) ->
- ExistingChannels = rabbit_channel:list(),
- Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config),
- {ok, ClientCh} = amqp_connection:open_channel(Conn),
- [ServerCh] = rabbit_channel:list() -- ExistingChannels,
- [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]),
- _ = rabbit_channel:source(ServerCh, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
- amqp_channel:close(ClientCh),
- amqp_connection:close(Conn),
- wait_for_channel_termination(ServerCh, 60),
- passed.
-
-wait_for_channel_termination(Ch, 0) ->
- ?assertEqual(
- {error, channel_terminated},
- rabbit_channel:source(Ch, ?MODULE));
-wait_for_channel_termination(Ch, Attempts) ->
- case rabbit_channel:source(Ch, ?MODULE) of
- {error, channel_terminated} ->
- ok;
- _ ->
- timer:sleep(1000),
- wait_for_channel_termination(Ch, Attempts - 1)
- end.
-
-undefined_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, undefined_channel_source1, [Config]).
-
-undefined_channel_source1(_Config) ->
- ExistingChannels = rabbit_channel:list(),
- {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(),
- wait_for_server_channel(ExistingChannels, ServerCh, 60),
- [{source, undefined}] = rabbit_channel:info(ServerCh, [source]),
- _ = rabbit_channel:source(ServerCh, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
- passed.
-
-wait_for_server_channel(ExistingChannels, ServerCh, 0) ->
- ?assertEqual([ServerCh], rabbit_channel:list() -- ExistingChannels);
-wait_for_server_channel(ExistingChannels, ServerCh, Attempts) ->
- case rabbit_channel:list() -- ExistingChannels of
- [ServerCh] ->
- ok;
- _ ->
- timer:sleep(1000),
- wait_for_server_channel(ExistingChannels, ServerCh, Attempts - 1)
- end.