diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 |
3 files changed, 20 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 036aa9a60c..dcb4befee6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,6 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). +-export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -448,6 +449,14 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). +-spec source(pid(), any()) -> any(). + +source(Pid, Source) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> Pid ! {channel_source, Source}; + false -> {error, channel_terminated} + end. + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, @@ -805,7 +814,11 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup(QName) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})). + noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); + +handle_info({channel_source, Source}, State) -> + put(channel_source, Source), + noreply(State). handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -984,7 +997,7 @@ check_topic_authorisation(Resource = #exchange{type = topic}, check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(Resource = #exchange{type = topic}, User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) -> - AmqpParams = get_amqp_params(ConnPid), + AmqpParams = get_amqp_params(ConnPid, get(channel_source)), check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission); check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, User = #user{username = Username}, @@ -1007,7 +1020,8 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost check_topic_authorisation(_, _, _, _, _) -> ok. -get_amqp_params(ConnPid) when is_pid(ConnPid) -> +get_amqp_params(_ConnPid, rabbit_reader) -> []; +get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) -> Timeout = get_operation_timeout(), get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout). @@ -2246,6 +2260,7 @@ i(garbage_collection, _State) -> i(reductions, _State) -> {reductions, Reductions} = erlang:process_info(self(), reductions), Reductions; +i(channel_source, _State = #ch{}) -> get(channel_source); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 50e8f3d2b0..e43bfba90c 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector}]), + _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c0cb9c57d5..6f0b0a5ea5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -924,6 +924,7 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), + _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), |
