summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda Dube <ayanda.dube@erlang-solutions.com>2019-02-18 11:32:13 +0100
committerAyanda Dube <ayanda.dube@erlang-solutions.com>2019-02-18 11:47:10 +0100
commitf71ace363e55e9aca922c00831b49d420217b439 (patch)
treea532b59d8b5e58564e5bdbeeabb3950a1178ef7d
parent2a80aa97ac361b11679af6c755807c68ad8b7b2a (diff)
downloadrabbitmq-server-git-f71ace363e55e9aca922c00831b49d420217b439.tar.gz
store and retrieve channel source from state
-rw-r--r--src/rabbit_channel.erl110
-rw-r--r--test/channel_source_SUITE.erl55
2 files changed, 85 insertions, 80 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dcb4befee6..8f5f159f88 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -79,7 +79,7 @@
-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
--export([handle_method/5]).
+-export([handle_method/6]).
-record(ch, {
%% starting | running | flow | closing
@@ -98,6 +98,9 @@
%% same as reader's name, see #v1.name
%% in rabbit_reader
conn_name,
+ %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
+ %% or any other channel creating/spawning entity
+ source,
%% limiter pid, see rabbit_limiter
limiter,
%% none | {Msgs, Acks} | committing | failed |
@@ -816,9 +819,8 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
end, QueueStates0),
noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
-handle_info({channel_source, Source}, State) ->
- put(channel_source, Source),
- noreply(State).
+handle_info({channel_source, Source}, State = #ch{}) ->
+ noreply(State#ch{source = Source}).
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -951,11 +953,11 @@ check_write_permitted(Resource, User) ->
check_read_permitted(Resource, User) ->
check_resource_access(User, Resource, read).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
+check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
+check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -991,14 +993,17 @@ check_internal_exchange(_) ->
ok.
check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, Permission) ->
+ User, none, RoutingKey, _ChSrc, Permission) ->
%% Called from outside the channel by mgmt API
AmqpParams = [],
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid, get(channel_source)),
+ User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
+ AmqpParams = get_amqp_params(ConnPid, ChSrc),
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
+check_topic_authorisation(_, _, _, _, _, _) ->
+ ok.
+
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
AmqpParams, RoutingKey, Permission) ->
@@ -1016,9 +1021,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end;
-check_topic_authorisation(_, _, _, _, _) ->
- ok.
+ end.
get_amqp_params(_ConnPid, rabbit_reader) -> [];
get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
@@ -1241,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid,
+ source = ChSrc,
max_message_size = MaxMessageSize}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
+ check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1530,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
user = User,
queue_collector_pid = CollectorPid,
- conn_pid = ConnPid}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ conn_pid = ConnPid,
+ source = ChSrc}) ->
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- case handle_method(Method, ConnPid, CollectorPid,
+ case handle_method(Method, ConnPid, ChSrc, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1825,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid,
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
@@ -1834,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, User),
- ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
- case ExchangeLookup of
+ case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
- %% no-op
- ExchangeLookup;
+ ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
- ExchangeLookup
+ check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2240,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(source, #ch{source = ChSrc}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2260,7 +2271,6 @@ i(garbage_collection, _State) ->
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions;
-i(channel_source, _State = #ch{}) -> get(channel_source);
i(Item, _) ->
throw({bad_argument, Item}).
@@ -2313,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _CollectorPid, VHost, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2359,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, CollectorPid, VHostPath,
+ ConnPid, ChSrc, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2418,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2434,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2448,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, _CollectorPid, VHostPath,
+ ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
@@ -2477,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
@@ -2493,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, _CollectorPid, VHostPath, User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User),
rabbit_amqqueue:with_exclusive_access_or_die(
@@ -2506,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
@@ -2539,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl
index 11f87c7fde..a18c2474b7 100644
--- a/test/channel_source_SUITE.erl
+++ b/test/channel_source_SUITE.erl
@@ -16,7 +16,6 @@
-module(channel_source_SUITE).
--include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-compile(export_all).
@@ -29,9 +28,9 @@ all() ->
groups() ->
[
{non_parallel_tests, [], [
- network_channel_source_notifications,
- direct_channel_source_notifications,
- undefined_channel_source_notifications
+ network_rabbit_reader_channel_source,
+ direct_channel_source,
+ undefined_channel_source
]}
].
@@ -71,53 +70,49 @@ end_per_testcase(Testcase, Config) ->
%% Testcases.
%% -------------------------------------------------------------------
-network_channel_source_notifications(Config) ->
+network_rabbit_reader_channel_source(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, network_channel_source_notifications1, [Config]).
+ ?MODULE, network_rabbit_reader_channel_source1, [Config]).
-network_channel_source_notifications1(Config) ->
+network_rabbit_reader_channel_source1(Config) ->
ExistingChannels = rabbit_channel:list(),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
- {ok, _ClientCh} = amqp_connection:open_channel(Conn),
+ {ok, ClientCh} = amqp_connection:open_channel(Conn),
[ServerCh] = rabbit_channel:list() -- ExistingChannels,
- [{channel_source, rabbit_reader}] =
- rabbit_channel:info(ServerCh, [channel_source]),
- rabbit_channel:source(ServerCh, ?MODULE),
- [{channel_source, ?MODULE}] =
- rabbit_channel:info(ServerCh, [channel_source]),
+ [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ amqp_channel:close(ClientCh),
amqp_connection:close(Conn),
{error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
passed.
-direct_channel_source_notifications(Config) ->
+direct_channel_source(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, direct_channel_source_notifications1, [Config]).
+ ?MODULE, direct_channel_source1, [Config]).
-direct_channel_source_notifications1(Config) ->
+direct_channel_source1(Config) ->
ExistingChannels = rabbit_channel:list(),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config),
- {ok, _ClientCh} = amqp_connection:open_channel(Conn),
+ {ok, ClientCh} = amqp_connection:open_channel(Conn),
[ServerCh] = rabbit_channel:list() -- ExistingChannels,
- [{channel_source, rabbit_direct}] =
- rabbit_channel:info(ServerCh, [channel_source]),
- rabbit_channel:source(ServerCh, ?MODULE),
- [{channel_source, ?MODULE}] =
- rabbit_channel:info(ServerCh, [channel_source]),
+ [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
+ amqp_channel:close(ClientCh),
amqp_connection:close(Conn),
{error, channel_terminated} = rabbit_channel:source(ServerCh, ?MODULE),
passed.
-undefined_channel_source_notifications(Config) ->
+undefined_channel_source(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, undefined_channel_source_notifications1, [Config]).
+ ?MODULE, undefined_channel_source1, [Config]).
-undefined_channel_source_notifications1(_Config) ->
+undefined_channel_source1(_Config) ->
ExistingChannels = rabbit_channel:list(),
{_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(),
[ServerCh] = rabbit_channel:list() -- ExistingChannels,
- [{channel_source, undefined}] =
- rabbit_channel:info(ServerCh, [channel_source]),
- rabbit_channel:source(ServerCh, ?MODULE),
- [{channel_source, ?MODULE}] =
- rabbit_channel:info(ServerCh, [channel_source]),
+ [{source, undefined}] = rabbit_channel:info(ServerCh, [source]),
+ _ = rabbit_channel:source(ServerCh, ?MODULE),
+ [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
passed.