diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-11-23 16:18:34 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-11-23 16:18:34 +0300 |
| commit | 99f29e555c50f5839cba49315aec0afbdbef6342 (patch) | |
| tree | 649b7f13c0c831f4028c446988d74bb5410e999d | |
| parent | c9aa4e8a56ccf3c21e681b4f69f7a946180639eb (diff) | |
| download | rabbitmq-server-git-99f29e555c50f5839cba49315aec0afbdbef6342.tar.gz | |
Continuation to #2169 by @velimir
* Drop channel source
* Correct optional variable extraction to accommodate MQTT and Erlang client
test suites
| -rw-r--r-- | src/rabbit_channel.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 | ||||
| -rw-r--r-- | test/channel_source_SUITE.erl | 164 |
4 files changed, 18 insertions, 188 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ef4ba4db5b..f86ec16682 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -63,7 +63,7 @@ -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). -export([force_event_refresh/1]). --export([source/2, update_user_state/2]). +-export([update_user_state/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, @@ -461,15 +461,6 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). --spec source(pid(), any()) -> 'ok' | {error, channel_terminated}. - -source(Pid, Source) when is_pid(Pid) -> - case erlang:is_process_alive(Pid) of - true -> Pid ! {channel_source, Source}, - ok; - false -> {error, channel_terminated} - end. - -spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. update_user_state(Pid, UserState) when is_pid(Pid) -> @@ -505,7 +496,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)), MaxMessageSize = get_max_message_size(), ConsumerTimeout = get_consumer_timeout(), - AuthzContext = extract_variable_map_from_amqp_params(AmqpParams), + OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams), State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -522,7 +513,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_prefetch = Prefetch, max_message_size = MaxMessageSize, consumer_timeout = ConsumerTimeout, - authz_context = AuthzContext + authz_context = OptionalVariables }, limiter = Limiter, tx = none, @@ -877,8 +868,6 @@ handle_info(tick, State0 = #ch{queue_states = QueueStates0}) -> Return -> Return end; -handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> - noreply(State#ch{cfg = Cfg#conf{source = Source}}); handle_info({update_user_state, User}, State = #ch{cfg = Cfg}) -> noreply(State#ch{cfg = Cfg#conf{user = User}}). @@ -1091,16 +1080,24 @@ check_topic_authorisation(_, _, _, _, _) -> ok. +build_topic_variable_map(AuthzContext, VHost, Username) when is_map(AuthzContext) -> + maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username}); build_topic_variable_map(AuthzContext, VHost, Username) -> - maps:merge(AuthzContext, #{<<"vhost">> => VHost, <<"username">> => Username}). + maps:merge(extract_variable_map_from_amqp_params(AuthzContext), #{<<"vhost">> => VHost, <<"username">> => Username}). -%% use tuple representation of amqp_params to avoid coupling. -%% get variable map only from amqp_params_direct, not amqp_params_network. -%% amqp_params_direct are usually used from plugins (e.g. MQTT, STOMP) -extract_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, _, _, _, _, - {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}]) -> +%% Use tuple representation of amqp_params to avoid a dependency on amqp_client. +%% Extracts variable map only from amqp_params_direct, not amqp_params_network. +%% amqp_params_direct records are usually used by plugins (e.g. MQTT, STOMP) +extract_variable_map_from_amqp_params({amqp_params, {amqp_params_direct, _, _, _, _, + {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}}) -> + proplists:get_value(variable_map, AdditionalInfo, #{}); +extract_variable_map_from_amqp_params({amqp_params_direct, _, _, _, _, + {amqp_adapter_info, _,_,_,_,_,_,AdditionalInfo}, _}) -> proplists:get_value(variable_map, AdditionalInfo, #{}); -extract_variable_map_from_amqp_params(_) -> +extract_variable_map_from_amqp_params([Value]) -> + extract_variable_map_from_amqp_params(Value); +extract_variable_map_from_amqp_params(Other) -> + ct:pal("extract_variable_map_from_amqp_params other: ~p", [Other]), #{}. check_msg_size(Content, MaxMessageSize) -> @@ -2366,7 +2363,6 @@ i(user_who_performed_action, Ch) -> i(user, Ch); i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; -i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC); diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 814b2783e6..683be84676 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -211,7 +211,6 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, rabbit_direct_client_sup, [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector, AmqpParams}]), - _ = rabbit_channel:source(ChannelPid, ?MODULE), {ok, ChannelPid}. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 116dcf89e6..70ed3246d3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -925,7 +925,6 @@ create_channel(Channel, rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), - _ = rabbit_channel:source(ChPid, ?MODULE), MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), diff --git a/test/channel_source_SUITE.erl b/test/channel_source_SUITE.erl deleted file mode 100644 index 3247ffa997..0000000000 --- a/test/channel_source_SUITE.erl +++ /dev/null @@ -1,164 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at https://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. -%% - --module(channel_source_SUITE). - --include_lib("amqp_client/include/amqp_client.hrl"). --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). - -all() -> - [ - {group, non_parallel_tests} - ]. - -groups() -> - [ - {non_parallel_tests, [], [ - network_rabbit_reader_channel_source, - network_arbitrary_channel_source, - direct_channel_source, - undefined_channel_source - ]} - ]. - -%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. -%% ------------------------------------------------------------------- - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - -init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Testcase} - ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - -end_per_testcase(Testcase, Config) -> - Config1 = rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()), - rabbit_ct_helpers:testcase_finished(Config1, Testcase). - -%% ------------------------------------------------------------------- -%% Testcases. -%% ------------------------------------------------------------------- - -network_rabbit_reader_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, network_rabbit_reader_channel_source1, [Config]). - -network_rabbit_reader_channel_source1(Config) -> - ExistingChannels = rabbit_channel:list(), - Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - {ok, ClientCh} = amqp_connection:open_channel(Conn), - [ServerCh] = rabbit_channel:list() -- ExistingChannels, - [{source, rabbit_reader}] = rabbit_channel:info(ServerCh, [source]), - _ = rabbit_channel:source(ServerCh, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), - amqp_channel:close(ClientCh), - amqp_connection:close(Conn), - wait_for_channel_termination(ServerCh, 60), - passed. - -network_arbitrary_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, network_arbitrary_channel_source1, [Config]). - -network_arbitrary_channel_source1(Config) -> - Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - Writer = spawn(fun () -> rabbit_ct_broker_helpers:test_writer(self()) end), - {ok, Limiter} = rabbit_limiter:start_link(no_limiter_id), - {ok, Collector} = rabbit_queue_collector:start_link(no_collector_id), - {ok, Ch} = rabbit_channel:start_link( - 1, Conn, Writer, Conn, "", rabbit_framing_amqp_0_9_1, - rabbit_ct_broker_helpers:user(<<"guest">>), <<"/">>, [], - Collector, Limiter), - _ = rabbit_channel:source(Ch, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(Ch, [source]), - [exit(P, normal) || P <- [Writer, Limiter, Collector, Ch]], - amqp_connection:close(Conn), - wait_for_channel_termination(Ch, 60), - passed. - -direct_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, direct_channel_source1, [Config]). - -direct_channel_source1(Config) -> - ExistingChannels = rabbit_channel:list(), - Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), - {ok, ClientCh} = amqp_connection:open_channel(Conn), - [ServerCh] = rabbit_channel:list() -- ExistingChannels, - [{source, rabbit_direct}] = rabbit_channel:info(ServerCh, [source]), - _ = rabbit_channel:source(ServerCh, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), - amqp_channel:close(ClientCh), - amqp_connection:close(Conn), - wait_for_channel_termination(ServerCh, 60), - passed. - -wait_for_channel_termination(Ch, 0) -> - ?assertEqual( - {error, channel_terminated}, - rabbit_channel:source(Ch, ?MODULE)); -wait_for_channel_termination(Ch, Attempts) -> - case rabbit_channel:source(Ch, ?MODULE) of - {error, channel_terminated} -> - ok; - _ -> - timer:sleep(1000), - wait_for_channel_termination(Ch, Attempts - 1) - end. - -undefined_channel_source(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, undefined_channel_source1, [Config]). - -undefined_channel_source1(_Config) -> - ExistingChannels = rabbit_channel:list(), - {_Writer, _Limiter, ServerCh} = rabbit_ct_broker_helpers:test_channel(), - wait_for_server_channel(ExistingChannels, ServerCh, 60), - [{source, undefined}] = rabbit_channel:info(ServerCh, [source]), - _ = rabbit_channel:source(ServerCh, ?MODULE), - [{source, ?MODULE}] = rabbit_channel:info(ServerCh, [source]), - passed. - -wait_for_server_channel(ExistingChannels, ServerCh, 0) -> - ?assertEqual([ServerCh], rabbit_channel:list() -- ExistingChannels); -wait_for_server_channel(ExistingChannels, ServerCh, Attempts) -> - case rabbit_channel:list() -- ExistingChannels of - [ServerCh] -> - ok; - _ -> - timer:sleep(1000), - wait_for_server_channel(ExistingChannels, ServerCh, Attempts - 1) - end. |
