summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stream/src
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_stream/src')
-rw-r--r--deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl95
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream.erl103
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl49
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl262
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl1274
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_sup.erl61
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl125
7 files changed, 1969 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl
new file mode 100644
index 0000000000..f185ab044e
--- /dev/null
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl
@@ -0,0 +1,95 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
+
+-include("rabbit_stream.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-export([formatter/0,
+ scopes/0,
+ switches/0,
+ aliases/0,
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
+
+scopes() -> [ctl, diagnostics, streams].
+
+switches() -> [{verbose, boolean}].
+aliases() -> [{'V', verbose}].
+
+description() -> <<"Lists stream connections on the target node">>.
+
+help_section() ->
+ {plugin, stream}.
+
+validate(Args, _) ->
+ case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
+ ?INFO_ITEMS) of
+ {ok, _} -> ok;
+ Error -> Error
+ end.
+
+merge_defaults([], Opts) ->
+ merge_defaults([<<"conn_name">>], Opts);
+merge_defaults(Args, Opts) ->
+ {Args, maps:merge(#{verbose => false}, Opts)}.
+
+usage() ->
+ <<"list_stream_connections [<column> ...]">>.
+
+usage_additional() ->
+ Prefix = <<" must be one of ">>,
+ InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>),
+ [
+ {<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}
+ ].
+
+usage_doc_guides() ->
+ [?STREAM_GUIDE_URL].
+
+run(Args, #{node := NodeName,
+ timeout := Timeout,
+ verbose := Verbose}) ->
+ InfoKeys = case Verbose of
+ true -> ?INFO_ITEMS;
+ false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
+ end,
+ Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
+
+ 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
+ NodeName,
+ rabbit_stream,
+ emit_connection_info_all,
+ [Nodes, InfoKeys],
+ Timeout,
+ InfoKeys,
+ length(Nodes)).
+
+banner(_, _) -> <<"Listing stream connections ...">>.
+
+output(Result, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl
new file mode 100644
index 0000000000..8353d66d57
--- /dev/null
+++ b/deps/rabbitmq_stream/src/rabbit_stream.erl
@@ -0,0 +1,103 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream).
+-behaviour(application).
+
+-export([start/2, host/0, port/0, kill_connection/1]).
+-export([stop/1]).
+-export([emit_connection_info_local/3,
+ emit_connection_info_all/4,
+ list/0]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+start(_Type, _Args) ->
+ rabbit_stream_sup:start_link().
+
+host() ->
+ case application:get_env(rabbitmq_stream, advertised_host, undefined) of
+ undefined ->
+ hostname_from_node();
+ Host ->
+ rabbit_data_coercion:to_binary(Host)
+ end.
+
+hostname_from_node() ->
+ case re:split(rabbit_data_coercion:to_binary(node()),
+ "@",
+ [{return, binary}, {parts, 2}]) of
+ [_, Hostname] ->
+ Hostname;
+ [_] ->
+ rabbit_data_coercion:to_binary(inet:gethostname())
+ end.
+
+port() ->
+ case application:get_env(rabbitmq_stream, advertised_port, undefined) of
+ undefined ->
+ port_from_listener();
+ Port ->
+ Port
+ end.
+
+port_from_listener() ->
+ Listeners = rabbit_networking:node_listeners(node()),
+ Port = lists:foldl(fun(#listener{port = Port, protocol = stream}, _Acc) ->
+ Port;
+ (_, Acc) ->
+ Acc
+ end, undefined, Listeners),
+ Port.
+
+stop(_State) ->
+ ok.
+
+kill_connection(ConnectionName) ->
+ ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName),
+ lists:foreach(fun(ConnectionPid) ->
+ ConnectionPid ! {infos, self()},
+ receive
+ {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} ->
+ exit(ConnectionPid, kill);
+ {ConnectionPid, _ClientProperties} ->
+ ok
+ after 1000 ->
+ ok
+ end
+ end, pg_local:get_members(rabbit_stream_connections)).
+
+emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
+ Pids = [spawn_link(Node, rabbit_stream, emit_connection_info_local,
+ [Items, Ref, AggregatorPid])
+ || Node <- Nodes],
+ rabbit_control_misc:await_emitters_termination(Pids),
+ ok.
+
+emit_connection_info_local(Items, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Pid) ->
+ rabbit_stream_reader:info(Pid, Items)
+ end,
+ list()).
+
+list() ->
+ [Client
+ || {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stream_sup),
+ {_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid),
+ {ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup),
+ {_, CliSup, _, _} <- supervisor:which_children(ConnSup),
+ {rabbit_stream_reader, Client, _, _} <- supervisor:which_children(CliSup)]. \ No newline at end of file
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl
new file mode 100644
index 0000000000..3092a68517
--- /dev/null
+++ b/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_connection_sup).
+
+-behaviour(supervisor2).
+-behaviour(ranch_protocol).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([start_link/4, start_keepalive_link/0]).
+
+-export([init/1]).
+
+
+start_link(Ref, _Sock, Transport, Opts) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, KeepaliveSup} = supervisor2:start_child(
+ SupPid,
+ {rabbit_stream_keepalive_sup,
+ {rabbit_stream_connection_sup, start_keepalive_link, []},
+ intrinsic, infinity, supervisor, [rabbit_keepalive_sup]}),
+ {ok, ReaderPid} = supervisor2:start_child(
+ SupPid,
+ {rabbit_stream_reader,
+ {rabbit_stream_reader, start_link, [KeepaliveSup, Transport, Ref, Opts]},
+ intrinsic, ?WORKER_WAIT, worker, [rabbit_stream_reader]}),
+ {ok, SupPid, ReaderPid}.
+
+start_keepalive_link() ->
+ supervisor2:start_link(?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
new file mode 100644
index 0000000000..e418dd1022
--- /dev/null
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -0,0 +1,262 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_manager).
+-behaviour(gen_server).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+%% API
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+-export([start_link/1, create/4, delete/3, lookup_leader/2, lookup_local_member/2, topology/2]).
+
+-record(state, {
+ configuration
+}).
+
+start_link(Conf) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []).
+
+init([Conf]) ->
+ {ok, #state{configuration = Conf}}.
+
+-spec create(binary(), binary(), #{binary() => binary()}, binary()) ->
+ {ok, map()} | {error, reference_already_exists} | {error, internal_error}.
+create(VirtualHost, Reference, Arguments, Username) ->
+ gen_server:call(?MODULE, {create, VirtualHost, Reference, Arguments, Username}).
+
+-spec delete(binary(), binary(), binary()) ->
+ {ok, deleted} | {error, reference_not_found}.
+delete(VirtualHost, Reference, Username) ->
+ gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}).
+
+-spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found.
+lookup_leader(VirtualHost, Stream) ->
+ gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
+
+-spec lookup_local_member(binary(), binary()) -> {ok, pid()} | {error, not_found}.
+lookup_local_member(VirtualHost, Stream) ->
+ gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}).
+
+-spec topology(binary(), binary()) ->
+ {ok, #{leader_node => pid(), replica_nodes => [pid()]}} | {error, stream_not_found}.
+topology(VirtualHost, Stream) ->
+ gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
+
+stream_queue_arguments(Arguments) ->
+ stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments).
+
+stream_queue_arguments(ArgumentsAcc, Arguments) when map_size(Arguments) =:= 0 ->
+ ArgumentsAcc;
+stream_queue_arguments(ArgumentsAcc, #{<<"max-length-bytes">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-max-length-bytes">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
+ maps:remove(<<"max-length-bytes">>, Arguments)
+ );
+stream_queue_arguments(ArgumentsAcc, #{<<"max-age">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-max-age">>, longstr, Value}] ++ ArgumentsAcc,
+ maps:remove(<<"max-age">>, Arguments)
+ );
+stream_queue_arguments(ArgumentsAcc, #{<<"max-segment-size">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-max-segment-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
+ maps:remove(<<"max-segment-size">>, Arguments)
+ );
+stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
+ maps:remove(<<"initial-cluster-size">>, Arguments)
+ );
+stream_queue_arguments(ArgumentsAcc, #{<<"queue-leader-locator">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-queue-leader-locator">>, longstr, Value}] ++ ArgumentsAcc,
+ maps:remove(<<"queue-leader-locator">>, Arguments)
+ );
+stream_queue_arguments(ArgumentsAcc, _Arguments) ->
+ ArgumentsAcc.
+
+validate_stream_queue_arguments([]) ->
+ ok;
+validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) when ClusterSize =< 0 ->
+ error;
+validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
+ case lists:member(Locator, [<<"client-local">>,
+ <<"random">>,
+ <<"least-leaders">>]) of
+ true ->
+ validate_stream_queue_arguments(T);
+ false ->
+ error
+ end;
+validate_stream_queue_arguments([_ | T]) ->
+ validate_stream_queue_arguments(T).
+
+
+handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) ->
+ Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference},
+ StreamQueueArguments = stream_queue_arguments(Arguments),
+ case validate_stream_queue_arguments(StreamQueueArguments) of
+ ok ->
+ Q0 = amqqueue:new(
+ Name,
+ none, true, false, none, StreamQueueArguments,
+ VirtualHost, #{user => Username}, rabbit_stream_queue
+ ),
+ try
+ case rabbit_stream_queue:declare(Q0, node()) of
+ {new, Q} ->
+ {reply, {ok, amqqueue:get_type_state(Q)}, State};
+ {existing, _} ->
+ {reply, {error, reference_already_exists}, State};
+ {error, Err} ->
+ rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Err]),
+ {reply, {error, internal_error}, State}
+ end
+ catch
+ exit:Error ->
+ rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
+ {reply, {error, internal_error}, State}
+ end;
+ error ->
+ {reply, {error, validation_failed}, State}
+ end;
+handle_call({delete, VirtualHost, Reference, Username}, _From, State) ->
+ Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference},
+ rabbit_log:debug("Trying to delete stream ~p~n", [Reference]),
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ rabbit_log:debug("Found queue record ~p, checking if it is a stream~n", [Reference]),
+ case is_stream_queue(Q) of
+ true ->
+ rabbit_log:debug("Queue record ~p is a stream, trying to delete it~n", [Reference]),
+ {ok, _} = rabbit_stream_queue:delete(Q, false, false, Username),
+ rabbit_log:debug("Stream ~p deleted~n", [Reference]),
+ {reply, {ok, deleted}, State};
+ _ ->
+ rabbit_log:debug("Queue record ~p is NOT a stream, returning error~n", [Reference]),
+ {reply, {error, reference_not_found}, State}
+ end;
+ {error, not_found} ->
+ rabbit_log:debug("Stream ~p not found, cannot delete it~n", [Reference]),
+ {reply, {error, reference_not_found}, State}
+ end;
+handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
+ Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
+ Res = case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ case is_stream_queue(Q) of
+ true ->
+ #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
+ LeaderPid;
+ _ ->
+ cluster_not_found
+ end;
+ _ ->
+ cluster_not_found
+ end,
+ {reply, Res, State};
+handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) ->
+ Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
+ Res = case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ case is_stream_queue(Q) of
+ true ->
+ #{leader_pid := LeaderPid, replica_pids := ReplicaPids} = amqqueue:get_type_state(Q),
+ LocalMember = lists:foldl(fun(Pid, Acc) ->
+ case node(Pid) =:= node() of
+ true ->
+ Pid;
+ false ->
+ Acc
+ end
+ end, undefined, [LeaderPid] ++ ReplicaPids),
+ case LocalMember of
+ undefined ->
+ {error, not_available};
+ Pid ->
+ {ok, Pid}
+ end;
+ _ ->
+ {error, not_found}
+ end;
+ {error, not_found} ->
+ case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
+ not_found ->
+ {error, not_found};
+ _ ->
+ {error, not_available}
+ end;
+ _ ->
+ {error, not_found}
+ end,
+ {reply, Res, State};
+handle_call({topology, VirtualHost, Stream}, _From, State) ->
+ Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
+ Res = case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ case is_stream_queue(Q) of
+ true ->
+ QState = amqqueue:get_type_state(Q),
+ ProcessAliveFun = fun(Pid) ->
+ rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000)
+ end,
+ LeaderNode = case ProcessAliveFun(maps:get(leader_pid, QState)) of
+ true ->
+ maps:get(leader_node, QState);
+ _ ->
+ undefined
+ end,
+ ReplicaNodes = lists:foldl(fun(Pid, Acc) ->
+ case ProcessAliveFun(Pid) of
+ true ->
+ Acc ++ [node(Pid)];
+ _ ->
+ Acc
+ end
+ end, [], maps:get(replica_pids, QState)),
+ {ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}};
+ _ ->
+ {error, stream_not_found}
+ end;
+ {error, not_found} ->
+ case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
+ not_found ->
+ {error, stream_not_found};
+ _ ->
+ {error, stream_not_available}
+ end;
+ _ ->
+ {error, stream_not_found}
+ end,
+ {reply, Res, State};
+handle_call(which_children, _From, State) ->
+ {reply, [], State}.
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+handle_info(Info, State) ->
+ rabbit_log:info("Received info ~p~n", [Info]),
+ {noreply, State}.
+
+is_stream_queue(Q) ->
+ case amqqueue:get_type(Q) of
+ rabbit_stream_queue ->
+ true;
+ _ ->
+ false
+ end. \ No newline at end of file
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
new file mode 100644
index 0000000000..d3b4820256
--- /dev/null
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -0,0 +1,1274 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_reader).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("rabbit_stream.hrl").
+
+-record(consumer, {
+ socket :: rabbit_net:socket(), %% ranch_transport:socket(),
+ member_pid :: pid(),
+ offset :: osiris:offset(),
+ subscription_id :: integer(),
+ segment :: osiris_log:state(),
+ credit :: integer(),
+ stream :: binary()
+}).
+
+-record(stream_connection_state, {
+ data :: 'none' | binary(),
+ blocked :: boolean(),
+ consumers :: #{integer() => #consumer{}}
+}).
+
+-record(stream_connection, {
+ name :: string(),
+ %% server host
+ host,
+ %% client host
+ peer_host,
+ %% server port
+ port,
+ %% client port
+ peer_port,
+ auth_mechanism,
+ connected_at :: integer(),
+ helper_sup :: pid(),
+ socket :: rabbit_net:socket(),
+ stream_leaders :: #{binary() => pid()},
+ stream_subscriptions :: #{binary() => [integer()]},
+ credits :: atomics:atomics_ref(),
+ authentication_state :: atom(),
+ user :: 'undefined' | #user{},
+ virtual_host :: 'undefined' | binary(),
+ connection_step :: atom(), % tcp_connected, peer_properties_exchanged, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done
+ frame_max :: integer(),
+ heartbeat :: integer(),
+ heartbeater :: any(),
+ client_properties = #{} :: #{binary() => binary()},
+ monitors = #{} :: #{reference() => binary()},
+ stats_timer :: reference(),
+ send_file_oct :: atomics:atomics_ref()
+}).
+
+-record(configuration, {
+ initial_credits :: integer(),
+ credits_required_for_unblocking :: integer(),
+ frame_max :: integer(),
+ heartbeat :: integer()
+}).
+
+-define(RESPONSE_FRAME_SIZE, 10). % 2 (key) + 2 (version) + 4 (correlation ID) + 2 (response code)
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, channel_max, client_properties, connected_at,
+ node, user_who_performed_action]).
+-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
+-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection,
+ timeout]).
+-define(AUTH_NOTIFICATION_INFO_KEYS,
+ [host, name, peer_host, peer_port, protocol, auth_mechanism,
+ ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
+ peer_cert_validity]).
+
+%% API
+-export([start_link/4, init/1, info/2]).
+
+start_link(KeepaliveSup, Transport, Ref, Opts) ->
+ Pid = proc_lib:spawn_link(?MODULE, init,
+ [[KeepaliveSup, Transport, Ref, Opts]]),
+
+ {ok, Pid}.
+
+init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
+ credits_required_for_unblocking := CreditsRequiredBeforeUnblocking,
+ frame_max := FrameMax,
+ heartbeat := Heartbeat}]) ->
+ process_flag(trap_exit, true),
+ {ok, Sock} = rabbit_networking:handshake(Ref,
+ application:get_env(rabbitmq_stream, proxy_protocol, false)),
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ case rabbit_net:connection_string(Sock, inbound) of
+ {ok, ConnStr} ->
+ Credits = atomics:new(1, [{signed, true}]),
+ SendFileOct = atomics:new(1, [{signed, false}]),
+ atomics:put(SendFileOct, 1, 0),
+ init_credit(Credits, InitialCredits),
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
+ Connection = #stream_connection{
+ name = rabbit_data_coercion:to_binary(ConnStr),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
+ connected_at = os:system_time(milli_seconds),
+ auth_mechanism = none,
+ helper_sup = KeepaliveSup,
+ socket = RealSocket,
+ stream_leaders = #{},
+ stream_subscriptions = #{},
+ credits = Credits,
+ authentication_state = none,
+ connection_step = tcp_connected,
+ frame_max = FrameMax,
+ send_file_oct = SendFileOct},
+ State = #stream_connection_state{
+ consumers = #{}, blocked = false, data = none
+ },
+ Transport:setopts(RealSocket, [{active, once}]),
+
+ listen_loop_pre_auth(Transport, Connection, State, #configuration{
+ initial_credits = InitialCredits,
+ credits_required_for_unblocking = CreditsRequiredBeforeUnblocking,
+ frame_max = FrameMax,
+ heartbeat = Heartbeat
+ });
+ {Error, Reason} ->
+ rabbit_net:fast_close(RealSocket),
+ rabbit_log:warning("Closing connection because of ~p ~p~n", [Error, Reason])
+ end.
+
+socket_op(Sock, Fun) ->
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} ->
+ rabbit_log:warning("Error during socket operation ~p~n", [Reason]),
+ rabbit_net:fast_close(RealSocket),
+ exit(normal)
+ end.
+
+init_credit(CreditReference, Credits) ->
+ atomics:put(CreditReference, 1, Credits).
+
+sub_credits(CreditReference, Credits) ->
+ atomics:sub(CreditReference, 1, Credits).
+
+add_credits(CreditReference, Credits) ->
+ atomics:add(CreditReference, 1, Credits).
+
+has_credits(CreditReference) ->
+ atomics:get(CreditReference, 1) > 0.
+
+has_enough_credits_to_unblock(CreditReference, CreditsRequiredForUnblocking) ->
+ atomics:get(CreditReference, 1) > CreditsRequiredForUnblocking.
+
+listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, State,
+ #configuration{frame_max = FrameMax, heartbeat = Heartbeat} = Configuration) ->
+ {OK, Closed, Error} = Transport:messages(),
+ %% FIXME introduce timeout to complete the connection opening (after block should be enough)
+ receive
+ {OK, S, Data} ->
+ #stream_connection{connection_step = ConnectionStep0} = Connection,
+ {Connection1, State1} = handle_inbound_data_pre_auth(Transport, Connection, State, Data),
+ Transport:setopts(S, [{active, once}]),
+ #stream_connection{connection_step = ConnectionStep} = Connection1,
+ rabbit_log:info("Transitioned from ~p to ~p~n", [ConnectionStep0, ConnectionStep]),
+ case ConnectionStep of
+ authenticated ->
+ TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>,
+ frame(Transport, Connection1, TuneFrame),
+ listen_loop_pre_auth(Transport, Connection1#stream_connection{connection_step = tuning}, State1, Configuration);
+ opened ->
+ % TODO remove registration to rabbit_stream_connections
+ % just meant to be able to close the connection remotely
+ % should be possible once the connections are available in ctl list_connections
+ pg_local:join(rabbit_stream_connections, self()),
+ Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer),
+ Connection3 = ensure_stats_timer(Connection2),
+ Infos = augment_infos_with_user_provided_connection_name(
+ infos(?CREATION_EVENT_KEYS, Connection3, State1),
+ Connection3
+ ),
+ rabbit_core_metrics:connection_created(self(), Infos),
+ rabbit_event:notify(connection_created, Infos),
+ rabbit_networking:register_non_amqp_connection(self()),
+ listen_loop_post_auth(Transport, Connection3, State1, Configuration);
+ failure ->
+ close(Transport, S);
+ _ ->
+ listen_loop_pre_auth(Transport, Connection1, State1, Configuration)
+ end;
+ {Closed, S} ->
+ rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
+ ok;
+ {Error, S, Reason} ->
+ rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
+ M ->
+ rabbit_log:warning("Unknown message ~p~n", [M]),
+ close(Transport, S)
+ end.
+
+augment_infos_with_user_provided_connection_name(Infos, #stream_connection{client_properties = ClientProperties}) ->
+ case ClientProperties of
+ #{<<"connection_name">> := UserProvidedConnectionName} ->
+ [{user_provided_name, UserProvidedConnectionName} | Infos];
+ _ ->
+ Infos
+ end.
+
+close(Transport, S) ->
+ Transport:shutdown(S, write),
+ Transport:close(S).
+
+listen_loop_post_auth(Transport, #stream_connection{socket = S,
+ stream_subscriptions = StreamSubscriptions, credits = Credits,
+ heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties,
+ send_file_oct = SendFileOct} = Connection0,
+ #stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
+ #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
+ Connection = ensure_stats_timer(Connection0),
+ {OK, Closed, Error} = Transport:messages(),
+ receive
+ {OK, S, Data} ->
+ {Connection1, State1} = handle_inbound_data_post_auth(Transport, Connection, State, Data),
+ #stream_connection{connection_step = Step} = Connection1,
+ case Step of
+ closing ->
+ close(Transport, S),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection1, State1);
+ close_sent ->
+ rabbit_log:debug("Transitioned to close_sent ~n"),
+ Transport:setopts(S, [{active, once}]),
+ listen_loop_post_close(Transport, Connection1, State1, Configuration);
+ _ ->
+ State2 = case Blocked of
+ true ->
+ case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State1#stream_connection_state{blocked = false};
+ false ->
+ State1
+ end;
+ false ->
+ case has_credits(Credits) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ State1;
+ false ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
+ State1#stream_connection_state{blocked = true}
+ end
+ end,
+ listen_loop_post_auth(Transport, Connection1, State2, Configuration)
+ end;
+ {'DOWN', MonitorRef, process, _OsirisPid, _Reason} ->
+ {Connection1, State1} = case Monitors of
+ #{MonitorRef := Stream} ->
+ Monitors1 = maps:remove(MonitorRef, Monitors),
+ C = Connection#stream_connection{monitors = Monitors1},
+ case clean_state_after_stream_deletion_or_failure(Stream, C, State) of
+ {cleaned, NewConnection, NewState} ->
+ StreamSize = byte_size(Stream),
+ FrameSize = 2 + 2 + 2 + 2 + StreamSize,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>]),
+ {NewConnection, NewState};
+ {not_cleaned, SameConnection, SameState} ->
+ {SameConnection, SameState}
+ end;
+ _ ->
+ {Connection, State}
+ end,
+ listen_loop_post_auth(Transport, Connection1, State1, Configuration);
+ {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationList}}} ->
+ {FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList),
+ {LastPublisherId, LastPublishingIds, LastCount} = lists:foldl(fun({PublisherId, PublishingId}, {CurrentPublisherId, PublishingIds, Count}) ->
+ case PublisherId of
+ CurrentPublisherId ->
+ {CurrentPublisherId, [PublishingIds, <<PublishingId:64>>], Count + 1};
+ OtherPublisherId ->
+ FrameSize = 2 + 2 + 1 + 4 + Count * 8,
+ %% FIXME enforce max frame size
+ %% in practice, this should be necessary only for very large chunks and for very small frame size limits
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
+ <<CurrentPublisherId:8>>,
+ <<Count:32>>, PublishingIds]),
+ {OtherPublisherId, <<PublishingId:64>>, 1}
+ end
+ end, {FirstPublisherId, <<>>, 0}, CorrelationList),
+ FrameSize = 2 + 2 + 1 + 4 + LastCount * 8,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
+ <<LastPublisherId:8>>,
+ <<LastCount:32>>, LastPublishingIds]),
+ CorrelationIdCount = length(CorrelationList),
+ add_credits(Credits, CorrelationIdCount),
+ State1 = case Blocked of
+ true ->
+ case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State#stream_connection_state{blocked = false};
+ false ->
+ State
+ end;
+ false ->
+ State
+ end,
+ listen_loop_post_auth(Transport, Connection, State1, Configuration);
+ {'$gen_cast', {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} ->
+ rabbit_log:info("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]),
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
+ {'$gen_cast', {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, Offset}}} when Offset > -1 ->
+ {Connection1, State1} = case maps:get(StreamName, StreamSubscriptions, undefined) of
+ undefined ->
+ rabbit_log:info("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [StreamName]),
+ {Connection, State};
+ [] ->
+ rabbit_log:info("osiris offset event for ~p, but no registered consumers!", [StreamName]),
+ {Connection#stream_connection{stream_subscriptions = maps:remove(StreamName, StreamSubscriptions)}, State};
+ CorrelationIds when is_list(CorrelationIds) ->
+ Consumers1 = lists:foldl(fun(CorrelationId, ConsumersAcc) ->
+ #{CorrelationId := Consumer} = ConsumersAcc,
+ #consumer{credit = Credit} = Consumer,
+ Consumer1 = case Credit of
+ 0 ->
+ Consumer;
+ _ ->
+ {{segment, Segment1}, {credit, Credit1}} = send_chunks(
+ Transport,
+ Consumer,
+ SendFileOct
+ ),
+ Consumer#consumer{segment = Segment1, credit = Credit1}
+ end,
+ ConsumersAcc#{CorrelationId => Consumer1}
+ end,
+ Consumers,
+ CorrelationIds),
+ {Connection, State#stream_connection_state{consumers = Consumers1}}
+ end,
+ listen_loop_post_auth(Transport, Connection1, State1, Configuration);
+ heartbeat_send ->
+ Frame = <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ case catch frame(Transport, Connection, Frame) of
+ ok ->
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
+ Unexpected ->
+ rabbit_log:info("Heartbeat send error ~p, closing connection~n", [Unexpected]),
+ C1 = demonitor_all_streams(Connection),
+ close(Transport, C1)
+ end;
+ heartbeat_timeout ->
+ rabbit_log:info("Heartbeat timeout, closing connection~n"),
+ C1 = demonitor_all_streams(Connection),
+ close(Transport, C1);
+ {infos, From} ->
+ From ! {self(), ClientProperties},
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
+ {'$gen_call', From, info} ->
+ gen_server:reply(From, infos(?INFO_ITEMS, Connection, State)),
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
+ {'$gen_call', From, {info, Items}} ->
+ gen_server:reply(From, infos(Items, Connection, State)),
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
+ emit_stats ->
+ Connection1 = emit_stats(Connection, State),
+ listen_loop_post_auth(Transport, Connection1, State, Configuration);
+ {'$gen_cast', {force_event_refresh, Ref}} ->
+ Infos = augment_infos_with_user_provided_connection_name(
+ infos(?CREATION_EVENT_KEYS, Connection, State),
+ Connection
+ ),
+ rabbit_event:notify(connection_created, Infos, Ref),
+ Connection1 = rabbit_event:init_stats_timer(Connection, #stream_connection.stats_timer),
+ listen_loop_post_auth(Transport, Connection1, State, Configuration);
+ {'$gen_call', From, {shutdown, Explanation}} ->
+ % likely closing call from the management plugin
+ gen_server:reply(From, ok),
+ rabbit_log:info("Forcing stream connection ~p closing: ~p~n", [self(), Explanation]),
+ demonitor_all_streams(Connection),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
+ close(Transport, S),
+ ok;
+ {Closed, S} ->
+ demonitor_all_streams(Connection),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
+ rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
+ ok;
+ {Error, S, Reason} ->
+ demonitor_all_streams(Connection),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
+ rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
+ M ->
+ rabbit_log:warning("Unknown message ~p~n", [M]),
+ %% FIXME send close
+ listen_loop_post_auth(Transport, Connection, State, Configuration)
+ end.
+
+listen_loop_post_close(Transport, #stream_connection{socket = S} = Connection, State, Configuration) ->
+ {OK, Closed, Error} = Transport:messages(),
+ %% FIXME demonitor streams
+ %% FIXME introduce timeout to complete the connection closing (after block should be enough)
+ receive
+ {OK, S, Data} ->
+ Transport:setopts(S, [{active, once}]),
+ {Connection1, State1} = handle_inbound_data_post_close(Transport, Connection, State, Data),
+ #stream_connection{connection_step = Step} = Connection1,
+ case Step of
+ closing_done ->
+ rabbit_log:debug("Received close confirmation from client"),
+ close(Transport, S),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection1, State1);
+ _ ->
+ Transport:setopts(S, [{active, once}]),
+ listen_loop_post_close(Transport, Connection1, State1, Configuration)
+ end;
+ {Closed, S} ->
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
+ rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
+ ok;
+ {Error, S, Reason} ->
+ rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]),
+ close(Transport, S),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State);
+ M ->
+ rabbit_log:warning("Ignored message on closing ~p~n", [M])
+ end.
+
+handle_inbound_data_pre_auth(Transport, Connection, State, Rest) ->
+ handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_pre_auth/5).
+
+handle_inbound_data_post_auth(Transport, Connection, State, Rest) ->
+ handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_post_auth/5).
+
+handle_inbound_data_post_close(Transport, Connection, State, Rest) ->
+ handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_post_close/5).
+
+handle_inbound_data(_Transport, Connection, State, <<>>, _HandleFrameFun) ->
+ {Connection, State};
+handle_inbound_data(Transport, #stream_connection{frame_max = FrameMax} = Connection,
+ #stream_connection_state{data = none} = State, <<Size:32, _Frame:Size/binary, _Rest/bits>>, _HandleFrameFun)
+ when FrameMax /= 0 andalso Size > FrameMax - 4 ->
+ CloseReason = <<"frame too large">>,
+ CloseReasonLength = byte_size(CloseReason),
+ CloseFrame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_FRAME_TOO_LARGE:16,
+ CloseReasonLength:16, CloseReason:CloseReasonLength/binary>>,
+ frame(Transport, Connection, CloseFrame),
+ {Connection#stream_connection{connection_step = close_sent}, State};
+handle_inbound_data(Transport, Connection,
+ #stream_connection_state{data = none} = State, <<Size:32, Frame:Size/binary, Rest/bits>>, HandleFrameFun) ->
+ {Connection1, State1, Rest1} = HandleFrameFun(Transport, Connection, State, Frame, Rest),
+ handle_inbound_data(Transport, Connection1, State1, Rest1, HandleFrameFun);
+handle_inbound_data(_Transport, Connection, #stream_connection_state{data = none} = State, Data, _HandleFrameFun) ->
+ {Connection, State#stream_connection_state{data = Data}};
+handle_inbound_data(Transport, Connection, #stream_connection_state{data = Leftover} = State, Data, HandleFrameFun) ->
+ State1 = State#stream_connection_state{data = none},
+ %% FIXME avoid concatenation to avoid a new binary allocation
+ %% see osiris_replica:parse_chunk/3
+ handle_inbound_data(Transport, Connection, State1, <<Leftover/binary, Data/binary>>, HandleFrameFun).
+
+generate_publishing_error_details(Acc, _Code, <<>>) ->
+ Acc;
+generate_publishing_error_details(Acc, Code, <<PublishingId:64, MessageSize:32, _Message:MessageSize/binary, Rest/binary>>) ->
+ generate_publishing_error_details(
+ <<Acc/binary, PublishingId:64, Code:16>>,
+ Code,
+ Rest).
+
+handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State,
+ <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, CorrelationId:32,
+ ClientPropertiesCount:32, ClientPropertiesFrame/binary>>, Rest) ->
+
+ {ClientProperties, _} = rabbit_stream_utils:parse_map(ClientPropertiesFrame, ClientPropertiesCount),
+
+ {ok, Product} = application:get_key(rabbit, description),
+ {ok, Version} = application:get_key(rabbit, vsn),
+
+ %% Get any configuration-specified server properties
+ RawConfigServerProps = application:get_env(rabbit,
+ server_properties, []),
+
+ ConfigServerProperties = lists:foldl(fun({K, V}, Acc) ->
+ maps:put(rabbit_data_coercion:to_binary(K), V, Acc)
+ end, #{}, RawConfigServerProps),
+
+ ServerProperties = maps:merge(ConfigServerProperties, #{
+ <<"product">> => Product,
+ <<"version">> => Version,
+ <<"cluster_name">> => rabbit_nodes:cluster_name(),
+ <<"platform">> => rabbit_misc:platform_and_version(),
+ <<"copyright">> => ?COPYRIGHT_MESSAGE,
+ <<"information">> => ?INFORMATION_MESSAGE
+ }),
+
+ ServerPropertiesCount = map_size(ServerProperties),
+
+ ServerPropertiesFragment = maps:fold(fun(K, V, Acc) ->
+ Key = rabbit_data_coercion:to_binary(K),
+ Value = rabbit_data_coercion:to_binary(V),
+ KeySize = byte_size(Key),
+ ValueSize = byte_size(Value),
+ <<Acc/binary, KeySize:16, Key:KeySize/binary, ValueSize:16, Value:ValueSize/binary>>
+ end, <<>>, ServerProperties),
+
+ Frame = <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16,
+ ServerPropertiesCount:32, ServerPropertiesFragment/binary>>,
+ FrameSize = byte_size(Frame),
+
+ Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]),
+ {Connection#stream_connection{client_properties = ClientProperties, authentication_state = peer_properties_exchanged}, State, Rest};
+handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State,
+ <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, CorrelationId:32>>, Rest) ->
+
+ Mechanisms = rabbit_stream_utils:auth_mechanisms(S),
+ MechanismsFragment = lists:foldl(fun(M, Acc) ->
+ Size = byte_size(M),
+ <<Acc/binary, Size:16, M:Size/binary>>
+ end, <<>>, Mechanisms),
+ MechanismsCount = length(Mechanisms),
+ Frame = <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16,
+ MechanismsCount:32, MechanismsFragment/binary>>,
+ FrameSize = byte_size(Frame),
+
+ Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]),
+ {Connection, State, Rest};
+handle_frame_pre_auth(Transport,
+ #stream_connection{socket = S,
+ authentication_state = AuthState0,
+ host = Host} = Connection0, State,
+ <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32,
+ MechanismLength:16, Mechanism:MechanismLength/binary,
+ SaslFragment/binary>>, Rest) ->
+
+ SaslBin = case SaslFragment of
+ <<-1:32/signed>> ->
+ <<>>;
+ <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> ->
+ SaslBinary
+ end,
+
+ {Connection1, Rest1} = case rabbit_stream_utils:auth_mechanism_to_module(Mechanism, S) of
+ {ok, AuthMechanism} ->
+ AuthState = case AuthState0 of
+ none ->
+ AuthMechanism:init(S);
+ AS ->
+ AS
+ end,
+ RemoteAddress = list_to_binary(inet:ntoa(Host)),
+ C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}},
+ {C2, FrameFragment} =
+ case AuthMechanism:handle_response(SaslBin, AuthState) of
+ {refused, Username, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ auth_fail(Username, Msg, Args, C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
+ {protocol_error, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
+ {challenge, Challenge, AuthState1} ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
+ ChallengeSize = byte_size(Challenge),
+ {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
+ <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
+ };
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, S) of
+ ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
+ notify_auth_result(Username, user_authentication_success,
+ [], C1, State),
+ {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
+ <<?RESPONSE_CODE_OK:16>>
+ };
+ not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
+ end
+ end,
+ Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>,
+ frame(Transport, C1, Frame),
+ {C2, Rest};
+ {error, _} ->
+ Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
+ frame(Transport, Connection0, Frame),
+ {Connection0#stream_connection{connection_step = failure}, Rest}
+ end,
+
+ {Connection1, State, Rest1};
+handle_frame_pre_auth(_Transport, #stream_connection{helper_sup = SupPid, socket = Sock, name = ConnectionName} = Connection, State,
+ <<?COMMAND_TUNE:16, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>, Rest) ->
+ rabbit_log:info("Tuning response ~p ~p ~n", [FrameMax, Heartbeat]),
+ Parent = self(),
+ %% sending a message to the main process so the heartbeat frame is sent from this main process
+ %% otherwise heartbeat frames can interleave with chunk delivery
+ %% (chunk delivery is made of 2 calls on the socket, one for the header and one send_file for the chunk,
+ %% we don't want a heartbeat frame to sneak in in-between)
+ SendFun =
+ fun() ->
+ Parent ! heartbeat_send,
+ ok
+ end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater = rabbit_heartbeat:start(
+ SupPid, Sock, ConnectionName,
+ Heartbeat, SendFun, Heartbeat, ReceiveFun),
+
+ {Connection#stream_connection{connection_step = tuned, frame_max = FrameMax,
+ heartbeat = Heartbeat, heartbeater = Heartbeater}, State, Rest};
+handle_frame_pre_auth(Transport, #stream_connection{user = User, socket = S} = Connection, State,
+ <<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32,
+ VirtualHostLength:16, VirtualHost:VirtualHostLength/binary>>, Rest) ->
+
+ %% FIXME enforce connection limit (see rabbit_reader:is_over_connection_limit/2)
+
+ {Connection1, Frame} = try
+ rabbit_access_control:check_vhost_access(User, VirtualHost, {socket, S}, #{}),
+ F = <<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>,
+ %% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2)
+ {Connection#stream_connection{connection_step = opened, virtual_host = VirtualHost}, F}
+ catch
+ exit:_ ->
+ Fr = <<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_VHOST_ACCESS_FAILURE:16>>,
+ {Connection#stream_connection{connection_step = failure}, Fr}
+ end,
+
+ frame(Transport, Connection1, Frame),
+
+ {Connection1, State, Rest};
+handle_frame_pre_auth(_Transport, Connection, State, <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, Rest) ->
+ rabbit_log:info("Received heartbeat frame pre auth~n"),
+ {Connection, State, Rest};
+handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) ->
+ rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", [Frame, Rest]),
+ {Connection#stream_connection{connection_step = failure}, State, Rest}.
+
+auth_fail(Username, Msg, Args, Connection, ConnectionState) ->
+ notify_auth_result(Username, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}], Connection, ConnectionState).
+
+notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState) ->
+ EventProps = [{connection_type, network},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ [case Item of
+ name -> {connection_name, i(name, Connection, ConnectionState)};
+ _ -> {Item, i(Item, Connection, ConnectionState)}
+ end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
+handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits,
+ virtual_host = VirtualHost, user = User} = Connection, State,
+ <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
+ StreamSize:16, Stream:StreamSize/binary,
+ PublisherId:8/unsigned,
+ MessageCount:32, Messages/binary>>, Rest) ->
+ case rabbit_stream_utils:check_write_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case lookup_leader(Stream, Connection) of
+ cluster_not_found ->
+ FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
+ Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages),
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
+ PublisherId:8,
+ MessageCount:32, Details/binary>>]),
+ {Connection, State, Rest};
+ {ClusterLeader, Connection1} ->
+ rabbit_stream_utils:write_messages(ClusterLeader, PublisherId, Messages),
+ sub_credits(Credits, MessageCount),
+ {Connection1, State, Rest}
+ end;
+ error ->
+ FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
+ Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages),
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
+ PublisherId:8,
+ MessageCount:32, Details/binary>>]),
+ {Connection, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
+ stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User,
+ send_file_oct = SendFileOct} = Connection,
+ #stream_connection_state{consumers = Consumers} = State,
+ <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary,
+ OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) ->
+ case rabbit_stream_utils:check_read_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_available} ->
+ response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE),
+ {Connection, State, Rest};
+ {error, not_found} ->
+ response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
+ {Connection, State, Rest};
+ {ok, LocalMemberPid} ->
+ case subscription_exists(StreamSubscriptions, SubscriptionId) of
+ true ->
+ response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS),
+ {Connection, State, Rest};
+ false ->
+ {OffsetSpec, Credit} = case OffsetType of
+ ?OFFSET_TYPE_FIRST ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {first, Crdt};
+ ?OFFSET_TYPE_LAST ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {last, Crdt};
+ ?OFFSET_TYPE_NEXT ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {next, Crdt};
+ ?OFFSET_TYPE_OFFSET ->
+ <<Offset:64/unsigned, Crdt:16>> = OffsetAndCredit,
+ {Offset, Crdt};
+ ?OFFSET_TYPE_TIMESTAMP ->
+ <<Timestamp:64/signed, Crdt:16>> = OffsetAndCredit,
+ {{timestamp, Timestamp}, Crdt}
+ end,
+ {ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec),
+ ConsumerState = #consumer{
+ member_pid = LocalMemberPid, offset = OffsetSpec, subscription_id = SubscriptionId, socket = Socket,
+ segment = Segment,
+ credit = Credit,
+ stream = Stream
+ },
+
+ Connection1 = maybe_monitor_stream(LocalMemberPid, Stream, Connection),
+
+ response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
+
+ {{segment, Segment1}, {credit, Credit1}} = send_chunks(
+ Transport,
+ ConsumerState,
+ SendFileOct
+ ),
+ Consumers1 = Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}},
+
+ StreamSubscriptions1 =
+ case StreamSubscriptions of
+ #{Stream := SubscriptionIds} ->
+ StreamSubscriptions#{Stream => [SubscriptionId] ++ SubscriptionIds};
+ _ ->
+ StreamSubscriptions#{Stream => [SubscriptionId]}
+ end,
+ {Connection1#stream_connection{stream_subscriptions = StreamSubscriptions1}, State#stream_connection_state{consumers = Consumers1}, Rest}
+ end
+ end;
+ error ->
+ response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
+ {Connection, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions,
+ stream_leaders = StreamLeaders} = Connection,
+ #stream_connection_state{consumers = Consumers} = State,
+ <<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned>>, Rest) ->
+ case subscription_exists(StreamSubscriptions, SubscriptionId) of
+ false ->
+ response(Transport, Connection, ?COMMAND_UNSUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST),
+ {Connection, State, Rest};
+ true ->
+ #{SubscriptionId := Consumer} = Consumers,
+ Stream = Consumer#consumer.stream,
+ #{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
+ SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream),
+ {Connection1, StreamSubscriptions1, StreamLeaders1} =
+ case length(SubscriptionsForThisStream1) of
+ 0 ->
+ %% no more subscriptions for this stream
+ %% we unregister even though it could affect publishing if the stream is published to
+ %% from this connection and is deleted.
+ %% to mitigate this, we remove the stream from the leaders cache
+ %% this way the stream leader will be looked up in the next publish command
+ %% and registered to.
+ C = demonitor_stream(Stream, Connection),
+ {C, maps:remove(Stream, StreamSubscriptions),
+ maps:remove(Stream, StreamLeaders)
+ };
+ _ ->
+ {Connection, StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, StreamLeaders}
+ end,
+ Consumers1 = maps:remove(SubscriptionId, Consumers),
+ response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
+ {Connection1#stream_connection{
+ stream_subscriptions = StreamSubscriptions1,
+ stream_leaders = StreamLeaders1
+ }, State#stream_connection_state{consumers = Consumers1}, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection,
+ #stream_connection_state{consumers = Consumers} = State,
+ <<?COMMAND_CREDIT:16, ?VERSION_0:16, SubscriptionId:8/unsigned, Credit:16/signed>>, Rest) ->
+
+ case Consumers of
+ #{SubscriptionId := Consumer} ->
+ #consumer{credit = AvailableCredit} = Consumer,
+
+ {{segment, Segment1}, {credit, Credit1}} = send_chunks(
+ Transport,
+ Consumer,
+ AvailableCredit + Credit,
+ SendFileOct
+ ),
+
+ Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1},
+ {Connection, State#stream_connection_state{consumers = Consumers#{SubscriptionId => Consumer1}}, Rest};
+ _ ->
+ rabbit_log:warning("Giving credit to unknown subscription: ~p~n", [SubscriptionId]),
+ Frame = <<?COMMAND_CREDIT:16, ?VERSION_0:16, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16, SubscriptionId:8>>,
+ FrameSize = byte_size(Frame),
+ Transport:send(S, [<<FrameSize:32>>, Frame]),
+ {Connection, State, Rest}
+ end;
+handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection,
+ State,
+ <<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, _CorrelationId:32,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
+
+ case rabbit_stream_utils:check_write_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case lookup_leader(Stream, Connection) of
+ cluster_not_found ->
+ rabbit_log:info("Could not find leader to commit offset on ~p~n", [Stream]),
+ %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
+ {Connection, State, Rest};
+ {ClusterLeader, Connection1} ->
+ osiris:write_tracking(ClusterLeader, Reference, Offset),
+ {Connection1, State, Rest}
+ end;
+ error ->
+ %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
+ rabbit_log:info("Not authorized to commit offset on ~p~n", [Stream]),
+ {Connection, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, user = User} = Connection,
+ State,
+ <<?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16, CorrelationId:32,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ FrameSize = ?RESPONSE_FRAME_SIZE + 8,
+ {ResponseCode, Offset} = case rabbit_stream_utils:check_read_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
+ <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]),
+ {Connection, State, Rest};
+handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username} = User} = Connection,
+ State,
+ <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary,
+ ArgumentsCount:32, ArgumentsBinary/binary>>, Rest) ->
+ case rabbit_stream_utils:enforce_correct_stream_name(Stream) of
+ {ok, StreamName} ->
+ {Arguments, _Rest} = rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount),
+ case rabbit_stream_utils:check_configure_permitted(
+ #resource{name = StreamName, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) of
+ {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
+ rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]),
+ response_ok(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId),
+ {Connection, State, Rest};
+ {error, validation_failed} ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED),
+ {Connection, State, Rest};
+ {error, reference_already_exists} ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS),
+ {Connection, State, Rest};
+ {error, _} ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR),
+ {Connection, State, Rest}
+ end;
+ error ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
+ {Connection, State, Rest}
+ end;
+ _ ->
+ response(Transport, Connection, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED),
+ {Connection, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost,
+ user = #user{username = Username} = User} = Connection, State,
+ <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ case rabbit_stream_utils:check_configure_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
+ {ok, deleted} ->
+ response_ok(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId),
+ {Connection1, State1} = case clean_state_after_stream_deletion_or_failure(Stream, Connection, State) of
+ {cleaned, NewConnection, NewState} ->
+ StreamSize = byte_size(Stream),
+ FrameSize = 2 + 2 + 2 + 2 + StreamSize,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>]),
+ {NewConnection, NewState};
+ {not_cleaned, SameConnection, SameState} ->
+ {SameConnection, SameState}
+ end,
+ {Connection1, State1, Rest};
+ {error, reference_not_found} ->
+ response(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
+ {Connection, State, Rest}
+ end;
+ error ->
+ response(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
+ {Connection, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost} = Connection, State,
+ <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, StreamCount:32, BinaryStreams/binary>>, Rest) ->
+ Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []),
+
+ %% get the nodes involved in the streams
+ NodesMap = lists:foldl(fun(Stream, Acc) ->
+ case rabbit_stream_manager:topology(VirtualHost, Stream) of
+ {ok, #{leader_node := undefined, replica_nodes := ReplicaNodes}} ->
+ lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc, ReplicaNodes);
+ {ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} ->
+ Acc1 = maps:put(LeaderNode, ok, Acc),
+ lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc1, ReplicaNodes);
+ {error, _} ->
+ Acc
+ end
+ end, #{}, Streams),
+
+ Nodes = maps:keys(NodesMap),
+ {NodesInfo, _} = lists:foldl(fun(Node, {Acc, Index}) ->
+ Host = rpc:call(Node, rabbit_stream, host, []),
+ Port = rpc:call(Node, rabbit_stream, port, []),
+ case {is_binary(Host), is_integer(Port)} of
+ {true, true} ->
+ {Acc#{Node => {{index, Index}, {host, Host}, {port, Port}}}, Index + 1};
+ _ ->
+ rabbit_log:warning("Error when retrieving broker metadata: ~p ~p~n", [Host, Port]),
+ {Acc, Index}
+ end
+ end, {#{}, 0}, Nodes),
+
+ BrokersCount = map_size(NodesInfo),
+ BrokersBin = maps:fold(fun(_K, {{index, Index}, {host, Host}, {port, Port}}, Acc) ->
+ HostLength = byte_size(Host),
+ <<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>>
+ end, <<BrokersCount:32>>, NodesInfo),
+
+
+ MetadataBin = lists:foldl(fun(Stream, Acc) ->
+ StreamLength = byte_size(Stream),
+ case rabbit_stream_manager:topology(VirtualHost, Stream) of
+ {error, stream_not_found} ->
+ <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16,
+ -1:16, 0:32>>;
+ {error, stream_not_available} ->
+ <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
+ -1:16, 0:32>>;
+ {ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} ->
+ LeaderIndex = case NodesInfo of
+ #{LeaderNode := NodeInfo} ->
+ {{index, LeaderIdx}, {host, _}, {port, _}} = NodeInfo,
+ LeaderIdx;
+ _ ->
+ -1
+ end,
+ {ReplicasBinary, ReplicasCount} = lists:foldl(fun(Replica, {Bin, Count}) ->
+ case NodesInfo of
+ #{Replica := NI} ->
+ {{index, ReplicaIndex}, {host, _}, {port, _}} = NI,
+ {<<Bin/binary, ReplicaIndex:16>>, Count + 1};
+ _ ->
+ {Bin, Count}
+ end
+
+
+ end, {<<>>, 0}, Replicas),
+ <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_OK:16,
+ LeaderIndex:16, ReplicasCount:32, ReplicasBinary/binary>>
+ end
+
+ end, <<StreamCount:32>>, Streams),
+ Frame = <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, BrokersBin/binary, MetadataBin/binary>>,
+ FrameSize = byte_size(Frame),
+ Transport:send(S, <<FrameSize:32, Frame/binary>>),
+ {Connection, State, Rest};
+handle_frame_post_auth(Transport, Connection, State,
+ <<?COMMAND_CLOSE:16, ?VERSION_0:16, CorrelationId:32,
+ ClosingCode:16, ClosingReasonLength:16, ClosingReason:ClosingReasonLength/binary>>, _Rest) ->
+ rabbit_log:info("Received close command ~p ~p~n", [ClosingCode, ClosingReason]),
+ Frame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>,
+ frame(Transport, Connection, Frame),
+ {Connection#stream_connection{connection_step = closing}, State, <<>>}; %% we ignore any subsequent frames
+handle_frame_post_auth(_Transport, Connection, State, <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, Rest) ->
+ rabbit_log:info("Received heartbeat frame post auth~n"),
+ {Connection, State, Rest};
+handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
+ rabbit_log:warning("unknown frame ~p ~p, sending close command.~n", [Frame, Rest]),
+ CloseReason = <<"unknown frame">>,
+ CloseReasonLength = byte_size(CloseReason),
+ CloseFrame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_UNKNOWN_FRAME:16,
+ CloseReasonLength:16, CloseReason:CloseReasonLength/binary>>,
+ frame(Transport, Connection, CloseFrame),
+ {Connection#stream_connection{connection_step = close_sent}, State, Rest}.
+
+notify_connection_closed(#stream_connection{name = Name} = Connection, ConnectionState) ->
+ rabbit_core_metrics:connection_closed(self()),
+ ClientProperties = i(client_properties, Connection, ConnectionState),
+ EventProperties = [{name, Name},
+ {pid, self()},
+ {node, node()},
+ {client_properties, ClientProperties}],
+ rabbit_event:notify(connection_closed,
+ augment_infos_with_user_provided_connection_name(EventProperties, Connection)).
+
+handle_frame_post_close(_Transport, Connection, State,
+ <<?COMMAND_CLOSE:16, ?VERSION_0:16, _CorrelationId:32, _ResponseCode:16>>, Rest) ->
+ rabbit_log:info("Received close confirmation~n"),
+ {Connection#stream_connection{connection_step = closing_done}, State, Rest};
+handle_frame_post_close(_Transport, Connection, State, <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, Rest) ->
+ rabbit_log:info("Received heartbeat frame post close~n"),
+ {Connection, State, Rest};
+handle_frame_post_close(_Transport, Connection, State, Frame, Rest) ->
+ rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]),
+ {Connection, State, Rest}.
+
+clean_state_after_stream_deletion_or_failure(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection,
+ #stream_connection_state{consumers = Consumers} = State) ->
+ case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of
+ {true, _} ->
+ #{Stream := SubscriptionIds} = StreamSubscriptions,
+ {cleaned, Connection#stream_connection{
+ stream_leaders = maps:remove(Stream, StreamLeaders),
+ stream_subscriptions = maps:remove(Stream, StreamSubscriptions)
+ }, State#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}};
+ {false, true} ->
+ {cleaned, Connection#stream_connection{
+ stream_leaders = maps:remove(Stream, StreamLeaders)
+ }, State};
+ {false, false} ->
+ {not_cleaned, Connection, State}
+ end.
+
+lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} = Connection) ->
+ case maps:get(Stream, StreamLeaders, undefined) of
+ undefined ->
+ case lookup_leader_from_manager(VirtualHost, Stream) of
+ cluster_not_found ->
+ cluster_not_found;
+ LeaderPid ->
+ Connection1 = maybe_monitor_stream(LeaderPid, Stream, Connection),
+ {LeaderPid, Connection1#stream_connection{stream_leaders = StreamLeaders#{Stream => LeaderPid}}}
+ end;
+ LeaderPid ->
+ {LeaderPid, Connection}
+ end.
+
+lookup_leader_from_manager(VirtualHost, Stream) ->
+ rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
+
+maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) ->
+ case lists:member(Stream, maps:values(Monitors)) of
+ true ->
+ Connection;
+ false ->
+ MonitorRef = monitor(process, Pid),
+ Connection#stream_connection{monitors = maps:put(MonitorRef, Stream, Monitors)}
+ end.
+
+demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) ->
+ Monitors = maps:fold(fun(MonitorRef, Strm, Acc) ->
+ case Strm of
+ Stream ->
+ Acc;
+ _ ->
+ maps:put(MonitorRef, Strm, Acc)
+
+ end
+ end, #{}, Monitors0),
+ Connection#stream_connection{monitors = Monitors}.
+
+demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) ->
+ lists:foreach(fun(MonitorRef) ->
+ demonitor(MonitorRef, [flush])
+ end, maps:keys(Monitors)),
+ Connection#stream_connection{monitors = #{}}.
+
+frame(Transport, #stream_connection{socket = S}, Frame) ->
+ FrameSize = byte_size(Frame),
+ Transport:send(S, [<<FrameSize:32>>, Frame]).
+
+response_ok(Transport, State, CommandId, CorrelationId) ->
+ response(Transport, State, CommandId, CorrelationId, ?RESPONSE_CODE_OK).
+
+response(Transport, #stream_connection{socket = S}, CommandId, CorrelationId, ResponseCode) ->
+ Transport:send(S, [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]).
+
+subscription_exists(StreamSubscriptions, SubscriptionId) ->
+ SubscriptionIds = lists:flatten(maps:values(StreamSubscriptions)),
+ lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
+
+send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}, Counter) ->
+ fun(Size) ->
+ FrameSize = 2 + 2 + 1 + Size,
+ FrameBeginning = <<FrameSize:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8/unsigned>>,
+ Transport:send(S, FrameBeginning),
+ atomics:add(Counter, 1, Size)
+ end.
+
+send_chunks(Transport, #consumer{credit = Credit} = State, Counter) ->
+ send_chunks(Transport, State, Credit, Counter).
+
+send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) ->
+ {{segment, Segment}, {credit, 0}};
+send_chunks(Transport, #consumer{segment = Segment} = State, Credit, Counter) ->
+ send_chunks(Transport, State, Segment, Credit, true, Counter).
+
+send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry, _Counter) ->
+ {{segment, Segment}, {credit, 0}};
+send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry, Counter) ->
+ case osiris_log:send_file(S, Segment, send_file_callback(Transport, State, Counter)) of
+ {ok, Segment1} ->
+ send_chunks(
+ Transport,
+ State,
+ Segment1,
+ Credit - 1,
+ true,
+ Counter
+ );
+ {end_of_stream, Segment1} ->
+ case Retry of
+ true ->
+ timer:sleep(1),
+ send_chunks(Transport, State, Segment1, Credit, false, Counter);
+ false ->
+ #consumer{member_pid = LocalMember} = State,
+ osiris:register_offset_listener(LocalMember, osiris_log:next_offset(Segment1)),
+ {{segment, Segment1}, {credit, Credit}}
+ end
+ end.
+
+emit_stats(Connection, ConnectionState) ->
+ [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I
+ = infos(?SIMPLE_METRICS, Connection, ConnectionState),
+ Infos = infos(?OTHER_METRICS, Connection, ConnectionState),
+ rabbit_core_metrics:connection_stats(Pid, Infos),
+ rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
+ rabbit_event:notify(connection_stats, Infos ++ I),
+ Connection1 = rabbit_event:reset_stats_timer(Connection, #stream_connection.stats_timer),
+ ensure_stats_timer(Connection1).
+
+ensure_stats_timer(Connection = #stream_connection{}) ->
+ rabbit_event:ensure_stats_timer(Connection, #stream_connection.stats_timer, emit_stats).
+
+info(Pid, InfoItems) ->
+ case InfoItems -- ?INFO_ITEMS of
+ [] ->
+ gen_server2:call(Pid, {info, InfoItems});
+ UnknownItems -> throw({bad_argument, UnknownItems})
+ end.
+
+infos(Items, Connection, State) -> [{Item, i(Item, Connection, State)} || Item <- Items].
+
+i(pid, _, _) -> self();
+i(node, _, _) -> node();
+i(SockStat, #stream_connection{socket = Sock, send_file_oct = Counter}, _) when
+ SockStat =:= send_oct -> % Number of bytes sent from the socket.
+ case rabbit_net:getstat(Sock, [SockStat]) of
+ {ok, [{_, N}]} when is_number(N) -> N + atomics:get(Counter, 1);
+ _ -> 0 + atomics:get(Counter, 1)
+ end;
+i(SockStat, #stream_connection{socket = Sock}, _) when
+ SockStat =:= recv_oct; % Number of bytes received by the socket.
+ SockStat =:= recv_cnt; % Number of packets received by the socket.
+ SockStat =:= send_cnt; % Number of packets sent from the socket.
+ SockStat =:= send_pend -> % Number of bytes waiting to be sent by the socket.
+ case rabbit_net:getstat(Sock, [SockStat]) of
+ {ok, [{_, N}]} when is_number(N) -> N;
+ _ -> 0
+ end;
+i(reductions, _, _) ->
+ {reductions, Reductions} = erlang:process_info(self(), reductions),
+ Reductions;
+i(garbage_collection, _, _) ->
+ rabbit_misc:get_gc_info(self());
+i(state, Connection, ConnectionState) -> i(connection_state, Connection, ConnectionState);
+i(timeout, Connection, ConnectionState) -> i(heartbeat, Connection, ConnectionState);
+i(name, Connection, ConnectionState) -> i(conn_name, Connection, ConnectionState);
+i(conn_name, #stream_connection{name = Name}, _) -> Name;
+i(port, #stream_connection{port = Port}, _) -> Port;
+i(peer_port, #stream_connection{peer_port = PeerPort}, _) -> PeerPort;
+i(host, #stream_connection{host = Host}, _) -> Host;
+i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost;
+i(ssl, _, _) -> false;
+i(peer_cert_subject, _, _) -> '';
+i(peer_cert_issuer, _, _) -> '';
+i(peer_cert_validity, _, _) -> '';
+i(ssl_protocol, _, _) -> '';
+i(ssl_key_exchange, _, _) -> '';
+i(ssl_cipher, _, _) -> '';
+i(ssl_hash, _, _) -> '';
+i(channels, _, _) -> 0;
+i(protocol, _, _) -> {<<"stream">>, ""};
+i(user_who_performed_action, Connection, ConnectionState) -> i(user, Connection, ConnectionState);
+i(user, #stream_connection{user = U}, _) -> U#user.username;
+i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> VirtualHost;
+i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers);
+i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked;
+i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running;
+i(auth_mechanism, #stream_connection{auth_mechanism = none}, _) -> none;
+i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name;
+i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat;
+i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax;
+i(channel_max, _, _) -> 0;
+i(client_properties, #stream_connection{client_properties = CP}, _) -> rabbit_misc:to_amqp_table(CP);
+i(connected_at, #stream_connection{connected_at = T}, _) -> T;
+i(Item, #stream_connection{}, _) -> throw({bad_argument, Item}). \ No newline at end of file
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl
new file mode 100644
index 0000000000..b331b47356
--- /dev/null
+++ b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_sup).
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([init/1]).
+
+-include("rabbit_stream.hrl").
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ {ok, Listeners} = application:get_env(rabbitmq_stream, tcp_listeners),
+ NumTcpAcceptors = application:get_env(rabbitmq_stream, num_tcp_acceptors, 10),
+ {ok, SocketOpts} = application:get_env(rabbitmq_stream, tcp_listen_options),
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ OsirisConf = #{nodes => Nodes},
+
+ ServerConfiguration = #{
+ initial_credits => application:get_env(rabbitmq_stream, initial_credits, ?DEFAULT_INITIAL_CREDITS),
+ credits_required_for_unblocking => application:get_env(rabbitmq_stream, credits_required_for_unblocking, ?DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING),
+ frame_max => application:get_env(rabbit_stream, frame_max, ?DEFAULT_FRAME_MAX),
+ heartbeat => application:get_env(rabbit_stream, heartbeat, ?DEFAULT_HEARTBEAT)
+ },
+
+ StreamManager = #{id => rabbit_stream_manager,
+ type => worker,
+ start => {rabbit_stream_manager, start_link, [OsirisConf]}},
+
+ {ok, {{one_for_all, 10, 10},
+ [StreamManager] ++
+ listener_specs(fun tcp_listener_spec/1,
+ [SocketOpts, ServerConfiguration, NumTcpAcceptors], Listeners)}}.
+
+listener_specs(Fun, Args, Listeners) ->
+ [Fun([Address | Args]) ||
+ Listener <- Listeners,
+ Address <- rabbit_networking:tcp_listener_addresses(Listener)].
+
+tcp_listener_spec([Address, SocketOpts, Configuration, NumAcceptors]) ->
+ rabbit_networking:tcp_listener_spec(
+ rabbit_stream_listener_sup, Address, SocketOpts,
+ ranch_tcp, rabbit_stream_connection_sup, Configuration,
+ stream, NumAcceptors, "Stream TCP listener").
+
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
new file mode 100644
index 0000000000..c20aacb12c
--- /dev/null
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -0,0 +1,125 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/en-US/MPL/2.0/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is Pivotal Software, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_utils).
+
+%% API
+-export([enforce_correct_stream_name/1, write_messages/3, parse_map/2,
+ auth_mechanisms/1, auth_mechanism_to_module/2,
+ check_configure_permitted/3, check_write_permitted/3, check_read_permitted/3,
+ extract_stream_list/2]).
+
+-define(MAX_PERMISSION_CACHE_SIZE, 12).
+
+enforce_correct_stream_name(Name) ->
+ % from rabbit_channel
+ StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]),
+ case check_name(StrippedName) of
+ ok ->
+ {ok, StrippedName};
+ error ->
+ error
+ end.
+
+check_name(<<"amq.", _/binary>>) ->
+ error;
+check_name(<<"">>) ->
+ error;
+check_name(_Name) ->
+ ok.
+
+write_messages(_ClusterLeader, _PublisherId, <<>>) ->
+ ok;
+write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
+ % FIXME handle write error
+ ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, Message),
+ write_messages(ClusterLeader, PublisherId, Rest);
+write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
+ % FIXME handle write error
+ ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
+ write_messages(ClusterLeader, PublisherId, Rest).
+
+
+parse_map(<<>>, _Count) ->
+ {#{}, <<>>};
+parse_map(Content, 0) ->
+ {#{}, Content};
+parse_map(Arguments, Count) ->
+ parse_map(#{}, Arguments, Count).
+
+parse_map(Acc, <<>>, _Count) ->
+ {Acc, <<>>};
+parse_map(Acc, Content, 0) ->
+ {Acc, Content};
+parse_map(Acc, <<KeySize:16, Key:KeySize/binary, ValueSize:16, Value:ValueSize/binary, Rest/binary>>, Count) ->
+ parse_map(maps:put(Key, Value, Acc), Rest, Count - 1).
+
+auth_mechanisms(Sock) ->
+ {ok, Configured} = application:get_env(rabbit, auth_mechanisms),
+ [rabbit_data_coercion:to_binary(Name) || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ Module:should_offer(Sock), lists:member(Name, Configured)].
+
+auth_mechanism_to_module(TypeBin, Sock) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_log:warning("Unknown authentication mechanism '~p'~n", [TypeBin]),
+ {error, not_found};
+ T ->
+ case {lists:member(TypeBin, rabbit_stream_utils:auth_mechanisms(Sock)),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ {ok, Module};
+ _ ->
+ rabbit_log:warning("Invalid authentication mechanism '~p'~n", [T]),
+ {error, invalid}
+ end
+ end.
+
+check_resource_access(User, Resource, Perm, Context) ->
+ V = {Resource, Context, Perm},
+
+ Cache = case get(permission_cache) of
+ undefined -> [];
+ Other -> Other
+ end,
+ case lists:member(V, Cache) of
+ true -> ok;
+ false ->
+ try
+ rabbit_access_control:check_resource_access(
+ User, Resource, Perm, Context),
+ CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
+ put(permission_cache, [V | CacheTail]),
+ ok
+ catch
+ exit:_ ->
+ error
+ end
+ end.
+
+check_configure_permitted(Resource, User, Context) ->
+ check_resource_access(User, Resource, configure, Context).
+
+check_write_permitted(Resource, User, Context) ->
+ check_resource_access(User, Resource, write, Context).
+
+check_read_permitted(Resource, User, Context) ->
+ check_resource_access(User, Resource, read, Context).
+
+extract_stream_list(<<>>, Streams) ->
+ Streams;
+extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) ->
+ extract_stream_list(Rest, [Stream | Streams]). \ No newline at end of file