summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl40
-rw-r--r--src/rabbit_direct.erl1
-rw-r--r--src/rabbit_reader.erl1
-rw-r--r--test/channel_source_SUITE.erl164
4 files changed, 18 insertions, 188 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ef4ba4db5b..f86ec16682 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -63,7 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
--export([source/2, update_user_state/2]).
+-export([update_user_state/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
@@ -461,15 +461,6 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
--spec source(pid(), any()) -> 'ok' | {error, channel_terminated}.
-
-source(Pid, Source) when is_pid(Pid) ->
- case erlang:is_process_alive(Pid) of
- true -> Pid ! {channel_source, Source},
- ok;
- false -> {error, channel_terminated}
- end.
-
-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.
update_user_state(Pid, UserState) when is_pid(Pid) ->
@@ -505,7 +496,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)),
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
- AuthzContext = extract_variable_map_from_amqp_params(AmqpParams),
+ OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@@ -522,7 +513,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_prefetch = Prefetch,
max_message_size = MaxMessageSize,
consumer_timeout = ConsumerTimeout,
- authz_context = AuthzContext
+ authz_context = OptionalVariables
},
limiter = Limiter,
tx = none,
@@ -877,8 +868,6 @@ handle_info(tick, State0 = #ch{queue_states = QueueStates0}) ->
Return ->
Return
end;
-handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
- noreply(State#ch{cfg = Cfg#conf{source = Source}});
handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{user = User}}).
@@ -1091,16 +1080,24 @@ check_topic_authorisation(_, _, _, _, _) ->
ok.
+build_topic_variable_map(AuthzContext, VHost, Username) when is_map(AuthzContext) ->
+ maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username});
build_topic_variable_map(AuthzContext, VHost, Username) ->
- maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username}).
+ maps:merge(extract_variable_map_from_amqp_params(AuthzContext), #{<<"vhost">> => VHost, <<"username">> => Username}).
-%% use tuple representation of amqp_params to avoid coupling.
-%% get variable map only from amqp_params_direct, not amqp_params_network.
-%% amqp_params_direct are usually used from plugins (e.g. MQTT, STOMP)
-extract_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, _, _, _, _,
- {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}]) ->
+%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
+%% Extracts variable map only from amqp_params_direct, not amqp_params_network.
+%% amqp_params_direct records are usually used by plugins (e.g. MQTT, STOMP)
+extract_variable_map_from_amqp_params({amqp_params, {amqp_params_direct, _, _, _, _,
+ {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}) ->
+ proplists:get_value(variable_map, AdditionalInfo, #{});
+extract_variable_map_from_amqp_params({amqp_params_direct, _, _, _, _,
+ {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}) ->
proplists:get_value(variable_map, AdditionalInfo, #{});
-extract_variable_map_from_amqp_params(_) ->
+extract_variable_map_from_amqp_params([Value]) ->
+ extract_variable_map_from_amqp_params(Value);
+extract_variable_map_from_amqp_params(Other) ->
+ ct:pal("extract_variable_map_from_amqp_params other: ~p", [Other]),
#{}.
check_msg_size(Content, MaxMessageSize) ->
@@ -2366,7 +2363,6 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
-i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC);
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 814b2783e6..683be84676 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -211,7 +211,6 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector, AmqpParams}]),
- _ = rabbit_channel:source(ChannelPid, ?MODULE),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 116dcf89e6..70ed3246d3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -925,7 +925,6 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
- _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl
deleted file mode 100644
index 3247ffa997..0000000000
--- a/test/channel_source_SUITE.erl
+++ /dev/null
@@ -1,164 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(channel_source_SUITE).
-
--include_lib("amqp_client/include/amqp_client.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--compile(export_all).
-
-all() ->
- [
- {group, non_parallel_tests}
- ].
-
-groups() ->
- [
- {non_parallel_tests, [], [
- network_rabbit_reader_channel_source,
- network_arbitrary_channel_source,
- direct_channel_source,
- undefined_channel_source
- ]}
- ].
-
-%% -------------------------------------------------------------------
-%% Testsuite setup/teardown.
-%% -------------------------------------------------------------------
-
-init_per_suite(Config) ->
- rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
-
-end_per_suite(Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config).
-
-init_per_group(_, Config) ->
- Config.
-
-end_per_group(_, Config) ->
- Config.
-
-init_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_started(Config, Testcase),
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, Testcase}
- ]),
- rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
-
-end_per_testcase(Testcase, Config) ->
- Config1 = rabbit_ct_helpers:run_steps(Config,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps()),
- rabbit_ct_helpers:testcase_finished(Config1, Testcase).
-
-%% -------------------------------------------------------------------
-%% Testcases.
-%% -------------------------------------------------------------------
-
-network_rabbit_reader_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, network_rabbit_reader_channel_source1, [Config]).
-
-network_rabbit_reader_channel_source1(Config) ->
- ExistingChannels = rabbit_channel:list(),
- Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
- {ok, ClientCh} = amqp_connection:open_channel(Conn),
- [ServerCh] = rabbit_channel:list() -- ExistingChannels,
- [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]),
- _ = rabbit_channel:source(ServerCh, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
- amqp_channel:close(ClientCh),
- amqp_connection:close(Conn),
- wait_for_channel_termination(ServerCh, 60),
- passed.
-
-network_arbitrary_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, network_arbitrary_channel_source1, [Config]).
-
-network_arbitrary_channel_source1(Config) ->
- Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
- Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end),
- {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id),
- {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id),
- {ok, Ch} = rabbit_channel:start_link(
- 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1,
- rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [],
- Collector, Limiter),
- _ = rabbit_channel:source(Ch, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]),
- [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]],
- amqp_connection:close(Conn),
- wait_for_channel_termination(Ch, 60),
- passed.
-
-direct_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, direct_channel_source1, [Config]).
-
-direct_channel_source1(Config) ->
- ExistingChannels = rabbit_channel:list(),
- Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config),
- {ok, ClientCh} = amqp_connection:open_channel(Conn),
- [ServerCh] = rabbit_channel:list() -- ExistingChannels,
- [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]),
- _ = rabbit_channel:source(ServerCh, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
- amqp_channel:close(ClientCh),
- amqp_connection:close(Conn),
- wait_for_channel_termination(ServerCh, 60),
- passed.
-
-wait_for_channel_termination(Ch, 0) ->
- ?assertEqual(
- {error, channel_terminated},
- rabbit_channel:source(Ch, ?MODULE));
-wait_for_channel_termination(Ch, Attempts) ->
- case rabbit_channel:source(Ch, ?MODULE) of
- {error, channel_terminated} ->
- ok;
- _ ->
- timer:sleep(1000),
- wait_for_channel_termination(Ch, Attempts - 1)
- end.
-
-undefined_channel_source(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, undefined_channel_source1, [Config]).
-
-undefined_channel_source1(_Config) ->
- ExistingChannels = rabbit_channel:list(),
- {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(),
- wait_for_server_channel(ExistingChannels, ServerCh, 60),
- [{source, undefined}] = rabbit_channel:info(ServerCh, [source]),
- _ = rabbit_channel:source(ServerCh, ?MODULE),
- [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]),
- passed.
-
-wait_for_server_channel(ExistingChannels, ServerCh, 0) ->
- ?assertEqual([ServerCh], rabbit_channel:list() -- ExistingChannels);
-wait_for_server_channel(ExistingChannels, ServerCh, Attempts) ->
- case rabbit_channel:list() -- ExistingChannels of
- [ServerCh] ->
- ok;
- _ ->
- timer:sleep(1000),
- wait_for_server_channel(ExistingChannels, ServerCh, Attempts - 1)
- end.