diff options
Diffstat (limited to 'deps/rabbitmq_stream/src/rabbit_stream.erl')
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream.erl | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl new file mode 100644 index 0000000000..8353d66d57 --- /dev/null +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -0,0 +1,103 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream). +-behaviour(application). + +-export([start/2, host/0, port/0, kill_connection/1]). +-export([stop/1]). +-export([emit_connection_info_local/3, + emit_connection_info_all/4, + list/0]). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +start(_Type, _Args) -> + rabbit_stream_sup:start_link(). + +host() -> + case application:get_env(rabbitmq_stream, advertised_host, undefined) of + undefined -> + hostname_from_node(); + Host -> + rabbit_data_coercion:to_binary(Host) + end. + +hostname_from_node() -> + case re:split(rabbit_data_coercion:to_binary(node()), + "@", + [{return, binary}, {parts, 2}]) of + [_, Hostname] -> + Hostname; + [_] -> + rabbit_data_coercion:to_binary(inet:gethostname()) + end. + +port() -> + case application:get_env(rabbitmq_stream, advertised_port, undefined) of + undefined -> + port_from_listener(); + Port -> + Port + end. + +port_from_listener() -> + Listeners = rabbit_networking:node_listeners(node()), + Port = lists:foldl(fun(#listener{port = Port, protocol = stream}, _Acc) -> + Port; + (_, Acc) -> + Acc + end, undefined, Listeners), + Port. + +stop(_State) -> + ok. + +kill_connection(ConnectionName) -> + ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName), + lists:foreach(fun(ConnectionPid) -> + ConnectionPid ! {infos, self()}, + receive + {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} -> + exit(ConnectionPid, kill); + {ConnectionPid, _ClientProperties} -> + ok + after 1000 -> + ok + end + end, pg_local:get_members(rabbit_stream_connections)). + +emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> + Pids = [spawn_link(Node, rabbit_stream, emit_connection_info_local, + [Items, Ref, AggregatorPid]) + || Node <- Nodes], + rabbit_control_misc:await_emitters_termination(Pids), + ok. + +emit_connection_info_local(Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Pid) -> + rabbit_stream_reader:info(Pid, Items) + end, + list()). + +list() -> + [Client + || {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stream_sup), + {_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid), + {ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup), + {_, CliSup, _, _} <- supervisor:which_children(ConnSup), + {rabbit_stream_reader, Client, _, _} <- supervisor:which_children(CliSup)].
\ No newline at end of file |