diff options
-rw-r--r-- | deps/rabbitmq_stream/rebar.config | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl | 78 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream.erl | 111 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl | 44 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 341 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 2782 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_sup.erl | 61 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 199 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/command_SUITE.erl | 91 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/config_schema_SUITE.erl | 27 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl | 354 |
11 files changed, 1763 insertions, 2327 deletions
diff --git a/deps/rabbitmq_stream/rebar.config b/deps/rabbitmq_stream/rebar.config index d911323332..e5b44cb4eb 100644 --- a/deps/rabbitmq_stream/rebar.config +++ b/deps/rabbitmq_stream/rebar.config @@ -2,5 +2,5 @@ {format, [ {files, ["src/*.erl", "test/*.erl"]}, - {formatter, otp_formatter} + {formatter, default_formatter} ]}. diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl index 7782e9757e..b3c8ec6031 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl @@ -19,42 +19,34 @@ -behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). --export([formatter/0, - scopes/0, - switches/0, - aliases/0, - usage/0, - usage_additional/0, - usage_doc_guides/0, - banner/2, - validate/2, - merge_defaults/2, - run/2, - output/2, - description/0, - help_section/0]). +-export([formatter/0, scopes/0, switches/0, aliases/0, usage/0, usage_additional/0, + usage_doc_guides/0, banner/2, validate/2, merge_defaults/2, run/2, output/2, + description/0, help_section/0]). -formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'. +formatter() -> + 'Elixir.RabbitMQ.CLI.Formatters.Table'. -scopes() -> [ctl, diagnostics, streams]. +scopes() -> + [ctl, diagnostics, streams]. -switches() -> [{verbose, boolean}]. +switches() -> + [{verbose, boolean}]. -aliases() -> [{'V', verbose}]. +aliases() -> + [{'V', verbose}]. description() -> - <<"Lists stream connections on the target " - "node">>. + <<"Lists stream connections on the target node">>. -help_section() -> {plugin, stream}. +help_section() -> + {plugin, stream}. validate(Args, _) -> - case - 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, - ?INFO_ITEMS) - of - {ok, _} -> ok; - Error -> Error + case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, ?INFO_ITEMS) of + {ok, _} -> + ok; + Error -> + Error end. merge_defaults([], Opts) -> @@ -62,26 +54,31 @@ merge_defaults([], Opts) -> merge_defaults(Args, Opts) -> {Args, maps:merge(#{verbose => false}, Opts)}. -usage() -> <<"list_stream_connections [<column> ...]">>. +usage() -> + <<"list_stream_connections [<column> ...]">>. usage_additional() -> Prefix = <<" must be one of ">>, - InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), - <<", ">>), + InfoItems = + 'Elixir.Enum':join( + lists:usort(?INFO_ITEMS), <<", ">>), [{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}]. -usage_doc_guides() -> [?STREAM_GUIDE_URL]. +usage_doc_guides() -> + [?STREAM_GUIDE_URL]. run(Args, - #{node := NodeName, timeout := Timeout, + #{node := NodeName, + timeout := Timeout, verbose := Verbose}) -> - InfoKeys = case Verbose of - true -> ?INFO_ITEMS; - false -> - 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args) - end, - Nodes = - 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName), + InfoKeys = + case Verbose of + true -> + ?INFO_ITEMS; + false -> + 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args) + end, + Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName), 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(NodeName, rabbit_stream, emit_connection_info_all, @@ -90,7 +87,8 @@ run(Args, InfoKeys, length(Nodes)). -banner(_, _) -> <<"Listing stream connections ...">>. +banner(_, _) -> + <<"Listing stream connections ...">>. output(Result, _Opts) -> 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index 96e6c02eba..b3c76d603b 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -19,101 +19,88 @@ -behaviour(application). -export([start/2, host/0, port/0, kill_connection/1]). - -export([stop/1]). - --export([emit_connection_info_local/3, - emit_connection_info_all/4, - list/0]). +-export([emit_connection_info_local/3, emit_connection_info_all/4, list/0]). -include_lib("rabbit_common/include/rabbit.hrl"). -start(_Type, _Args) -> rabbit_stream_sup:start_link(). +start(_Type, _Args) -> + rabbit_stream_sup:start_link(). host() -> - case application:get_env(rabbitmq_stream, - advertised_host, - undefined) - of - undefined -> hostname_from_node(); - Host -> rabbit_data_coercion:to_binary(Host) + case application:get_env(rabbitmq_stream, advertised_host, undefined) of + undefined -> + hostname_from_node(); + Host -> + rabbit_data_coercion:to_binary(Host) end. hostname_from_node() -> - case re:split(rabbit_data_coercion:to_binary(node()), - "@", - [{return, binary}, {parts, 2}]) - of - [_, Hostname] -> Hostname; + case re:split( + rabbit_data_coercion:to_binary(node()), "@", [{return, binary}, {parts, 2}]) + of + [_, Hostname] -> + Hostname; [_] -> - rabbit_data_coercion:to_binary(inet:gethostname()) + rabbit_data_coercion:to_binary( + inet:gethostname()) end. port() -> - case application:get_env(rabbitmq_stream, - advertised_port, - undefined) - of - undefined -> port_from_listener(); - Port -> Port + case application:get_env(rabbitmq_stream, advertised_port, undefined) of + undefined -> + port_from_listener(); + Port -> + Port end. port_from_listener() -> Listeners = rabbit_networking:node_listeners(node()), - Port = lists:foldl(fun (#listener{port = Port, - protocol = stream}, - _Acc) -> - Port; - (_, Acc) -> Acc - end, - undefined, - Listeners), + Port = + lists:foldl(fun (#listener{port = Port, protocol = stream}, _Acc) -> + Port; + (_, Acc) -> + Acc + end, + undefined, + Listeners), Port. -stop(_State) -> ok. +stop(_State) -> + ok. kill_connection(ConnectionName) -> - ConnectionNameBin = - rabbit_data_coercion:to_binary(ConnectionName), - lists:foreach(fun (ConnectionPid) -> - ConnectionPid ! {infos, self()}, - receive - {ConnectionPid, - #{<<"connection_name">> := ConnectionNameBin}} -> - exit(ConnectionPid, kill); - {ConnectionPid, _ClientProperties} -> ok - after 1000 -> ok - end + ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName), + lists:foreach(fun(ConnectionPid) -> + ConnectionPid ! {infos, self()}, + receive + {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} -> + exit(ConnectionPid, kill); + {ConnectionPid, _ClientProperties} -> ok + after 1000 -> ok + end end, pg_local:get_members(rabbit_stream_connections)). -emit_connection_info_all(Nodes, Items, Ref, - AggregatorPid) -> - Pids = [spawn_link(Node, - rabbit_stream, - emit_connection_info_local, - [Items, Ref, AggregatorPid]) - || Node <- Nodes], +emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> + Pids = + [spawn_link(Node, rabbit_stream, emit_connection_info_local, [Items, Ref, AggregatorPid]) + || Node <- Nodes], rabbit_control_misc:await_emitters_termination(Pids), ok. emit_connection_info_local(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map_with_exit_handler(AggregatorPid, Ref, - fun (Pid) -> - rabbit_stream_reader:info(Pid, - Items) + fun(Pid) -> + rabbit_stream_reader:info(Pid, Items) end, list()). list() -> [Client - || {_, ListSupPid, _, _} - <- supervisor2:which_children(rabbit_stream_sup), - {_, RanchSup, supervisor, _} - <- supervisor2:which_children(ListSupPid), - {ranch_conns_sup, ConnSup, _, _} - <- supervisor:which_children(RanchSup), + || {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stream_sup), + {_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid), + {ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup), {_, CliSup, _, _} <- supervisor:which_children(ConnSup), - {rabbit_stream_reader, Client, _, _} - <- supervisor:which_children(CliSup)]. + {rabbit_stream_reader, Client, _, _} <- supervisor:which_children(CliSup)]. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl index d1b13c337d..5d1a2d0968 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl @@ -17,38 +17,33 @@ -module(rabbit_stream_connection_sup). -behaviour(supervisor2). - -behaviour(ranch_protocol). -include_lib("rabbit_common/include/rabbit.hrl"). -export([start_link/4, start_keepalive_link/0]). - -export([init/1]). start_link(Ref, _Sock, Transport, Opts) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, KeepaliveSup} = supervisor2:start_child(SupPid, - {rabbit_stream_keepalive_sup, - {rabbit_stream_connection_sup, - start_keepalive_link, - []}, - intrinsic, - infinity, - supervisor, - [rabbit_keepalive_sup]}), - {ok, ReaderPid} = supervisor2:start_child(SupPid, - {rabbit_stream_reader, - {rabbit_stream_reader, - start_link, - [KeepaliveSup, - Transport, - Ref, - Opts]}, - intrinsic, - ?WORKER_WAIT, - worker, - [rabbit_stream_reader]}), + {ok, KeepaliveSup} = + supervisor2:start_child(SupPid, + {rabbit_stream_keepalive_sup, + {rabbit_stream_connection_sup, start_keepalive_link, []}, + intrinsic, + infinity, + supervisor, + [rabbit_keepalive_sup]}), + {ok, ReaderPid} = + supervisor2:start_child(SupPid, + {rabbit_stream_reader, + {rabbit_stream_reader, + start_link, + [KeepaliveSup, Transport, Ref, Opts]}, + intrinsic, + ?WORKER_WAIT, + worker, + [rabbit_stream_reader]}), {ok, SupPid, ReaderPid}. start_keepalive_link() -> @@ -56,4 +51,5 @@ start_keepalive_link() -> %%---------------------------------------------------------------------------- -init([]) -> {ok, {{one_for_all, 0, 1}, []}}. +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 3a4ed00271..2a215d56b2 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -21,150 +21,92 @@ -include_lib("rabbit_common/include/rabbit.hrl"). %% API --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2]). - --export([start_link/1, - create/4, - delete/3, - lookup_leader/2, - lookup_local_member/2, +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +-export([start_link/1, create/4, delete/3, lookup_leader/2, lookup_local_member/2, topology/2]). -record(state, {configuration}). start_link(Conf) -> - gen_server:start_link({local, ?MODULE}, - ?MODULE, - [Conf], - []). - -init([Conf]) -> {ok, #state{configuration = Conf}}. + gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []). --spec create(binary(), binary(), - #{binary() => binary()}, binary()) -> {ok, map()} | - {error, - reference_already_exists} | - {error, internal_error}. +init([Conf]) -> + {ok, #state{configuration = Conf}}. +-spec create(binary(), binary(), #{binary() => binary()}, binary()) -> + {ok, map()} | {error, reference_already_exists} | {error, internal_error}. create(VirtualHost, Reference, Arguments, Username) -> - gen_server:call(?MODULE, - {create, VirtualHost, Reference, Arguments, Username}). - --spec delete(binary(), binary(), binary()) -> {ok, - deleted} | - {error, reference_not_found}. + gen_server:call(?MODULE, {create, VirtualHost, Reference, Arguments, Username}). +-spec delete(binary(), binary(), binary()) -> + {ok, deleted} | {error, reference_not_found}. delete(VirtualHost, Reference, Username) -> - gen_server:call(?MODULE, - {delete, VirtualHost, Reference, Username}). - --spec lookup_leader(binary(), binary()) -> pid() | - cluster_not_found. + gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}). +-spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found. lookup_leader(VirtualHost, Stream) -> - gen_server:call(?MODULE, - {lookup_leader, VirtualHost, Stream}). - --spec lookup_local_member(binary(), binary()) -> {ok, - pid()} | - {error, not_found}. + gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}). +-spec lookup_local_member(binary(), binary()) -> {ok, pid()} | {error, not_found}. lookup_local_member(VirtualHost, Stream) -> - gen_server:call(?MODULE, - {lookup_local_member, VirtualHost, Stream}). - --spec topology(binary(), binary()) -> {ok, - #{leader_node => pid(), - replica_nodes => [pid()]}} | - {error, stream_not_found}. + gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}). +-spec topology(binary(), binary()) -> + {ok, #{leader_node => pid(), replica_nodes => [pid()]}} | + {error, stream_not_found}. topology(VirtualHost, Stream) -> - gen_server:call(?MODULE, - {topology, VirtualHost, Stream}). + gen_server:call(?MODULE, {topology, VirtualHost, Stream}). stream_queue_arguments(Arguments) -> - stream_queue_arguments([{<<"x-queue-type">>, - longstr, - <<"stream">>}], - Arguments). + stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments). -stream_queue_arguments(ArgumentsAcc, Arguments) - when map_size(Arguments) =:= 0 -> +stream_queue_arguments(ArgumentsAcc, Arguments) when map_size(Arguments) =:= 0 -> ArgumentsAcc; -stream_queue_arguments(ArgumentsAcc, - #{<<"max-length-bytes">> := Value} = Arguments) -> - stream_queue_arguments([{<<"x-max-length-bytes">>, - long, - binary_to_integer(Value)}] - ++ ArgumentsAcc, +stream_queue_arguments(ArgumentsAcc, #{<<"max-length-bytes">> := Value} = Arguments) -> + stream_queue_arguments([{<<"x-max-length-bytes">>, long, binary_to_integer(Value)}] + ++ ArgumentsAcc, maps:remove(<<"max-length-bytes">>, Arguments)); -stream_queue_arguments(ArgumentsAcc, - #{<<"max-age">> := Value} = Arguments) -> - stream_queue_arguments([{<<"x-max-age">>, - longstr, - Value}] - ++ ArgumentsAcc, +stream_queue_arguments(ArgumentsAcc, #{<<"max-age">> := Value} = Arguments) -> + stream_queue_arguments([{<<"x-max-age">>, longstr, Value}] ++ ArgumentsAcc, maps:remove(<<"max-age">>, Arguments)); -stream_queue_arguments(ArgumentsAcc, - #{<<"max-segment-size">> := Value} = Arguments) -> - stream_queue_arguments([{<<"x-max-segment-size">>, - long, - binary_to_integer(Value)}] - ++ ArgumentsAcc, +stream_queue_arguments(ArgumentsAcc, #{<<"max-segment-size">> := Value} = Arguments) -> + stream_queue_arguments([{<<"x-max-segment-size">>, long, binary_to_integer(Value)}] + ++ ArgumentsAcc, maps:remove(<<"max-segment-size">>, Arguments)); stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Arguments) -> - stream_queue_arguments([{<<"x-initial-cluster-size">>, - long, - binary_to_integer(Value)}] - ++ ArgumentsAcc, + stream_queue_arguments([{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}] + ++ ArgumentsAcc, maps:remove(<<"initial-cluster-size">>, Arguments)); stream_queue_arguments(ArgumentsAcc, #{<<"queue-leader-locator">> := Value} = Arguments) -> - stream_queue_arguments([{<<"x-queue-leader-locator">>, - longstr, - Value}] - ++ ArgumentsAcc, + stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Value}] ++ ArgumentsAcc, maps:remove(<<"queue-leader-locator">>, Arguments)); stream_queue_arguments(ArgumentsAcc, _Arguments) -> ArgumentsAcc. -validate_stream_queue_arguments([]) -> ok; -validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, - long, - ClusterSize} - | _]) +validate_stream_queue_arguments([]) -> + ok; +validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) when ClusterSize =< 0 -> error; -validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, - longstr, - Locator} - | T]) -> - case lists:member(Locator, - [<<"client-local">>, <<"random">>, <<"least-leaders">>]) - of - true -> validate_stream_queue_arguments(T); - false -> error +validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) -> + case lists:member(Locator, [<<"client-local">>, <<"random">>, <<"least-leaders">>]) of + true -> + validate_stream_queue_arguments(T); + false -> + error end; validate_stream_queue_arguments([_ | T]) -> validate_stream_queue_arguments(T). -handle_call({create, - VirtualHost, - Reference, - Arguments, - Username}, - _From, State) -> - Name = #resource{virtual_host = VirtualHost, - kind = queue, name = Reference}, - StreamQueueArguments = - stream_queue_arguments(Arguments), - case - validate_stream_queue_arguments(StreamQueueArguments) - of +handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Reference}, + StreamQueueArguments = stream_queue_arguments(Arguments), + case validate_stream_queue_arguments(StreamQueueArguments) of ok -> Q0 = amqqueue:new(Name, none, @@ -175,7 +117,8 @@ handle_call({create, VirtualHost, #{user => Username}, rabbit_stream_queue), - try case rabbit_stream_queue:declare(Q0, node()) of + try + case rabbit_stream_queue:declare(Q0, node()) of {new, Q} -> {reply, {ok, amqqueue:get_type_state(Q)}, State}; {existing, _} -> @@ -187,151 +130,147 @@ handle_call({create, end catch exit:Error -> - rabbit_log:info("Error while creating ~p stream, ~p~n", - [Reference, Error]), + rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]), {reply, {error, internal_error}, State} end; - error -> {reply, {error, validation_failed}, State} + error -> + {reply, {error, validation_failed}, State} end; -handle_call({delete, VirtualHost, Reference, Username}, - _From, State) -> - Name = #resource{virtual_host = VirtualHost, - kind = queue, name = Reference}, - rabbit_log:debug("Trying to delete stream ~p~n", - [Reference]), +handle_call({delete, VirtualHost, Reference, Username}, _From, State) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Reference}, + rabbit_log:debug("Trying to delete stream ~p~n", [Reference]), case rabbit_amqqueue:lookup(Name) of {ok, Q} -> - rabbit_log:debug("Found queue record ~p, checking if it " - "is a stream~n", - [Reference]), + rabbit_log:debug("Found queue record ~p, checking if it is a stream~n", [Reference]), case is_stream_queue(Q) of true -> - rabbit_log:debug("Queue record ~p is a stream, trying " - "to delete it~n", + rabbit_log:debug("Queue record ~p is a stream, trying to delete it~n", [Reference]), - {ok, _} = rabbit_stream_queue:delete(Q, - false, - false, - Username), + {ok, _} = rabbit_stream_queue:delete(Q, false, false, Username), rabbit_log:debug("Stream ~p deleted~n", [Reference]), {reply, {ok, deleted}, State}; _ -> - rabbit_log:debug("Queue record ~p is NOT a stream, returning " - "error~n", + rabbit_log:debug("Queue record ~p is NOT a stream, returning error~n", [Reference]), {reply, {error, reference_not_found}, State} end; {error, not_found} -> - rabbit_log:debug("Stream ~p not found, cannot delete it~n", - [Reference]), + rabbit_log:debug("Stream ~p not found, cannot delete it~n", [Reference]), {reply, {error, reference_not_found}, State} end; -handle_call({lookup_leader, VirtualHost, Stream}, _From, - State) -> - Name = #resource{virtual_host = VirtualHost, - kind = queue, name = Stream}, +handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Stream}, Res = case rabbit_amqqueue:lookup(Name) of {ok, Q} -> case is_stream_queue(Q) of true -> - #{leader_pid := LeaderPid} = - amqqueue:get_type_state(Q), + #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q), % FIXME check if pid is alive in case of stale information LeaderPid; - _ -> cluster_not_found + _ -> + cluster_not_found end; - _ -> cluster_not_found + _ -> + cluster_not_found end, {reply, Res, State}; -handle_call({lookup_local_member, VirtualHost, Stream}, - _From, State) -> - Name = #resource{virtual_host = VirtualHost, - kind = queue, name = Stream}, +handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Stream}, Res = case rabbit_amqqueue:lookup(Name) of {ok, Q} -> case is_stream_queue(Q) of true -> - #{leader_pid := LeaderPid, - replica_pids := ReplicaPids} = + #{leader_pid := LeaderPid, replica_pids := ReplicaPids} = amqqueue:get_type_state(Q), - LocalMember = lists:foldl(fun (Pid, Acc) -> - case node(Pid) =:= - node() - of - true -> Pid; - false -> Acc - end - end, - undefined, - [LeaderPid] ++ ReplicaPids), + LocalMember = + lists:foldl(fun(Pid, Acc) -> + case node(Pid) =:= node() of + true -> Pid; + false -> Acc + end + end, + undefined, + [LeaderPid] ++ ReplicaPids), % FIXME check if pid is alive in case of stale information case LocalMember of - undefined -> {error, not_available}; - Pid -> {ok, Pid} + undefined -> + {error, not_available}; + Pid -> + {ok, Pid} end; - _ -> {error, not_found} + _ -> + {error, not_found} end; {error, not_found} -> case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> {error, not_found}; - _ -> {error, not_available} + not_found -> + {error, not_found}; + _ -> + {error, not_available} end; - _ -> {error, not_found} + _ -> + {error, not_found} end, {reply, Res, State}; -handle_call({topology, VirtualHost, Stream}, _From, - State) -> - Name = #resource{virtual_host = VirtualHost, - kind = queue, name = Stream}, +handle_call({topology, VirtualHost, Stream}, _From, State) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Stream}, Res = case rabbit_amqqueue:lookup(Name) of {ok, Q} -> case is_stream_queue(Q) of true -> QState = amqqueue:get_type_state(Q), - ProcessAliveFun = fun (Pid) -> - rpc:call(node(Pid), - erlang, - is_process_alive, - [Pid], - 10000) - end, - LeaderNode = case ProcessAliveFun(maps:get(leader_pid, - QState)) - of - true -> - maps:get(leader_node, QState); - _ -> undefined - end, - ReplicaNodes = lists:foldl(fun (Pid, Acc) -> - case - ProcessAliveFun(Pid) - of - true -> - Acc ++ - [node(Pid)]; - _ -> Acc - end - end, - [], - maps:get(replica_pids, - QState)), - {ok, - #{leader_node => LeaderNode, - replica_nodes => ReplicaNodes}}; - _ -> {error, stream_not_found} + ProcessAliveFun = + fun(Pid) -> + rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000) + end, + LeaderNode = + case ProcessAliveFun(maps:get(leader_pid, QState)) of + true -> + maps:get(leader_node, QState); + _ -> + undefined + end, + ReplicaNodes = + lists:foldl(fun(Pid, Acc) -> + case ProcessAliveFun(Pid) of + true -> Acc ++ [node(Pid)]; + _ -> Acc + end + end, + [], + maps:get(replica_pids, QState)), + {ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}}; + _ -> + {error, stream_not_found} end; {error, not_found} -> case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> {error, stream_not_found}; - _ -> {error, stream_not_available} + not_found -> + {error, stream_not_found}; + _ -> + {error, stream_not_available} end; - _ -> {error, stream_not_found} + _ -> + {error, stream_not_found} end, {reply, Res, State}; handle_call(which_children, _From, State) -> {reply, [], State}. -handle_cast(_, State) -> {noreply, State}. +handle_cast(_, State) -> + {noreply, State}. handle_info(Info, State) -> rabbit_log:info("Received info ~p~n", [Info]), @@ -339,6 +278,8 @@ handle_info(Info, State) -> is_stream_queue(Q) -> case amqqueue:get_type(Q) of - rabbit_stream_queue -> true; - _ -> false + rabbit_stream_queue -> + true; + _ -> + false end. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index c1535c4b7b..31f7af92df 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -21,11 +21,8 @@ -include("rabbit_stream.hrl"). -type stream() :: binary(). - -type publisher_id() :: byte(). - -type publisher_reference() :: binary(). - -type subscription_id() :: byte(). -record(publisher, @@ -33,7 +30,6 @@ stream :: stream(), reference :: undefined | publisher_reference(), leader :: pid()}). - -record(consumer, {socket :: rabbit_net:socket(), %% ranch_transport:socket(), @@ -43,12 +39,10 @@ segment :: osiris_log:state(), credit :: integer(), stream :: stream()}). - -record(stream_connection_state, {data :: none | binary(), blocked :: boolean(), consumers :: #{subscription_id() => #consumer{}}}). - -record(stream_connection, {name :: string(), %% server host @@ -66,11 +60,9 @@ publishers :: #{publisher_id() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?) - publisher_to_ids :: - #{{stream(), publisher_reference()} => publisher_id()}, + publisher_to_ids :: #{{stream(), publisher_reference()} => publisher_id()}, stream_leaders :: #{stream() => pid()}, - stream_subscriptions :: - #{stream() => [subscription_id()]}, + stream_subscriptions :: #{stream() => [subscription_id()]}, credits :: atomics:atomics_ref(), authentication_state :: atom(), user :: undefined | #user{}, @@ -84,7 +76,6 @@ monitors = #{} :: #{reference() => stream()}, stats_timer :: reference(), send_file_oct :: atomics:atomics_ref()}). - -record(configuration, {initial_credits :: integer(), credits_required_for_unblocking :: integer(), @@ -93,7 +84,6 @@ -define(RESPONSE_FRAME_SIZE, 10). % 2 (key) + 2 (version) + 4 (correlation ID) + 2 (response code) - -define(CREATION_EVENT_KEYS, [pid, name, @@ -120,19 +110,9 @@ connected_at, node, user_who_performed_action]). - --define(SIMPLE_METRICS, - [pid, recv_oct, send_oct, reductions]). - +-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). -define(OTHER_METRICS, - [recv_cnt, - send_cnt, - send_pend, - state, - channels, - garbage_collection, - timeout]). - + [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection, timeout]). -define(AUTH_NOTIFICATION_INFO_KEYS, [host, name, @@ -151,23 +131,20 @@ -export([start_link/4, init/1, info/2]). start_link(KeepaliveSup, Transport, Ref, Opts) -> - Pid = proc_lib:spawn_link(?MODULE, - init, - [[KeepaliveSup, Transport, Ref, Opts]]), + Pid = proc_lib:spawn_link(?MODULE, init, [[KeepaliveSup, Transport, Ref, Opts]]), {ok, Pid}. init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, - credits_required_for_unblocking := - CreditsRequiredBeforeUnblocking, - frame_max := FrameMax, heartbeat := Heartbeat}]) -> + credits_required_for_unblocking := CreditsRequiredBeforeUnblocking, + frame_max := FrameMax, + heartbeat := Heartbeat}]) -> process_flag(trap_exit, true), - {ok, Sock} = rabbit_networking:handshake(Ref, - application:get_env(rabbitmq_stream, - proxy_protocol, - false)), + {ok, Sock} = + rabbit_networking:handshake(Ref, + application:get_env(rabbitmq_stream, proxy_protocol, false)), RealSocket = rabbit_net:unwrap_socket(Sock), case rabbit_net:connection_string(Sock, inbound) of {ok, ConnStr} -> @@ -175,55 +152,52 @@ init([KeepaliveSup, SendFileOct = atomics:new(1, [{signed, false}]), atomics:put(SendFileOct, 1, 0), init_credit(Credits, InitialCredits), - {PeerHost, PeerPort, Host, Port} = socket_op(Sock, - fun (S) -> - rabbit_net:socket_ends(S, - inbound) - end), - Connection = #stream_connection{name = - rabbit_data_coercion:to_binary(ConnStr), - host = Host, peer_host = PeerHost, - port = Port, peer_port = PeerPort, - connected_at = - os:system_time(milli_seconds), - auth_mechanism = none, - helper_sup = KeepaliveSup, - socket = RealSocket, - publishers = #{}, - publisher_to_ids = #{}, - stream_leaders = #{}, - stream_subscriptions = #{}, - credits = Credits, - authentication_state = none, - connection_step = tcp_connected, - frame_max = FrameMax, - send_file_oct = SendFileOct}, - State = #stream_connection_state{consumers = #{}, - blocked = false, data = none}, + {PeerHost, PeerPort, Host, Port} = + socket_op(Sock, fun(S) -> rabbit_net:socket_ends(S, inbound) end), + Connection = + #stream_connection{name = rabbit_data_coercion:to_binary(ConnStr), + host = Host, + peer_host = PeerHost, + port = Port, + peer_port = PeerPort, + connected_at = os:system_time(milli_seconds), + auth_mechanism = none, + helper_sup = KeepaliveSup, + socket = RealSocket, + publishers = #{}, + publisher_to_ids = #{}, + stream_leaders = #{}, + stream_subscriptions = #{}, + credits = Credits, + authentication_state = none, + connection_step = tcp_connected, + frame_max = FrameMax, + send_file_oct = SendFileOct}, + State = + #stream_connection_state{consumers = #{}, + blocked = false, + data = none}, Transport:setopts(RealSocket, [{active, once}]), listen_loop_pre_auth(Transport, Connection, State, - #configuration{initial_credits = - InitialCredits, - credits_required_for_unblocking - = + #configuration{initial_credits = InitialCredits, + credits_required_for_unblocking = CreditsRequiredBeforeUnblocking, frame_max = FrameMax, heartbeat = Heartbeat}); {Error, Reason} -> rabbit_net:fast_close(RealSocket), - rabbit_log:warning("Closing connection because of ~p ~p~n", - [Error, Reason]) + rabbit_log:warning("Closing connection because of ~p ~p~n", [Error, Reason]) end. socket_op(Sock, Fun) -> RealSocket = rabbit_net:unwrap_socket(Sock), case Fun(Sock) of - {ok, Res} -> Res; + {ok, Res} -> + Res; {error, Reason} -> - rabbit_log:warning("Error during socket operation ~p~n", - [Reason]), + rabbit_log:warning("Error during socket operation ~p~n", [Reason]), rabbit_net:fast_close(RealSocket), exit(normal) end. @@ -240,41 +214,29 @@ add_credits(CreditReference, Credits) -> has_credits(CreditReference) -> atomics:get(CreditReference, 1) > 0. -has_enough_credits_to_unblock(CreditReference, - CreditsRequiredForUnblocking) -> - atomics:get(CreditReference, 1) > - CreditsRequiredForUnblocking. +has_enough_credits_to_unblock(CreditReference, CreditsRequiredForUnblocking) -> + atomics:get(CreditReference, 1) > CreditsRequiredForUnblocking. listen_loop_pre_auth(Transport, - #stream_connection{socket = S} = Connection, State, - #configuration{frame_max = FrameMax, - heartbeat = Heartbeat} = - Configuration) -> + #stream_connection{socket = S} = Connection, + State, + #configuration{frame_max = FrameMax, heartbeat = Heartbeat} = Configuration) -> {OK, Closed, Error} = Transport:messages(), %% FIXME introduce timeout to complete the connection opening (after block should be enough) receive {OK, S, Data} -> - #stream_connection{connection_step = ConnectionStep0} = - Connection, + #stream_connection{connection_step = ConnectionStep0} = Connection, {Connection1, State1} = - handle_inbound_data_pre_auth(Transport, - Connection, - State, - Data), + handle_inbound_data_pre_auth(Transport, Connection, State, Data), Transport:setopts(S, [{active, once}]), - #stream_connection{connection_step = ConnectionStep} = - Connection1, - rabbit_log:info("Transitioned from ~p to ~p~n", - [ConnectionStep0, ConnectionStep]), + #stream_connection{connection_step = ConnectionStep} = Connection1, + rabbit_log:info("Transitioned from ~p to ~p~n", [ConnectionStep0, ConnectionStep]), case ConnectionStep of authenticated -> - TuneFrame = <<(?COMMAND_TUNE):16, (?VERSION_0):16, - FrameMax:32, Heartbeat:32>>, + TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>, frame(Transport, Connection1, TuneFrame), listen_loop_pre_auth(Transport, - Connection1#stream_connection{connection_step - = - tuning}, + Connection1#stream_connection{connection_step = tuning}, State1, Configuration); opened -> @@ -282,8 +244,8 @@ listen_loop_pre_auth(Transport, % just meant to be able to close the connection remotely % should be possible once the connections are available in ctl list_connections pg_local:join(rabbit_stream_connections, self()), - Connection2 = rabbit_event:init_stats_timer(Connection1, - #stream_connection.stats_timer), + Connection2 = + rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer), Connection3 = ensure_stats_timer(Connection2), Infos = augment_infos_with_user_provided_connection_name(infos(?CREATION_EVENT_KEYS, @@ -293,38 +255,30 @@ listen_loop_pre_auth(Transport, rabbit_core_metrics:connection_created(self(), Infos), rabbit_event:notify(connection_created, Infos), rabbit_networking:register_non_amqp_connection(self()), - listen_loop_post_auth(Transport, - Connection3, - State1, - Configuration); - failure -> close(Transport, S); + listen_loop_post_auth(Transport, Connection3, State1, Configuration); + failure -> + close(Transport, S); _ -> - listen_loop_pre_auth(Transport, - Connection1, - State1, - Configuration) + listen_loop_pre_auth(Transport, Connection1, State1, Configuration) end; {Closed, S} -> rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> - rabbit_log:info("Socket error ~p [~w]~n", - [Reason, S, self()]); + rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); M -> rabbit_log:warning("Unknown message ~p~n", [M]), close(Transport, S) end. augment_infos_with_user_provided_connection_name(Infos, - #stream_connection{client_properties - = + #stream_connection{client_properties = ClientProperties}) -> case ClientProperties of - #{<<"connection_name">> := - UserProvidedConnectionName} -> - [{user_provided_name, UserProvidedConnectionName} - | Infos]; - _ -> Infos + #{<<"connection_name">> := UserProvidedConnectionName} -> + [{user_provided_name, UserProvidedConnectionName} | Infos]; + _ -> + Infos end. close(Transport, S) -> @@ -333,8 +287,7 @@ close(Transport, S) -> listen_loop_post_auth(Transport, #stream_connection{socket = S, - stream_subscriptions = - StreamSubscriptions, + stream_subscriptions = StreamSubscriptions, credits = Credits, heartbeater = Heartbeater, monitors = Monitors, @@ -342,9 +295,7 @@ listen_loop_post_auth(Transport, publisher_to_ids = PublisherRefToIds, send_file_oct = SendFileOct} = Connection0, - #stream_connection_state{consumers = Consumers, - blocked = Blocked} = - State, + #stream_connection_state{consumers = Consumers, blocked = Blocked} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> @@ -353,12 +304,8 @@ listen_loop_post_auth(Transport, receive {OK, S, Data} -> {Connection1, State1} = - handle_inbound_data_post_auth(Transport, - Connection, - State, - Data), - #stream_connection{connection_step = Step} = - Connection1, + handle_inbound_data_post_auth(Transport, Connection, State, Data), + #stream_connection{connection_step = Step} = Connection1, case Step of closing -> close(Transport, S), @@ -367,291 +314,196 @@ listen_loop_post_auth(Transport, close_sent -> rabbit_log:debug("Transitioned to close_sent ~n"), Transport:setopts(S, [{active, once}]), - listen_loop_post_close(Transport, - Connection1, - State1, - Configuration); + listen_loop_post_close(Transport, Connection1, State1, Configuration); _ -> - State2 = case Blocked of - true -> - case has_enough_credits_to_unblock(Credits, - CreditsRequiredForUnblocking) - of - true -> - Transport:setopts(S, - [{active, - once}]), - ok = - rabbit_heartbeat:resume_monitor(Heartbeater), - State1#stream_connection_state{blocked - = - false}; - false -> State1 - end; - false -> - case has_credits(Credits) of - true -> - Transport:setopts(S, - [{active, - once}]), - State1; - false -> - ok = - rabbit_heartbeat:pause_monitor(Heartbeater), - State1#stream_connection_state{blocked - = - true} - end - end, - listen_loop_post_auth(Transport, - Connection1, - State2, - Configuration) + State2 = + case Blocked of + true -> + case has_enough_credits_to_unblock(Credits, + CreditsRequiredForUnblocking) + of + true -> + Transport:setopts(S, [{active, once}]), + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State1#stream_connection_state{blocked = false}; + false -> + State1 + end; + false -> + case has_credits(Credits) of + true -> + Transport:setopts(S, [{active, once}]), + State1; + false -> + ok = rabbit_heartbeat:pause_monitor(Heartbeater), + State1#stream_connection_state{blocked = true} + end + end, + listen_loop_post_auth(Transport, Connection1, State2, Configuration) end; {'DOWN', MonitorRef, process, _OsirisPid, _Reason} -> - {Connection1, State1} = case Monitors of - #{MonitorRef := Stream} -> - Monitors1 = maps:remove(MonitorRef, - Monitors), - C = - Connection#stream_connection{monitors - = - Monitors1}, - case - clean_state_after_stream_deletion_or_failure(Stream, - C, - State) - of - {cleaned, - NewConnection, - NewState} -> - StreamSize = - byte_size(Stream), - FrameSize = 2 + 2 + 2 + 2 + - StreamSize, - Transport:send(S, - [<<FrameSize:32, - (?COMMAND_METADATA_UPDATE):16, - (?VERSION_0):16, - (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16, - StreamSize:16, - Stream/binary>>]), - {NewConnection, NewState}; - {not_cleaned, - SameConnection, - SameState} -> - {SameConnection, SameState} - end; - _ -> {Connection, State} - end, - listen_loop_post_auth(Transport, - Connection1, - State1, - Configuration); + {Connection1, State1} = + case Monitors of + #{MonitorRef := Stream} -> + Monitors1 = maps:remove(MonitorRef, Monitors), + C = Connection#stream_connection{monitors = Monitors1}, + case clean_state_after_stream_deletion_or_failure(Stream, C, State) of + {cleaned, NewConnection, NewState} -> + StreamSize = byte_size(Stream), + FrameSize = 2 + 2 + 2 + 2 + StreamSize, + Transport:send(S, + [<<FrameSize:32, + ?COMMAND_METADATA_UPDATE:16, + ?VERSION_0:16, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, + StreamSize:16, + Stream/binary>>]), + {NewConnection, NewState}; + {not_cleaned, SameConnection, SameState} -> + {SameConnection, SameState} + end; + _ -> + {Connection, State} + end, + listen_loop_post_auth(Transport, Connection1, State1, Configuration); {'$gen_cast', {queue_event, _QueueResource, - {osiris_written, - _QueueResource, - undefined, - CorrelationList}}} -> - {FirstPublisherId, _FirstPublishingId} = lists:nth(1, - CorrelationList), + {osiris_written, _QueueResource, undefined, CorrelationList}}} -> + {FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList), {LastPublisherId, LastPublishingIds, LastCount} = - lists:foldl(fun ({PublisherId, PublishingId}, - {CurrentPublisherId, PublishingIds, Count}) -> - case PublisherId of - CurrentPublisherId -> - {CurrentPublisherId, - [PublishingIds, - <<PublishingId:64>>], - Count + 1}; - OtherPublisherId -> - FrameSize = 2 + 2 + 1 + 4 + - Count * 8, - %% FIXME enforce max frame size - %% in practice, this should be necessary only for very large chunks and for very small frame size limits - Transport:send(S, - [<<FrameSize:32, - (?COMMAND_PUBLISH_CONFIRM):16, - (?VERSION_0):16>>, - <<CurrentPublisherId:8>>, - <<Count:32>>, - PublishingIds]), - {OtherPublisherId, - <<PublishingId:64>>, - 1} - end + lists:foldl(fun({PublisherId, PublishingId}, + {CurrentPublisherId, PublishingIds, Count}) -> + case PublisherId of + CurrentPublisherId -> + {CurrentPublisherId, + [PublishingIds, <<PublishingId:64>>], + Count + 1}; + OtherPublisherId -> + FrameSize = 2 + 2 + 1 + 4 + Count * 8, + %% FIXME enforce max frame size + %% in practice, this should be necessary only for very large chunks and for very small frame size limits + Transport:send(S, + [<<FrameSize:32, + ?COMMAND_PUBLISH_CONFIRM:16, + ?VERSION_0:16>>, + <<CurrentPublisherId:8>>, + <<Count:32>>, + PublishingIds]), + {OtherPublisherId, <<PublishingId:64>>, 1} + end end, {FirstPublisherId, <<>>, 0}, CorrelationList), FrameSize = 2 + 2 + 1 + 4 + LastCount * 8, Transport:send(S, - [<<FrameSize:32, (?COMMAND_PUBLISH_CONFIRM):16, - (?VERSION_0):16>>, + [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, <<LastPublisherId:8>>, <<LastCount:32>>, LastPublishingIds]), CorrelationIdCount = length(CorrelationList), add_credits(Credits, CorrelationIdCount), - State1 = case Blocked of - true -> - case has_enough_credits_to_unblock(Credits, - CreditsRequiredForUnblocking) - of - true -> - Transport:setopts(S, [{active, once}]), - ok = - rabbit_heartbeat:resume_monitor(Heartbeater), - State#stream_connection_state{blocked = - false}; - false -> State - end; - false -> State - end, - listen_loop_post_auth(Transport, - Connection, - State1, - Configuration); + State1 = + case Blocked of + true -> + case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of + true -> + Transport:setopts(S, [{active, once}]), + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State#stream_connection_state{blocked = false}; + false -> + State + end; + false -> + State + end, + listen_loop_post_auth(Transport, Connection, State1, Configuration); {'$gen_cast', {queue_event, _QueueResource, - {osiris_written, - #resource{name = Stream}, - PublisherReference, - CorrelationList}}} -> + {osiris_written, #resource{name = Stream}, PublisherReference, CorrelationList}}} -> %% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive) - PublisherId = maps:get({Stream, PublisherReference}, - PublisherRefToIds, - undefined), - PubIds = lists:foldl(fun (PublishingId, - PublishingIds) -> - [PublishingIds, <<PublishingId:64>>] - end, - <<>>, - CorrelationList), + PublisherId = maps:get({Stream, PublisherReference}, PublisherRefToIds, undefined), + PubIds = + lists:foldl(fun(PublishingId, PublishingIds) -> [PublishingIds, <<PublishingId:64>>] + end, + <<>>, + CorrelationList), PublishingIdCount = length(CorrelationList), FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8, Transport:send(S, - [<<FrameSize:32, (?COMMAND_PUBLISH_CONFIRM):16, - (?VERSION_0):16>>, + [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, <<PublisherId:8>>, <<PublishingIdCount:32>>, PubIds]), add_credits(Credits, PublishingIdCount), - State1 = case Blocked of - true -> - case has_enough_credits_to_unblock(Credits, - CreditsRequiredForUnblocking) - of - true -> - Transport:setopts(S, [{active, once}]), - ok = - rabbit_heartbeat:resume_monitor(Heartbeater), - State#stream_connection_state{blocked = - false}; - false -> State - end; - false -> State - end, - listen_loop_post_auth(Transport, - Connection, - State1, - Configuration); + State1 = + case Blocked of + true -> + case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of + true -> + Transport:setopts(S, [{active, once}]), + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State#stream_connection_state{blocked = false}; + false -> + State + end; + false -> + State + end, + listen_loop_post_auth(Transport, Connection, State1, Configuration); {'$gen_cast', - {queue_event, - #resource{name = StreamName}, - {osiris_offset, _QueueResource, -1}}} -> - rabbit_log:info("received osiris offset event for ~p " - "with offset ~p~n", + {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} -> + rabbit_log:info("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]), - listen_loop_post_auth(Transport, - Connection, - State, - Configuration); + listen_loop_post_auth(Transport, Connection, State, Configuration); {'$gen_cast', - {queue_event, - #resource{name = StreamName}, - {osiris_offset, _QueueResource, Offset}}} + {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, Offset}}} when Offset > -1 -> - {Connection1, State1} = case maps:get(StreamName, - StreamSubscriptions, - undefined) - of - undefined -> - rabbit_log:info("osiris offset event for ~p, but no subscripti" - "on (leftover messages after unsubscribe?)", - [StreamName]), - {Connection, State}; - [] -> - rabbit_log:info("osiris offset event for ~p, but no registered " - "consumers!", - [StreamName]), - {Connection#stream_connection{stream_subscriptions - = - maps:remove(StreamName, - StreamSubscriptions)}, - State}; - CorrelationIds - when is_list(CorrelationIds) -> - Consumers1 = lists:foldl(fun - (CorrelationId, - ConsumersAcc) -> - #{CorrelationId - := - Consumer} = - ConsumersAcc, - #consumer{credit - = - Credit} = - Consumer, - Consumer1 = - case - Credit - of - 0 -> - Consumer; - _ -> - {{segment, - Segment1}, - {credit, - Credit1}} = - send_chunks(Transport, - Consumer, - SendFileOct), - Consumer#consumer{segment - = - Segment1, - credit - = - Credit1} - end, - ConsumersAcc#{CorrelationId - => - Consumer1} - end, - Consumers, - CorrelationIds), - {Connection, - State#stream_connection_state{consumers - = - Consumers1}} - end, - listen_loop_post_auth(Transport, - Connection1, - State1, - Configuration); + {Connection1, State1} = + case maps:get(StreamName, StreamSubscriptions, undefined) of + undefined -> + rabbit_log:info("osiris offset event for ~p, but no subscription (leftover messages " + "after unsubscribe?)", + [StreamName]), + {Connection, State}; + [] -> + rabbit_log:info("osiris offset event for ~p, but no registered consumers!", + [StreamName]), + {Connection#stream_connection{stream_subscriptions = + maps:remove(StreamName, + StreamSubscriptions)}, + State}; + CorrelationIds when is_list(CorrelationIds) -> + Consumers1 = + lists:foldl(fun(CorrelationId, ConsumersAcc) -> + #{CorrelationId := Consumer} = ConsumersAcc, + #consumer{credit = Credit} = Consumer, + Consumer1 = + case Credit of + 0 -> Consumer; + _ -> + {{segment, Segment1}, {credit, Credit1}} = + send_chunks(Transport, + Consumer, + SendFileOct), + Consumer#consumer{segment = Segment1, + credit = Credit1} + end, + ConsumersAcc#{CorrelationId => Consumer1} + end, + Consumers, + CorrelationIds), + {Connection, State#stream_connection_state{consumers = Consumers1}} + end, + listen_loop_post_auth(Transport, Connection1, State1, Configuration); heartbeat_send -> - Frame = <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>, + Frame = <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, case catch frame(Transport, Connection, Frame) of ok -> - listen_loop_post_auth(Transport, - Connection, - State, - Configuration); + listen_loop_post_auth(Transport, Connection, State, Configuration); Unexpected -> - rabbit_log:info("Heartbeat send error ~p, closing connection~n", - [Unexpected]), + rabbit_log:info("Heartbeat send error ~p, closing connection~n", [Unexpected]), C1 = demonitor_all_streams(Connection), close(Transport, C1) end; @@ -661,29 +513,16 @@ listen_loop_post_auth(Transport, close(Transport, C1); {infos, From} -> From ! {self(), ClientProperties}, - listen_loop_post_auth(Transport, - Connection, - State, - Configuration); + listen_loop_post_auth(Transport, Connection, State, Configuration); {'$gen_call', From, info} -> - gen_server:reply(From, - infos(?INFO_ITEMS, Connection, State)), - listen_loop_post_auth(Transport, - Connection, - State, - Configuration); + gen_server:reply(From, infos(?INFO_ITEMS, Connection, State)), + listen_loop_post_auth(Transport, Connection, State, Configuration); {'$gen_call', From, {info, Items}} -> gen_server:reply(From, infos(Items, Connection, State)), - listen_loop_post_auth(Transport, - Connection, - State, - Configuration); + listen_loop_post_auth(Transport, Connection, State, Configuration); emit_stats -> Connection1 = emit_stats(Connection, State), - listen_loop_post_auth(Transport, - Connection1, - State, - Configuration); + listen_loop_post_auth(Transport, Connection1, State, Configuration); {'$gen_cast', {force_event_refresh, Ref}} -> Infos = augment_infos_with_user_provided_connection_name(infos(?CREATION_EVENT_KEYS, @@ -691,18 +530,12 @@ listen_loop_post_auth(Transport, State), Connection), rabbit_event:notify(connection_created, Infos, Ref), - Connection1 = rabbit_event:init_stats_timer(Connection, - #stream_connection.stats_timer), - listen_loop_post_auth(Transport, - Connection1, - State, - Configuration); + Connection1 = rabbit_event:init_stats_timer(Connection, #stream_connection.stats_timer), + listen_loop_post_auth(Transport, Connection1, State, Configuration); {'$gen_call', From, {shutdown, Explanation}} -> % likely closing call from the management plugin gen_server:reply(From, ok), - rabbit_log:info("Forcing stream connection ~p closing: " - "~p~n", - [self(), Explanation]), + rabbit_log:info("Forcing stream connection ~p closing: ~p~n", [self(), Explanation]), demonitor_all_streams(Connection), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State), @@ -718,19 +551,16 @@ listen_loop_post_auth(Transport, demonitor_all_streams(Connection), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State), - rabbit_log:info("Socket error ~p [~w]~n", - [Reason, S, self()]); + rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); M -> rabbit_log:warning("Unknown message ~p~n", [M]), %% FIXME send close - listen_loop_post_auth(Transport, - Connection, - State, - Configuration) + listen_loop_post_auth(Transport, Connection, State, Configuration) end. listen_loop_post_close(Transport, - #stream_connection{socket = S} = Connection, State, + #stream_connection{socket = S} = Connection, + State, Configuration) -> {OK, Closed, Error} = Transport:messages(), %% FIXME demonitor streams @@ -739,12 +569,8 @@ listen_loop_post_close(Transport, {OK, S, Data} -> Transport:setopts(S, [{active, once}]), {Connection1, State1} = - handle_inbound_data_post_close(Transport, - Connection, - State, - Data), - #stream_connection{connection_step = Step} = - Connection1, + handle_inbound_data_post_close(Transport, Connection, State, Data), + #stream_connection{connection_step = Step} = Connection1, case Step of closing_done -> rabbit_log:debug("Received close confirmation from client"), @@ -753,10 +579,7 @@ listen_loop_post_close(Transport, notify_connection_closed(Connection1, State1); _ -> Transport:setopts(S, [{active, once}]), - listen_loop_post_close(Transport, - Connection1, - State1, - Configuration) + listen_loop_post_close(Transport, Connection1, State1, Configuration) end; {Closed, S} -> rabbit_networking:unregister_non_amqp_connection(self()), @@ -764,42 +587,24 @@ listen_loop_post_close(Transport, rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> - rabbit_log:info("Socket error ~p [~w]~n", - [Reason, S, self()]), + rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]), close(Transport, S), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State); M -> - rabbit_log:warning("Ignored message on closing ~p~n", - [M]) + rabbit_log:warning("Ignored message on closing ~p~n", [M]) end. -handle_inbound_data_pre_auth(Transport, Connection, - State, Rest) -> - handle_inbound_data(Transport, - Connection, - State, - Rest, - fun handle_frame_pre_auth/5). +handle_inbound_data_pre_auth(Transport, Connection, State, Rest) -> + handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_pre_auth/5). -handle_inbound_data_post_auth(Transport, Connection, - State, Rest) -> - handle_inbound_data(Transport, - Connection, - State, - Rest, - fun handle_frame_post_auth/5). +handle_inbound_data_post_auth(Transport, Connection, State, Rest) -> + handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_post_auth/5). -handle_inbound_data_post_close(Transport, Connection, - State, Rest) -> - handle_inbound_data(Transport, - Connection, - State, - Rest, - fun handle_frame_post_close/5). +handle_inbound_data_post_close(Transport, Connection, State, Rest) -> + handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_post_close/5). -handle_inbound_data(_Transport, Connection, State, <<>>, - _HandleFrameFun) -> +handle_inbound_data(_Transport, Connection, State, <<>>, _HandleFrameFun) -> {Connection, State}; handle_inbound_data(Transport, #stream_connection{frame_max = FrameMax} = Connection, @@ -809,35 +614,32 @@ handle_inbound_data(Transport, when FrameMax /= 0 andalso Size > FrameMax - 4 -> CloseReason = <<"frame too large">>, CloseReasonLength = byte_size(CloseReason), - CloseFrame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16, - 1:32, (?RESPONSE_CODE_FRAME_TOO_LARGE):16, - CloseReasonLength:16, - CloseReason:CloseReasonLength/binary>>, + CloseFrame = + <<?COMMAND_CLOSE:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_FRAME_TOO_LARGE:16, + CloseReasonLength:16, + CloseReason:CloseReasonLength/binary>>, frame(Transport, Connection, CloseFrame), - {Connection#stream_connection{connection_step = - close_sent}, - State}; -handle_inbound_data(Transport, Connection, + {Connection#stream_connection{connection_step = close_sent}, State}; +handle_inbound_data(Transport, + Connection, #stream_connection_state{data = none} = State, <<Size:32, Frame:Size/binary, Rest/bits>>, HandleFrameFun) -> - {Connection1, State1, Rest1} = HandleFrameFun(Transport, - Connection, - State, - Frame, - Rest), - handle_inbound_data(Transport, - Connection1, - State1, - Rest1, - HandleFrameFun); -handle_inbound_data(_Transport, Connection, - #stream_connection_state{data = none} = State, Data, + {Connection1, State1, Rest1} = HandleFrameFun(Transport, Connection, State, Frame, Rest), + handle_inbound_data(Transport, Connection1, State1, Rest1, HandleFrameFun); +handle_inbound_data(_Transport, + Connection, + #stream_connection_state{data = none} = State, + Data, _HandleFrameFun) -> - {Connection, - State#stream_connection_state{data = Data}}; -handle_inbound_data(Transport, Connection, - #stream_connection_state{data = Leftover} = State, Data, + {Connection, State#stream_connection_state{data = Data}}; +handle_inbound_data(Transport, + Connection, + #stream_connection_state{data = Leftover} = State, + Data, HandleFrameFun) -> State1 = State#stream_connection_state{data = none}, %% FIXME avoid concatenation to avoid a new binary allocation @@ -850,92 +652,93 @@ handle_inbound_data(Transport, Connection, generate_publishing_error_details(Acc, _Code, <<>>) -> Acc; -generate_publishing_error_details(Acc, Code, - <<PublishingId:64, MessageSize:32, +generate_publishing_error_details(Acc, + Code, + <<PublishingId:64, + MessageSize:32, _Message:MessageSize/binary, Rest/binary>>) -> - generate_publishing_error_details(<<Acc/binary, - PublishingId:64, Code:16>>, - Code, - Rest). + generate_publishing_error_details(<<Acc/binary, PublishingId:64, Code:16>>, Code, Rest). handle_frame_pre_auth(Transport, - #stream_connection{socket = S} = Connection, State, - <<(?COMMAND_PEER_PROPERTIES):16, (?VERSION_0):16, - CorrelationId:32, ClientPropertiesCount:32, + #stream_connection{socket = S} = Connection, + State, + <<?COMMAND_PEER_PROPERTIES:16, + ?VERSION_0:16, + CorrelationId:32, + ClientPropertiesCount:32, ClientPropertiesFrame/binary>>, Rest) -> {ClientProperties, _} = - rabbit_stream_utils:parse_map(ClientPropertiesFrame, - ClientPropertiesCount), - {ok, Product} = application:get_key(rabbit, - description), + rabbit_stream_utils:parse_map(ClientPropertiesFrame, ClientPropertiesCount), + {ok, Product} = application:get_key(rabbit, description), {ok, Version} = application:get_key(rabbit, vsn), %% Get any configuration-specified server properties - RawConfigServerProps = application:get_env(rabbit, - server_properties, - []), - ConfigServerProperties = lists:foldl(fun ({K, V}, - Acc) -> - maps:put(rabbit_data_coercion:to_binary(K), - V, - Acc) - end, - #{}, - RawConfigServerProps), - ServerProperties = maps:merge(ConfigServerProperties, - #{<<"product">> => Product, - <<"version">> => Version, - <<"cluster_name">> => - rabbit_nodes:cluster_name(), - <<"platform">> => - rabbit_misc:platform_and_version(), - <<"copyright">> => ?COPYRIGHT_MESSAGE, - <<"information">> => ?INFORMATION_MESSAGE}), + RawConfigServerProps = application:get_env(rabbit, server_properties, []), + ConfigServerProperties = + lists:foldl(fun({K, V}, Acc) -> + maps:put( + rabbit_data_coercion:to_binary(K), V, Acc) + end, + #{}, + RawConfigServerProps), + ServerProperties = + maps:merge(ConfigServerProperties, + #{<<"product">> => Product, + <<"version">> => Version, + <<"cluster_name">> => rabbit_nodes:cluster_name(), + <<"platform">> => rabbit_misc:platform_and_version(), + <<"copyright">> => ?COPYRIGHT_MESSAGE, + <<"information">> => ?INFORMATION_MESSAGE}), ServerPropertiesCount = map_size(ServerProperties), - ServerPropertiesFragment = maps:fold(fun (K, V, Acc) -> - Key = - rabbit_data_coercion:to_binary(K), - Value = - rabbit_data_coercion:to_binary(V), - KeySize = byte_size(Key), - ValueSize = byte_size(Value), - <<Acc/binary, KeySize:16, - Key:KeySize/binary, - ValueSize:16, - Value:ValueSize/binary>> - end, - <<>>, - ServerProperties), - Frame = <<(?COMMAND_PEER_PROPERTIES):16, - (?VERSION_0):16, CorrelationId:32, - (?RESPONSE_CODE_OK):16, ServerPropertiesCount:32, - ServerPropertiesFragment/binary>>, + ServerPropertiesFragment = + maps:fold(fun(K, V, Acc) -> + Key = rabbit_data_coercion:to_binary(K), + Value = rabbit_data_coercion:to_binary(V), + KeySize = byte_size(Key), + ValueSize = byte_size(Value), + <<Acc/binary, + KeySize:16, + Key:KeySize/binary, + ValueSize:16, + Value:ValueSize/binary>> + end, + <<>>, + ServerProperties), + Frame = + <<?COMMAND_PEER_PROPERTIES:16, + ?VERSION_0:16, + CorrelationId:32, + ?RESPONSE_CODE_OK:16, + ServerPropertiesCount:32, + ServerPropertiesFragment/binary>>, FrameSize = byte_size(Frame), Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]), - {Connection#stream_connection{client_properties = - ClientProperties, - authentication_state = - peer_properties_exchanged}, + {Connection#stream_connection{client_properties = ClientProperties, + authentication_state = peer_properties_exchanged}, State, Rest}; handle_frame_pre_auth(Transport, - #stream_connection{socket = S} = Connection, State, - <<(?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16, - CorrelationId:32>>, + #stream_connection{socket = S} = Connection, + State, + <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, CorrelationId:32>>, Rest) -> Mechanisms = rabbit_stream_utils:auth_mechanisms(S), - MechanismsFragment = lists:foldl(fun (M, Acc) -> - Size = byte_size(M), - <<Acc/binary, Size:16, - M:Size/binary>> - end, - <<>>, - Mechanisms), + MechanismsFragment = + lists:foldl(fun(M, Acc) -> + Size = byte_size(M), + <<Acc/binary, Size:16, M:Size/binary>> + end, + <<>>, + Mechanisms), MechanismsCount = length(Mechanisms), - Frame = <<(?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16, - CorrelationId:32, (?RESPONSE_CODE_OK):16, - MechanismsCount:32, MechanismsFragment/binary>>, + Frame = + <<?COMMAND_SASL_HANDSHAKE:16, + ?VERSION_0:16, + CorrelationId:32, + ?RESPONSE_CODE_OK:16, + MechanismsCount:32, + MechanismsFragment/binary>>, FrameSize = byte_size(Frame), Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]), {Connection, State, Rest}; @@ -945,278 +748,223 @@ handle_frame_pre_auth(Transport, host = Host} = Connection0, State, - <<(?COMMAND_SASL_AUTHENTICATE):16, (?VERSION_0):16, - CorrelationId:32, MechanismLength:16, - Mechanism:MechanismLength/binary, SaslFragment/binary>>, + <<?COMMAND_SASL_AUTHENTICATE:16, + ?VERSION_0:16, + CorrelationId:32, + MechanismLength:16, + Mechanism:MechanismLength/binary, + SaslFragment/binary>>, Rest) -> - SaslBin = case SaslFragment of - <<(-1):32/signed>> -> <<>>; - <<SaslBinaryLength:32, - SaslBinary:SaslBinaryLength/binary>> -> - SaslBinary - end, - {Connection1, Rest1} = case - rabbit_stream_utils:auth_mechanism_to_module(Mechanism, - S) - of - {ok, AuthMechanism} -> - AuthState = case AuthState0 of - none -> - AuthMechanism:init(S); - AS -> AS - end, - RemoteAddress = - list_to_binary(inet:ntoa(Host)), - C1 = - Connection0#stream_connection{auth_mechanism - = - {Mechanism, - AuthMechanism}}, - {C2, FrameFragment} = case - AuthMechanism:handle_response(SaslBin, - AuthState) - of - {refused, - Username, - Msg, - Args} -> - rabbit_core_metrics:auth_attempt_failed(RemoteAddress, - Username, - stream), - auth_fail(Username, - Msg, - Args, - C1, - State), - rabbit_log:warning(Msg, - Args), - {C1#stream_connection{connection_step - = - failure}, - <<(?RESPONSE_AUTHENTICATION_FAILURE):16>>}; - {protocol_error, - Msg, - Args} -> - rabbit_core_metrics:auth_attempt_failed(RemoteAddress, - <<>>, - stream), - notify_auth_result(none, - user_authentication_failure, - [{error, - rabbit_misc:format(Msg, - Args)}], - C1, - State), - rabbit_log:warning(Msg, - Args), - {C1#stream_connection{connection_step - = - failure}, - <<(?RESPONSE_SASL_ERROR):16>>}; - {challenge, - Challenge, - AuthState1} -> - rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, - <<>>, - stream), - ChallengeSize = - byte_size(Challenge), - {C1#stream_connection{authentication_state - = - AuthState1, - connection_step - = - authenticating}, - <<(?RESPONSE_SASL_CHALLENGE):16, - ChallengeSize:32, - Challenge/binary>>}; - {ok, - User = - #user{username - = - Username}} -> - case - rabbit_access_control:check_user_loopback(Username, - S) - of - ok -> - rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, - Username, - stream), - notify_auth_result(Username, - user_authentication_success, - [], - C1, - State), - {C1#stream_connection{authentication_state - = - done, - user - = - User, - connection_step - = - authenticated}, - <<(?RESPONSE_CODE_OK):16>>}; - not_allowed -> - rabbit_core_metrics:auth_attempt_failed(RemoteAddress, - Username, - stream), - rabbit_log:warning("User '~s' can only connect via localhost~n", - [Username]), - {C1#stream_connection{connection_step - = - failure}, - <<(?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK):16>>} - end - end, - Frame = <<(?COMMAND_SASL_AUTHENTICATE):16, - (?VERSION_0):16, CorrelationId:32, - FrameFragment/binary>>, - frame(Transport, C1, Frame), - {C2, Rest}; - {error, _} -> - Frame = <<(?COMMAND_SASL_AUTHENTICATE):16, - (?VERSION_0):16, CorrelationId:32, - (?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED):16>>, - frame(Transport, Connection0, Frame), - {Connection0#stream_connection{connection_step - = - failure}, - Rest} - end, + SaslBin = + case SaslFragment of + <<(-1):32/signed>> -> + <<>>; + <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> -> + SaslBinary + end, + {Connection1, Rest1} = + case rabbit_stream_utils:auth_mechanism_to_module(Mechanism, S) of + {ok, AuthMechanism} -> + AuthState = + case AuthState0 of + none -> + AuthMechanism:init(S); + AS -> + AS + end, + RemoteAddress = list_to_binary(inet:ntoa(Host)), + C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, + {C2, FrameFragment} = + case AuthMechanism:handle_response(SaslBin, AuthState) of + {refused, Username, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, + Username, + stream), + auth_fail(Username, Msg, Args, C1, State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, + <<?RESPONSE_AUTHENTICATION_FAILURE:16>>}; + {protocol_error, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream), + notify_auth_result(none, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], + C1, + State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, + <<?RESPONSE_SASL_ERROR:16>>}; + {challenge, Challenge, AuthState1} -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream), + ChallengeSize = byte_size(Challenge), + {C1#stream_connection{authentication_state = AuthState1, + connection_step = authenticating}, + <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>}; + {ok, User = #user{username = Username}} -> + case rabbit_access_control:check_user_loopback(Username, S) of + ok -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, + Username, + stream), + notify_auth_result(Username, + user_authentication_success, + [], + C1, + State), + {C1#stream_connection{authentication_state = done, + user = User, + connection_step = authenticated}, + <<?RESPONSE_CODE_OK:16>>}; + not_allowed -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, + Username, + stream), + rabbit_log:warning("User '~s' can only connect via localhost~n", + [Username]), + {C1#stream_connection{connection_step = failure}, + <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>} + end + end, + Frame = + <<?COMMAND_SASL_AUTHENTICATE:16, + ?VERSION_0:16, + CorrelationId:32, + FrameFragment/binary>>, + frame(Transport, C1, Frame), + {C2, Rest}; + {error, _} -> + Frame = + <<?COMMAND_SASL_AUTHENTICATE:16, + ?VERSION_0:16, + CorrelationId:32, + ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>, + frame(Transport, Connection0, Frame), + {Connection0#stream_connection{connection_step = failure}, Rest} + end, {Connection1, State, Rest1}; handle_frame_pre_auth(_Transport, - #stream_connection{helper_sup = SupPid, socket = Sock, + #stream_connection{helper_sup = SupPid, + socket = Sock, name = ConnectionName} = Connection, State, - <<(?COMMAND_TUNE):16, (?VERSION_0):16, FrameMax:32, - Heartbeat:32>>, + <<?COMMAND_TUNE:16, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>, Rest) -> - rabbit_log:info("Tuning response ~p ~p ~n", - [FrameMax, Heartbeat]), + rabbit_log:info("Tuning response ~p ~p ~n", [FrameMax, Heartbeat]), Parent = self(), %% sending a message to the main process so the heartbeat frame is sent from this main process %% otherwise heartbeat frames can interleave with chunk delivery %% (chunk delivery is made of 2 calls on the socket, one for the header and one send_file for the chunk, %% we don't want a heartbeat frame to sneak in in-between) - SendFun = fun () -> - Parent ! heartbeat_send, - ok - end, - ReceiveFun = fun () -> Parent ! heartbeat_timeout end, - Heartbeater = rabbit_heartbeat:start(SupPid, - Sock, - ConnectionName, - Heartbeat, - SendFun, - Heartbeat, - ReceiveFun), + SendFun = + fun() -> + Parent ! heartbeat_send, + ok + end, + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Heartbeater = + rabbit_heartbeat:start(SupPid, + Sock, + ConnectionName, + Heartbeat, + SendFun, + Heartbeat, + ReceiveFun), {Connection#stream_connection{connection_step = tuned, - frame_max = FrameMax, heartbeat = Heartbeat, + frame_max = FrameMax, + heartbeat = Heartbeat, heartbeater = Heartbeater}, State, Rest}; handle_frame_pre_auth(Transport, - #stream_connection{user = User, socket = S} = - Connection, + #stream_connection{user = User, socket = S} = Connection, State, - <<(?COMMAND_OPEN):16, (?VERSION_0):16, CorrelationId:32, + <<?COMMAND_OPEN:16, + ?VERSION_0:16, + CorrelationId:32, VirtualHostLength:16, VirtualHost:VirtualHostLength/binary>>, Rest) -> %% FIXME enforce connection limit (see rabbit_reader:is_over_connection_limit/2) - {Connection1, Frame} = try - rabbit_access_control:check_vhost_access(User, - VirtualHost, - {socket, - S}, - #{}), - F = <<(?COMMAND_OPEN):16, (?VERSION_0):16, - CorrelationId:32, (?RESPONSE_CODE_OK):16>>, - %% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2) - {Connection#stream_connection{connection_step = - opened, - virtual_host = - VirtualHost}, - F} - catch - exit:_ -> - Fr = <<(?COMMAND_OPEN):16, (?VERSION_0):16, - CorrelationId:32, - (?RESPONSE_VHOST_ACCESS_FAILURE):16>>, - {Connection#stream_connection{connection_step - = failure}, - Fr} - end, + {Connection1, Frame} = + try + rabbit_access_control:check_vhost_access(User, VirtualHost, {socket, S}, #{}), + F = <<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>, + %% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2) + {Connection#stream_connection{connection_step = opened, virtual_host = VirtualHost}, F} + catch + exit:_ -> + Fr = <<?COMMAND_OPEN:16, + ?VERSION_0:16, + CorrelationId:32, + ?RESPONSE_VHOST_ACCESS_FAILURE:16>>, + {Connection#stream_connection{connection_step = failure}, Fr} + end, frame(Transport, Connection1, Frame), {Connection1, State, Rest}; -handle_frame_pre_auth(_Transport, Connection, State, - <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>, Rest) -> +handle_frame_pre_auth(_Transport, + Connection, + State, + <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, + Rest) -> rabbit_log:info("Received heartbeat frame pre auth~n"), {Connection, State, Rest}; -handle_frame_pre_auth(_Transport, Connection, State, - Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", - [Frame, Rest]), - {Connection#stream_connection{connection_step = - failure}, - State, - Rest}. +handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) -> + rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", [Frame, Rest]), + {Connection#stream_connection{connection_step = failure}, State, Rest}. -auth_fail(Username, Msg, Args, Connection, - ConnectionState) -> +auth_fail(Username, Msg, Args, Connection, ConnectionState) -> notify_auth_result(Username, user_authentication_failure, [{error, rabbit_misc:format(Msg, Args)}], Connection, ConnectionState). -notify_auth_result(Username, AuthResult, ExtraProps, - Connection, ConnectionState) -> - EventProps = [{connection_type, network}, - {name, - case Username of - none -> ''; - _ -> Username - end}] - ++ - [case Item of - name -> - {connection_name, - i(name, Connection, ConnectionState)}; - _ -> {Item, i(Item, Connection, ConnectionState)} - end - || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] - ++ ExtraProps, - rabbit_event:notify(AuthResult, - [P || {_, V} = P <- EventProps, V =/= '']). +notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState) -> + EventProps = + [{connection_type, network}, + {name, + case Username of + none -> + ''; + _ -> + Username + end}] + ++ [case Item of + name -> + {connection_name, i(name, Connection, ConnectionState)}; + _ -> + {Item, i(Item, Connection, ConnectionState)} + end + || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] + ++ ExtraProps, + rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, - user = User, publishers = Publishers0, + user = User, + publishers = Publishers0, publisher_to_ids = RefIds0} = Connection0, State, - <<(?COMMAND_DECLARE_PUBLISHER):16, (?VERSION_0):16, - CorrelationId:32, PublisherId:8, ReferenceSize:16, - Reference:ReferenceSize/binary, StreamSize:16, + <<?COMMAND_DECLARE_PUBLISHER:16, + ?VERSION_0:16, + CorrelationId:32, + PublisherId:8, + ReferenceSize:16, + Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> - case - rabbit_stream_utils:check_write_permitted(#resource{name - = Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, - #{}) - of + case rabbit_stream_utils:check_write_permitted(#resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + User, + #{}) + of ok -> - case {maps:is_key(PublisherId, Publishers0), - maps:is_key({Stream, Reference}, RefIds0)} - of + case {maps:is_key(PublisherId, Publishers0), maps:is_key({Stream, Reference}, RefIds0)} + of {false, false} -> case lookup_leader(Stream, Connection0) of cluster_not_found -> @@ -1227,36 +975,29 @@ handle_frame_post_auth(Transport, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), {Connection0, State, Rest}; {ClusterLeader, - #stream_connection{publishers = Publishers0, - publisher_to_ids = RefIds0} = + #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} -> - {PublisherReference, RefIds1} = case Reference of - <<"">> -> - {undefined, - RefIds0}; - _ -> - {Reference, - RefIds0#{{Stream, - Reference} - => - PublisherId}} - end, - Publisher = #publisher{publisher_id = PublisherId, - stream = Stream, - reference = - PublisherReference, - leader = ClusterLeader}, + {PublisherReference, RefIds1} = + case Reference of + <<"">> -> + {undefined, RefIds0}; + _ -> + {Reference, RefIds0#{{Stream, Reference} => PublisherId}} + end, + Publisher = + #publisher{publisher_id = PublisherId, + stream = Stream, + reference = PublisherReference, + leader = ClusterLeader}, response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), {Connection1#stream_connection{publishers = - Publishers0#{PublisherId - => + Publishers0#{PublisherId => Publisher}, - publisher_to_ids = - RefIds1}, + publisher_to_ids = RefIds1}, State, Rest} end; @@ -1277,25 +1018,21 @@ handle_frame_post_auth(Transport, {Connection0, State, Rest} end; handle_frame_post_auth(Transport, - #stream_connection{publishers = Publishers, - publisher_to_ids = PubToIds} = + #stream_connection{publishers = Publishers, publisher_to_ids = PubToIds} = Connection0, State, - <<(?COMMAND_DELETE_PUBLISHER):16, (?VERSION_0):16, - CorrelationId:32, PublisherId:8>>, + <<?COMMAND_DELETE_PUBLISHER:16, + ?VERSION_0:16, + CorrelationId:32, + PublisherId:8>>, Rest) -> case Publishers of - #{PublisherId := - #publisher{stream = Stream, reference = Ref}} -> - Connection1 = Connection0#stream_connection{publishers = - maps:remove(PublisherId, - Publishers), - publisher_to_ids = - maps:remove({Stream, - Ref}, - PubToIds)}, - Connection2 = maybe_clean_connection_from_stream(Stream, - Connection1), + #{PublisherId := #publisher{stream = Stream, reference = Ref}} -> + Connection1 = + Connection0#stream_connection{publishers = maps:remove(PublisherId, Publishers), + publisher_to_ids = + maps:remove({Stream, Ref}, PubToIds)}, + Connection2 = maybe_clean_connection_from_stream(Stream, Connection1), response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, @@ -1311,91 +1048,91 @@ handle_frame_post_auth(Transport, {Connection0, State, Rest} end; handle_frame_post_auth(Transport, - #stream_connection{socket = S, credits = Credits, + #stream_connection{socket = S, + credits = Credits, virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State, - <<(?COMMAND_PUBLISH):16, (?VERSION_0):16, - PublisherId:8/unsigned, MessageCount:32, + <<?COMMAND_PUBLISH:16, + ?VERSION_0:16, + PublisherId:8/unsigned, + MessageCount:32, Messages/binary>>, Rest) -> case Publishers of #{PublisherId := Publisher} -> - #publisher{stream = Stream, reference = Reference, + #publisher{stream = Stream, + reference = Reference, leader = Leader} = Publisher, - case - rabbit_stream_utils:check_write_permitted(#resource{name - = - Stream, - kind = - queue, - virtual_host - = - VirtualHost}, - User, - #{}) - of + case rabbit_stream_utils:check_write_permitted(#resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + User, + #{}) + of ok -> - rabbit_stream_utils:write_messages(Leader, - Reference, - PublisherId, - Messages), + rabbit_stream_utils:write_messages(Leader, Reference, PublisherId, Messages), sub_credits(Credits, MessageCount), {Connection, State, Rest}; error -> FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, - Details = generate_publishing_error_details(<<>>, - ?RESPONSE_CODE_ACCESS_REFUSED, - Messages), + Details = + generate_publishing_error_details(<<>>, + ?RESPONSE_CODE_ACCESS_REFUSED, + Messages), Transport:send(S, - [<<FrameSize:32, (?COMMAND_PUBLISH_ERROR):16, - (?VERSION_0):16, PublisherId:8, - MessageCount:32, Details/binary>>]), + [<<FrameSize:32, + ?COMMAND_PUBLISH_ERROR:16, + ?VERSION_0:16, + PublisherId:8, + MessageCount:32, + Details/binary>>]), {Connection, State, Rest} end; _ -> FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, - Details = generate_publishing_error_details(<<>>, - ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, - Messages), + Details = + generate_publishing_error_details(<<>>, + ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, + Messages), Transport:send(S, - [<<FrameSize:32, (?COMMAND_PUBLISH_ERROR):16, - (?VERSION_0):16, PublisherId:8, MessageCount:32, + [<<FrameSize:32, + ?COMMAND_PUBLISH_ERROR:16, + ?VERSION_0:16, + PublisherId:8, + MessageCount:32, Details/binary>>]), {Connection, State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = Socket, - stream_subscriptions = - StreamSubscriptions, + stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<(?COMMAND_SUBSCRIBE):16, (?VERSION_0):16, - CorrelationId:32, SubscriptionId:8/unsigned, - StreamSize:16, Stream:StreamSize/binary, - OffsetType:16/signed, OffsetAndCredit/binary>>, + <<?COMMAND_SUBSCRIBE:16, + ?VERSION_0:16, + CorrelationId:32, + SubscriptionId:8/unsigned, + StreamSize:16, + Stream:StreamSize/binary, + OffsetType:16/signed, + OffsetAndCredit/binary>>, Rest) -> %% FIXME check the max number of subs is not reached already - case - rabbit_stream_utils:check_read_permitted(#resource{name - = Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, - #{}) - of + case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + User, + #{}) + of ok -> - case - rabbit_stream_manager:lookup_local_member(VirtualHost, - Stream) - of + case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of {error, not_available} -> response(Transport, Connection, @@ -1411,9 +1148,7 @@ handle_frame_post_auth(Transport, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), {Connection, State, Rest}; {ok, LocalMemberPid} -> - case subscription_exists(StreamSubscriptions, - SubscriptionId) - of + case subscription_exists(StreamSubscriptions, SubscriptionId) of true -> response(Transport, Connection, @@ -1422,79 +1157,53 @@ handle_frame_post_auth(Transport, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS), {Connection, State, Rest}; false -> - {OffsetSpec, Credit} = case OffsetType of - ?OFFSET_TYPE_FIRST -> - <<Crdt:16>> = - OffsetAndCredit, - {first, Crdt}; - ?OFFSET_TYPE_LAST -> - <<Crdt:16>> = - OffsetAndCredit, - {last, Crdt}; - ?OFFSET_TYPE_NEXT -> - <<Crdt:16>> = - OffsetAndCredit, - {next, Crdt}; - ?OFFSET_TYPE_OFFSET -> - <<Offset:64/unsigned, - Crdt:16>> = - OffsetAndCredit, - {Offset, Crdt}; - ?OFFSET_TYPE_TIMESTAMP -> - <<Timestamp:64/signed, - Crdt:16>> = - OffsetAndCredit, - {{timestamp, - Timestamp}, - Crdt} - end, - {ok, Segment} = osiris:init_reader(LocalMemberPid, - OffsetSpec), - ConsumerState = #consumer{member_pid = - LocalMemberPid, - offset = OffsetSpec, - subscription_id = - SubscriptionId, - socket = Socket, - segment = Segment, - credit = Credit, - stream = Stream}, - Connection1 = maybe_monitor_stream(LocalMemberPid, - Stream, - Connection), - response_ok(Transport, - Connection, - ?COMMAND_SUBSCRIBE, - CorrelationId), + {OffsetSpec, Credit} = + case OffsetType of + ?OFFSET_TYPE_FIRST -> + <<Crdt:16>> = OffsetAndCredit, + {first, Crdt}; + ?OFFSET_TYPE_LAST -> + <<Crdt:16>> = OffsetAndCredit, + {last, Crdt}; + ?OFFSET_TYPE_NEXT -> + <<Crdt:16>> = OffsetAndCredit, + {next, Crdt}; + ?OFFSET_TYPE_OFFSET -> + <<Offset:64/unsigned, Crdt:16>> = OffsetAndCredit, + {Offset, Crdt}; + ?OFFSET_TYPE_TIMESTAMP -> + <<Timestamp:64/signed, Crdt:16>> = OffsetAndCredit, + {{timestamp, Timestamp}, Crdt} + end, + {ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec), + ConsumerState = + #consumer{member_pid = LocalMemberPid, + offset = OffsetSpec, + subscription_id = SubscriptionId, + socket = Socket, + segment = Segment, + credit = Credit, + stream = Stream}, + Connection1 = maybe_monitor_stream(LocalMemberPid, Stream, Connection), + response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId), {{segment, Segment1}, {credit, Credit1}} = - send_chunks(Transport, - ConsumerState, - SendFileOct), - Consumers1 = Consumers#{SubscriptionId => - ConsumerState#consumer{segment - = - Segment1, - credit - = - Credit1}}, - StreamSubscriptions1 = case StreamSubscriptions of - #{Stream := - SubscriptionIds} -> - StreamSubscriptions#{Stream - => - [SubscriptionId] - ++ - SubscriptionIds}; - _ -> - StreamSubscriptions#{Stream - => - [SubscriptionId]} - end, - {Connection1#stream_connection{stream_subscriptions - = + send_chunks(Transport, ConsumerState, SendFileOct), + Consumers1 = + Consumers#{SubscriptionId => + ConsumerState#consumer{segment = Segment1, + credit = Credit1}}, + StreamSubscriptions1 = + case StreamSubscriptions of + #{Stream := SubscriptionIds} -> + StreamSubscriptions#{Stream => + [SubscriptionId] + ++ SubscriptionIds}; + _ -> + StreamSubscriptions#{Stream => [SubscriptionId]} + end, + {Connection1#stream_connection{stream_subscriptions = StreamSubscriptions1}, - State#stream_connection_state{consumers = - Consumers1}, + State#stream_connection_state{consumers = Consumers1}, Rest} end end; @@ -1507,16 +1216,14 @@ handle_frame_post_auth(Transport, {Connection, State, Rest} end; handle_frame_post_auth(Transport, - #stream_connection{stream_subscriptions = - StreamSubscriptions} = - Connection, + #stream_connection{stream_subscriptions = StreamSubscriptions} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<(?COMMAND_UNSUBSCRIBE):16, (?VERSION_0):16, - CorrelationId:32, SubscriptionId:8/unsigned>>, + <<?COMMAND_UNSUBSCRIBE:16, + ?VERSION_0:16, + CorrelationId:32, + SubscriptionId:8/unsigned>>, Rest) -> - case subscription_exists(StreamSubscriptions, - SubscriptionId) - of + case subscription_exists(StreamSubscriptions, SubscriptionId) of false -> response(Transport, Connection, @@ -1527,95 +1234,72 @@ handle_frame_post_auth(Transport, true -> #{SubscriptionId := Consumer} = Consumers, Stream = Consumer#consumer.stream, - #{Stream := SubscriptionsForThisStream} = - StreamSubscriptions, - SubscriptionsForThisStream1 = - lists:delete(SubscriptionId, - SubscriptionsForThisStream), - StreamSubscriptions1 = case - length(SubscriptionsForThisStream1) - of - 0 -> - % no more subscription for this stream - maps:remove(Stream, - StreamSubscriptions); - _ -> - StreamSubscriptions#{Stream => - SubscriptionsForThisStream1} - end, - Connection1 = - Connection#stream_connection{stream_subscriptions = - StreamSubscriptions1}, + #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, + SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream), + StreamSubscriptions1 = + case length(SubscriptionsForThisStream1) of + 0 -> + % no more subscription for this stream + maps:remove(Stream, StreamSubscriptions); + _ -> + StreamSubscriptions#{Stream => SubscriptionsForThisStream1} + end, + Connection1 = Connection#stream_connection{stream_subscriptions = StreamSubscriptions1}, Consumers1 = maps:remove(SubscriptionId, Consumers), - Connection2 = maybe_clean_connection_from_stream(Stream, - Connection1), - response_ok(Transport, - Connection, - ?COMMAND_SUBSCRIBE, - CorrelationId), - {Connection2, - State#stream_connection_state{consumers = Consumers1}, - Rest} + Connection2 = maybe_clean_connection_from_stream(Stream, Connection1), + response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId), + {Connection2, State#stream_connection_state{consumers = Consumers1}, Rest} end; handle_frame_post_auth(Transport, - #stream_connection{socket = S, - send_file_oct = SendFileOct} = - Connection, + #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<(?COMMAND_CREDIT):16, (?VERSION_0):16, - SubscriptionId:8/unsigned, Credit:16/signed>>, + <<?COMMAND_CREDIT:16, + ?VERSION_0:16, + SubscriptionId:8/unsigned, + Credit:16/signed>>, Rest) -> case Consumers of #{SubscriptionId := Consumer} -> #consumer{credit = AvailableCredit} = Consumer, {{segment, Segment1}, {credit, Credit1}} = - send_chunks(Transport, - Consumer, - AvailableCredit + Credit, - SendFileOct), - Consumer1 = Consumer#consumer{segment = Segment1, - credit = Credit1}, + send_chunks(Transport, Consumer, AvailableCredit + Credit, SendFileOct), + Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1}, {Connection, - State#stream_connection_state{consumers = - Consumers#{SubscriptionId => - Consumer1}}, + State#stream_connection_state{consumers = Consumers#{SubscriptionId => Consumer1}}, Rest}; _ -> - rabbit_log:warning("Giving credit to unknown subscription: " - "~p~n", - [SubscriptionId]), - Frame = <<(?COMMAND_CREDIT):16, (?VERSION_0):16, - (?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST):16, - SubscriptionId:8>>, + rabbit_log:warning("Giving credit to unknown subscription: ~p~n", [SubscriptionId]), + Frame = + <<?COMMAND_CREDIT:16, + ?VERSION_0:16, + ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16, + SubscriptionId:8>>, FrameSize = byte_size(Frame), Transport:send(S, [<<FrameSize:32>>, Frame]), {Connection, State, Rest} end; handle_frame_post_auth(_Transport, - #stream_connection{virtual_host = VirtualHost, - user = User} = - Connection, + #stream_connection{virtual_host = VirtualHost, user = User} = Connection, State, - <<(?COMMAND_COMMIT_OFFSET):16, (?VERSION_0):16, - _CorrelationId:32, ReferenceSize:16, - Reference:ReferenceSize/binary, StreamSize:16, - Stream:StreamSize/binary, Offset:64>>, + <<?COMMAND_COMMIT_OFFSET:16, + ?VERSION_0:16, + _CorrelationId:32, + ReferenceSize:16, + Reference:ReferenceSize/binary, + StreamSize:16, + Stream:StreamSize/binary, + Offset:64>>, Rest) -> - case - rabbit_stream_utils:check_write_permitted(#resource{name - = Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, - #{}) - of + case rabbit_stream_utils:check_write_permitted(#resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + User, + #{}) + of ok -> case lookup_leader(Stream, Connection) of cluster_not_found -> - rabbit_log:info("Could not find leader to commit offset " - "on ~p~n", - [Stream]), + rabbit_log:info("Could not find leader to commit offset on ~p~n", [Stream]), %% FIXME commit offset is fire-and-forget, so no response even if error, change this? {Connection, State, Rest}; {ClusterLeader, Connection1} -> @@ -1624,8 +1308,7 @@ handle_frame_post_auth(_Transport, end; error -> %% FIXME commit offset is fire-and-forget, so no response even if error, change this? - rabbit_log:info("Not authorized to commit offset on ~p~n", - [Stream]), + rabbit_log:info("Not authorized to commit offset on ~p~n", [Stream]), {Connection, State, Rest} end; handle_frame_post_auth(Transport, @@ -1634,48 +1317,40 @@ handle_frame_post_auth(Transport, user = User} = Connection, State, - <<(?COMMAND_QUERY_OFFSET):16, (?VERSION_0):16, - CorrelationId:32, ReferenceSize:16, - Reference:ReferenceSize/binary, StreamSize:16, + <<?COMMAND_QUERY_OFFSET:16, + ?VERSION_0:16, + CorrelationId:32, + ReferenceSize:16, + Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> - FrameSize = (?RESPONSE_FRAME_SIZE) + 8, - {ResponseCode, Offset} = case - rabbit_stream_utils:check_read_permitted(#resource{name - = - Stream, - kind - = - queue, - virtual_host - = - VirtualHost}, - User, - #{}) - of - ok -> - case - rabbit_stream_manager:lookup_local_member(VirtualHost, - Stream) - of - {error, not_found} -> - {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, - 0}; - {ok, LocalMemberPid} -> - {?RESPONSE_CODE_OK, - case - osiris:read_tracking(LocalMemberPid, - Reference) - of - undefined -> 0; - Offt -> Offt - end} - end; - error -> {?RESPONSE_CODE_ACCESS_REFUSED, 0} - end, + FrameSize = ?RESPONSE_FRAME_SIZE + 8, + {ResponseCode, Offset} = + case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + User, + #{}) + of + ok -> + case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of + {error, not_found} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; + {ok, LocalMemberPid} -> + {?RESPONSE_CODE_OK, + case osiris:read_tracking(LocalMemberPid, Reference) of + undefined -> + 0; + Offt -> + Offt + end} + end; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, 0} + end, Transport:send(S, - [<<FrameSize:32, (?COMMAND_QUERY_OFFSET):16, - (?VERSION_0):16>>, + [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]), @@ -1686,94 +1361,72 @@ handle_frame_post_auth(Transport, user = User} = Connection, State, - <<(?COMMAND_QUERY_PUBLISHER_SEQUENCE):16, - (?VERSION_0):16, CorrelationId:32, ReferenceSize:16, - Reference:ReferenceSize/binary, StreamSize:16, + <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, + ?VERSION_0:16, + CorrelationId:32, + ReferenceSize:16, + Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> - FrameSize = (?RESPONSE_FRAME_SIZE) + 8, - {ResponseCode, Sequence} = case - rabbit_stream_utils:check_read_permitted(#resource{name - = - Stream, - kind - = - queue, - virtual_host - = - VirtualHost}, - User, - #{}) - of - ok -> - case - rabbit_stream_manager:lookup_local_member(VirtualHost, - Stream) - of - {error, not_found} -> - {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, - 0}; - {ok, LocalMemberPid} -> - {?RESPONSE_CODE_OK, - case - osiris:fetch_writer_seq(LocalMemberPid, - Reference) - of - undefined -> 0; - Offt -> Offt - end} - end; - error -> {?RESPONSE_CODE_ACCESS_REFUSED, 0} - end, + FrameSize = ?RESPONSE_FRAME_SIZE + 8, + {ResponseCode, Sequence} = + case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + User, + #{}) + of + ok -> + case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of + {error, not_found} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; + {ok, LocalMemberPid} -> + {?RESPONSE_CODE_OK, + case osiris:fetch_writer_seq(LocalMemberPid, Reference) of + undefined -> + 0; + Offt -> + Offt + end} + end; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, 0} + end, Transport:send(S, - [<<FrameSize:32, (?COMMAND_QUERY_PUBLISHER_SEQUENCE):16, - (?VERSION_0):16>>, + [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, <<Sequence:64>>]), {Connection, State, Rest}; handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, - user = - #user{username = Username} = - User} = + user = #user{username = Username} = User} = Connection, State, - <<(?COMMAND_CREATE_STREAM):16, (?VERSION_0):16, - CorrelationId:32, StreamSize:16, - Stream:StreamSize/binary, ArgumentsCount:32, + <<?COMMAND_CREATE_STREAM:16, + ?VERSION_0:16, + CorrelationId:32, + StreamSize:16, + Stream:StreamSize/binary, + ArgumentsCount:32, ArgumentsBinary/binary>>, Rest) -> - case - rabbit_stream_utils:enforce_correct_stream_name(Stream) - of + case rabbit_stream_utils:enforce_correct_stream_name(Stream) of {ok, StreamName} -> - {Arguments, _Rest} = - rabbit_stream_utils:parse_map(ArgumentsBinary, - ArgumentsCount), - case - rabbit_stream_utils:check_configure_permitted(#resource{name - = - StreamName, - kind = - queue, - virtual_host - = - VirtualHost}, - User, - #{}) - of + {Arguments, _Rest} = rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount), + case rabbit_stream_utils:check_configure_permitted(#resource{name = StreamName, + kind = queue, + virtual_host = + VirtualHost}, + User, + #{}) + of ok -> - case rabbit_stream_manager:create(VirtualHost, - StreamName, - Arguments, - Username) - of - {ok, - #{leader_pid := LeaderPid, - replica_pids := ReturnedReplicas}} -> - rabbit_log:info("Created cluster with leader ~p and replicas " - "~p~n", + case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) + of + {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} -> + rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]), response_ok(Transport, Connection, @@ -1821,59 +1474,42 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, - user = - #user{username = Username} = - User} = + user = #user{username = Username} = User} = Connection, State, - <<(?COMMAND_DELETE_STREAM):16, (?VERSION_0):16, - CorrelationId:32, StreamSize:16, + <<?COMMAND_DELETE_STREAM:16, + ?VERSION_0:16, + CorrelationId:32, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> - case - rabbit_stream_utils:check_configure_permitted(#resource{name - = Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, - #{}) - of + case rabbit_stream_utils:check_configure_permitted(#resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + User, + #{}) + of ok -> - case rabbit_stream_manager:delete(VirtualHost, - Stream, - Username) - of + case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of {ok, deleted} -> - response_ok(Transport, - Connection, - ?COMMAND_DELETE_STREAM, - CorrelationId), - {Connection1, State1} = case - clean_state_after_stream_deletion_or_failure(Stream, - Connection, - State) - of - {cleaned, - NewConnection, - NewState} -> - StreamSize = - byte_size(Stream), - FrameSize = 2 + 2 + 2 + 2 + - StreamSize, - Transport:send(S, - [<<FrameSize:32, - (?COMMAND_METADATA_UPDATE):16, - (?VERSION_0):16, - (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16, - StreamSize:16, - Stream/binary>>]), - {NewConnection, NewState}; - {not_cleaned, - SameConnection, - SameState} -> - {SameConnection, SameState} - end, + response_ok(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId), + {Connection1, State1} = + case clean_state_after_stream_deletion_or_failure(Stream, Connection, State) + of + {cleaned, NewConnection, NewState} -> + StreamSize = byte_size(Stream), + FrameSize = 2 + 2 + 2 + 2 + StreamSize, + Transport:send(S, + [<<FrameSize:32, + ?COMMAND_METADATA_UPDATE:16, + ?VERSION_0:16, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, + StreamSize:16, + Stream/binary>>]), + {NewConnection, NewState}; + {not_cleaned, SameConnection, SameState} -> + {SameConnection, SameState} + end, {Connection1, State1, Rest}; {error, reference_not_found} -> response(Transport, @@ -1892,571 +1528,437 @@ handle_frame_post_auth(Transport, {Connection, State, Rest} end; handle_frame_post_auth(Transport, - #stream_connection{socket = S, - virtual_host = VirtualHost} = - Connection, + #stream_connection{socket = S, virtual_host = VirtualHost} = Connection, State, - <<(?COMMAND_METADATA):16, (?VERSION_0):16, - CorrelationId:32, StreamCount:32, + <<?COMMAND_METADATA:16, + ?VERSION_0:16, + CorrelationId:32, + StreamCount:32, BinaryStreams/binary>>, Rest) -> - Streams = - rabbit_stream_utils:extract_stream_list(BinaryStreams, - []), + Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []), %% get the nodes involved in the streams - NodesMap = lists:foldl(fun (Stream, Acc) -> - case - rabbit_stream_manager:topology(VirtualHost, - Stream) - of - {ok, - #{leader_node := undefined, - replica_nodes := ReplicaNodes}} -> - lists:foldl(fun (ReplicaNode, - NodesAcc) -> - maps:put(ReplicaNode, - ok, - NodesAcc) - end, - Acc, - ReplicaNodes); - {ok, - #{leader_node := LeaderNode, - replica_nodes := ReplicaNodes}} -> - Acc1 = maps:put(LeaderNode, ok, Acc), - lists:foldl(fun (ReplicaNode, - NodesAcc) -> - maps:put(ReplicaNode, - ok, - NodesAcc) - end, - Acc1, - ReplicaNodes); - {error, _} -> Acc - end - end, - #{}, - Streams), + NodesMap = + lists:foldl(fun(Stream, Acc) -> + case rabbit_stream_manager:topology(VirtualHost, Stream) of + {ok, #{leader_node := undefined, replica_nodes := ReplicaNodes}} -> + lists:foldl(fun(ReplicaNode, NodesAcc) -> + maps:put(ReplicaNode, ok, NodesAcc) + end, + Acc, + ReplicaNodes); + {ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} -> + Acc1 = maps:put(LeaderNode, ok, Acc), + lists:foldl(fun(ReplicaNode, NodesAcc) -> + maps:put(ReplicaNode, ok, NodesAcc) + end, + Acc1, + ReplicaNodes); + {error, _} -> Acc + end + end, + #{}, + Streams), Nodes = maps:keys(NodesMap), - {NodesInfo, _} = lists:foldl(fun (Node, {Acc, Index}) -> - Host = rpc:call(Node, - rabbit_stream, - host, - []), - Port = rpc:call(Node, - rabbit_stream, - port, - []), - case {is_binary(Host), - is_integer(Port)} - of - {true, true} -> - {Acc#{Node => - {{index, Index}, - {host, Host}, - {port, Port}}}, - Index + 1}; - _ -> - rabbit_log:warning("Error when retrieving broker metadata: " - "~p ~p~n", - [Host, - Port]), - {Acc, Index} - end - end, - {#{}, 0}, - Nodes), + {NodesInfo, _} = + lists:foldl(fun(Node, {Acc, Index}) -> + Host = rpc:call(Node, rabbit_stream, host, []), + Port = rpc:call(Node, rabbit_stream, port, []), + case {is_binary(Host), is_integer(Port)} of + {true, true} -> + {Acc#{Node => {{index, Index}, {host, Host}, {port, Port}}}, + Index + 1}; + _ -> + rabbit_log:warning("Error when retrieving broker metadata: ~p ~p~n", + [Host, Port]), + {Acc, Index} + end + end, + {#{}, 0}, + Nodes), BrokersCount = map_size(NodesInfo), - BrokersBin = maps:fold(fun (_K, - {{index, Index}, {host, Host}, {port, Port}}, - Acc) -> - HostLength = byte_size(Host), - <<Acc/binary, Index:16, HostLength:16, - Host:HostLength/binary, Port:32>> - end, - <<BrokersCount:32>>, - NodesInfo), - MetadataBin = lists:foldl(fun (Stream, Acc) -> - StreamLength = byte_size(Stream), - case - rabbit_stream_manager:topology(VirtualHost, - Stream) - of - {error, stream_not_found} -> - <<Acc/binary, StreamLength:16, - Stream:StreamLength/binary, - (?RESPONSE_CODE_STREAM_DOES_NOT_EXIST):16, - (-1):16, 0:32>>; - {error, stream_not_available} -> - <<Acc/binary, StreamLength:16, - Stream:StreamLength/binary, - (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16, - (-1):16, 0:32>>; - {ok, - #{leader_node := LeaderNode, - replica_nodes := Replicas}} -> - LeaderIndex = case NodesInfo of - #{LeaderNode := - NodeInfo} -> - {{index, - LeaderIdx}, - {host, _}, - {port, - _}} = - NodeInfo, - LeaderIdx; - _ -> -1 - end, - {ReplicasBinary, ReplicasCount} = - lists:foldl(fun (Replica, - {Bin, - Count}) -> - case - NodesInfo - of - #{Replica - := - NI} -> - {{index, - ReplicaIndex}, - {host, - _}, - {port, - _}} = - NI, - {<<Bin/binary, - ReplicaIndex:16>>, - Count - + - 1}; - _ -> - {Bin, - Count} - end - end, - {<<>>, 0}, - Replicas), - <<Acc/binary, StreamLength:16, - Stream:StreamLength/binary, - (?RESPONSE_CODE_OK):16, - LeaderIndex:16, - ReplicasCount:32, - ReplicasBinary/binary>> - end - end, - <<StreamCount:32>>, - Streams), - Frame = <<(?COMMAND_METADATA):16, (?VERSION_0):16, - CorrelationId:32, BrokersBin/binary, - MetadataBin/binary>>, + BrokersBin = + maps:fold(fun(_K, {{index, Index}, {host, Host}, {port, Port}}, Acc) -> + HostLength = byte_size(Host), + <<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>> + end, + <<BrokersCount:32>>, + NodesInfo), + MetadataBin = + lists:foldl(fun(Stream, Acc) -> + StreamLength = byte_size(Stream), + case rabbit_stream_manager:topology(VirtualHost, Stream) of + {error, stream_not_found} -> + <<Acc/binary, + StreamLength:16, + Stream:StreamLength/binary, + ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16, + (-1):16, + 0:32>>; + {error, stream_not_available} -> + <<Acc/binary, + StreamLength:16, + Stream:StreamLength/binary, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, + (-1):16, + 0:32>>; + {ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} -> + LeaderIndex = + case NodesInfo of + #{LeaderNode := NodeInfo} -> + {{index, LeaderIdx}, {host, _}, {port, _}} = NodeInfo, + LeaderIdx; + _ -> -1 + end, + {ReplicasBinary, ReplicasCount} = + lists:foldl(fun(Replica, {Bin, Count}) -> + case NodesInfo of + #{Replica := NI} -> + {{index, ReplicaIndex}, + {host, _}, + {port, _}} = + NI, + {<<Bin/binary, ReplicaIndex:16>>, + Count + 1}; + _ -> {Bin, Count} + end + end, + {<<>>, 0}, + Replicas), + <<Acc/binary, + StreamLength:16, + Stream:StreamLength/binary, + ?RESPONSE_CODE_OK:16, + LeaderIndex:16, + ReplicasCount:32, + ReplicasBinary/binary>> + end + end, + <<StreamCount:32>>, + Streams), + Frame = + <<?COMMAND_METADATA:16, + ?VERSION_0:16, + CorrelationId:32, + BrokersBin/binary, + MetadataBin/binary>>, FrameSize = byte_size(Frame), Transport:send(S, <<FrameSize:32, Frame/binary>>), {Connection, State, Rest}; -handle_frame_post_auth(Transport, Connection, State, - <<(?COMMAND_CLOSE):16, (?VERSION_0):16, - CorrelationId:32, ClosingCode:16, +handle_frame_post_auth(Transport, + Connection, + State, + <<?COMMAND_CLOSE:16, + ?VERSION_0:16, + CorrelationId:32, + ClosingCode:16, ClosingReasonLength:16, ClosingReason:ClosingReasonLength/binary>>, _Rest) -> - rabbit_log:info("Received close command ~p ~p~n", - [ClosingCode, ClosingReason]), - Frame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16, - CorrelationId:32, (?RESPONSE_CODE_OK):16>>, + rabbit_log:info("Received close command ~p ~p~n", [ClosingCode, ClosingReason]), + Frame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>, frame(Transport, Connection, Frame), - {Connection#stream_connection{connection_step = - closing}, + {Connection#stream_connection{connection_step = closing}, State, <<>>}; %% we ignore any subsequent frames -handle_frame_post_auth(_Transport, Connection, State, - <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>, Rest) -> +handle_frame_post_auth(_Transport, + Connection, + State, + <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, + Rest) -> rabbit_log:info("Received heartbeat frame post auth~n"), {Connection, State, Rest}; -handle_frame_post_auth(Transport, Connection, State, - Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, sending close command.~n", - [Frame, Rest]), +handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> + rabbit_log:warning("unknown frame ~p ~p, sending close command.~n", [Frame, Rest]), CloseReason = <<"unknown frame">>, CloseReasonLength = byte_size(CloseReason), - CloseFrame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16, - 1:32, (?RESPONSE_CODE_UNKNOWN_FRAME):16, - CloseReasonLength:16, - CloseReason:CloseReasonLength/binary>>, + CloseFrame = + <<?COMMAND_CLOSE:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_UNKNOWN_FRAME:16, + CloseReasonLength:16, + CloseReason:CloseReasonLength/binary>>, frame(Transport, Connection, CloseFrame), - {Connection#stream_connection{connection_step = - close_sent}, - State, - Rest}. + {Connection#stream_connection{connection_step = close_sent}, State, Rest}. -notify_connection_closed(#stream_connection{name = - Name} = - Connection, - ConnectionState) -> +notify_connection_closed(#stream_connection{name = Name} = Connection, ConnectionState) -> rabbit_core_metrics:connection_closed(self()), - ClientProperties = i(client_properties, - Connection, - ConnectionState), - EventProperties = [{name, Name}, - {pid, self()}, - {node, node()}, - {client_properties, ClientProperties}], + ClientProperties = i(client_properties, Connection, ConnectionState), + EventProperties = + [{name, Name}, {pid, self()}, {node, node()}, {client_properties, ClientProperties}], rabbit_event:notify(connection_closed, augment_infos_with_user_provided_connection_name(EventProperties, Connection)). -handle_frame_post_close(_Transport, Connection, State, - <<(?COMMAND_CLOSE):16, (?VERSION_0):16, - _CorrelationId:32, _ResponseCode:16>>, +handle_frame_post_close(_Transport, + Connection, + State, + <<?COMMAND_CLOSE:16, ?VERSION_0:16, _CorrelationId:32, _ResponseCode:16>>, Rest) -> rabbit_log:info("Received close confirmation~n"), - {Connection#stream_connection{connection_step = - closing_done}, - State, - Rest}; -handle_frame_post_close(_Transport, Connection, State, - <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>, Rest) -> + {Connection#stream_connection{connection_step = closing_done}, State, Rest}; +handle_frame_post_close(_Transport, + Connection, + State, + <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, + Rest) -> rabbit_log:info("Received heartbeat frame post close~n"), {Connection, State, Rest}; -handle_frame_post_close(_Transport, Connection, State, - Frame, Rest) -> - rabbit_log:warning("ignored frame on close ~p ~p.~n", - [Frame, Rest]), +handle_frame_post_close(_Transport, Connection, State, Frame, Rest) -> + rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]), {Connection, State, Rest}. clean_state_after_stream_deletion_or_failure(Stream, - #stream_connection{stream_subscriptions - = + #stream_connection{stream_subscriptions = StreamSubscriptions, - publishers = - Publishers, - publisher_to_ids - = - PublisherToIds, - stream_leaders = - Leaders} = + publishers = Publishers, + publisher_to_ids = PublisherToIds, + stream_leaders = Leaders} = C0, - #stream_connection_state{consumers - = - Consumers} = + #stream_connection_state{consumers = Consumers} = S0) -> - {SubscriptionsCleaned, C1, S1} = case - stream_has_subscriptions(Stream, C0) - of - true -> - #{Stream := SubscriptionIds} = - StreamSubscriptions, - {true, - C0#stream_connection{stream_subscriptions - = - maps:remove(Stream, - StreamSubscriptions)}, - S0#stream_connection_state{consumers - = - maps:without(SubscriptionIds, - Consumers)}}; - false -> {false, C0, S0} - end, - {PublishersCleaned, C2, S2} = case - stream_has_publishers(Stream, C1) - of - true -> - {PurgedPubs, PurgedPubToIds} = - maps:fold(fun (PubId, - #publisher{stream = - S, - reference - = - Ref}, - {Pubs, - PubToIds}) -> - case S of - Stream -> - {maps:remove(PubId, - Pubs), - maps:remove({Stream, - Ref}, - PubToIds)}; - _ -> - {Pubs, - PubToIds} - end - end, - {Publishers, - PublisherToIds}, - Publishers), - {true, - C1#stream_connection{publishers = - PurgedPubs, - publisher_to_ids - = - PurgedPubToIds}, - S1}; - false -> {false, C1, S1} - end, - {LeadersCleaned, Leaders1} = case Leaders of - #{Stream := _} -> - {true, maps:remove(Stream, Leaders)}; - _ -> {false, Leaders} - end, - case SubscriptionsCleaned orelse - PublishersCleaned orelse LeadersCleaned - of + {SubscriptionsCleaned, C1, S1} = + case stream_has_subscriptions(Stream, C0) of + true -> + #{Stream := SubscriptionIds} = StreamSubscriptions, + {true, + C0#stream_connection{stream_subscriptions = + maps:remove(Stream, StreamSubscriptions)}, + S0#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}}; + false -> + {false, C0, S0} + end, + {PublishersCleaned, C2, S2} = + case stream_has_publishers(Stream, C1) of + true -> + {PurgedPubs, PurgedPubToIds} = + maps:fold(fun(PubId, + #publisher{stream = S, reference = Ref}, + {Pubs, PubToIds}) -> + case S of + Stream -> + {maps:remove(PubId, Pubs), + maps:remove({Stream, Ref}, PubToIds)}; + _ -> {Pubs, PubToIds} + end + end, + {Publishers, PublisherToIds}, + Publishers), + {true, + C1#stream_connection{publishers = PurgedPubs, publisher_to_ids = PurgedPubToIds}, + S1}; + false -> + {false, C1, S1} + end, + {LeadersCleaned, Leaders1} = + case Leaders of + #{Stream := _} -> + {true, maps:remove(Stream, Leaders)}; + _ -> + {false, Leaders} + end, + case SubscriptionsCleaned orelse PublishersCleaned orelse LeadersCleaned of true -> C3 = demonitor_stream(Stream, C2), - {cleaned, - C3#stream_connection{stream_leaders = Leaders1}, - S2}; + {cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2}; false -> - {not_cleaned, - C2#stream_connection{stream_leaders = Leaders1}, - S2} + {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2} end. lookup_leader(Stream, - #stream_connection{stream_leaders = StreamLeaders, - virtual_host = VirtualHost} = + #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} = Connection) -> case maps:get(Stream, StreamLeaders, undefined) of undefined -> case lookup_leader_from_manager(VirtualHost, Stream) of - cluster_not_found -> cluster_not_found; + cluster_not_found -> + cluster_not_found; LeaderPid -> - Connection1 = maybe_monitor_stream(LeaderPid, - Stream, - Connection), + Connection1 = maybe_monitor_stream(LeaderPid, Stream, Connection), {LeaderPid, Connection1#stream_connection{stream_leaders = - StreamLeaders#{Stream => - LeaderPid}}} + StreamLeaders#{Stream => LeaderPid}}} end; - LeaderPid -> {LeaderPid, Connection} + LeaderPid -> + {LeaderPid, Connection} end. lookup_leader_from_manager(VirtualHost, Stream) -> - rabbit_stream_manager:lookup_leader(VirtualHost, - Stream). + rabbit_stream_manager:lookup_leader(VirtualHost, Stream). maybe_clean_connection_from_stream(Stream, - #stream_connection{stream_leaders = - Leaders} = - Connection0) -> - Connection1 = case {stream_has_publishers(Stream, - Connection0), - stream_has_subscriptions(Stream, Connection0)} - of - {false, false} -> demonitor_stream(Stream, Connection0); - _ -> Connection0 - end, - Connection1#stream_connection{stream_leaders = - maps:remove(Stream, Leaders)}. - -maybe_monitor_stream(Pid, Stream, - #stream_connection{monitors = Monitors} = Connection) -> + #stream_connection{stream_leaders = Leaders} = Connection0) -> + Connection1 = + case {stream_has_publishers(Stream, Connection0), + stream_has_subscriptions(Stream, Connection0)} + of + {false, false} -> + demonitor_stream(Stream, Connection0); + _ -> + Connection0 + end, + Connection1#stream_connection{stream_leaders = maps:remove(Stream, Leaders)}. + +maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) -> case lists:member(Stream, maps:values(Monitors)) of - true -> Connection; + true -> + Connection; false -> MonitorRef = monitor(process, Pid), - Connection#stream_connection{monitors = - maps:put(MonitorRef, - Stream, - Monitors)} + Connection#stream_connection{monitors = maps:put(MonitorRef, Stream, Monitors)} end. -demonitor_stream(Stream, - #stream_connection{monitors = Monitors0} = - Connection) -> - Monitors = maps:fold(fun (MonitorRef, Strm, Acc) -> - case Strm of - Stream -> - demonitor(MonitorRef, [flush]), - Acc; - _ -> maps:put(MonitorRef, Strm, Acc) - end - end, - #{}, - Monitors0), +demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) -> + Monitors = + maps:fold(fun(MonitorRef, Strm, Acc) -> + case Strm of + Stream -> + demonitor(MonitorRef, [flush]), + Acc; + _ -> maps:put(MonitorRef, Strm, Acc) + end + end, + #{}, + Monitors0), Connection#stream_connection{monitors = Monitors}. stream_has_subscriptions(Stream, - #stream_connection{stream_subscriptions = - Subscriptions}) -> + #stream_connection{stream_subscriptions = Subscriptions}) -> case Subscriptions of - #{Stream := StreamSubscriptions} - when length(StreamSubscriptions) > 0 -> + #{Stream := StreamSubscriptions} when length(StreamSubscriptions) > 0 -> true; - _ -> false + _ -> + false end. -stream_has_publishers(Stream, - #stream_connection{publishers = Publishers}) -> - lists:any(fun (#publisher{stream = S}) -> - case S of - Stream -> true; - _ -> false - end +stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) -> + lists:any(fun(#publisher{stream = S}) -> + case S of + Stream -> true; + _ -> false + end end, maps:values(Publishers)). -demonitor_all_streams(#stream_connection{monitors = - Monitors} = - Connection) -> - lists:foreach(fun (MonitorRef) -> - demonitor(MonitorRef, [flush]) - end, - maps:keys(Monitors)), +demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) -> + lists:foreach(fun(MonitorRef) -> demonitor(MonitorRef, [flush]) end, maps:keys(Monitors)), Connection#stream_connection{monitors = #{}}. -frame(Transport, #stream_connection{socket = S}, - Frame) -> +frame(Transport, #stream_connection{socket = S}, Frame) -> FrameSize = byte_size(Frame), Transport:send(S, [<<FrameSize:32>>, Frame]). -response_ok(Transport, State, CommandId, - CorrelationId) -> - response(Transport, - State, - CommandId, - CorrelationId, - ?RESPONSE_CODE_OK). +response_ok(Transport, State, CommandId, CorrelationId) -> + response(Transport, State, CommandId, CorrelationId, ?RESPONSE_CODE_OK). -response(Transport, #stream_connection{socket = S}, - CommandId, CorrelationId, ResponseCode) -> +response(Transport, + #stream_connection{socket = S}, + CommandId, + CorrelationId, + ResponseCode) -> Transport:send(S, - [<<(?RESPONSE_FRAME_SIZE):32, CommandId:16, - (?VERSION_0):16>>, + [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]). -subscription_exists(StreamSubscriptions, - SubscriptionId) -> +subscription_exists(StreamSubscriptions, SubscriptionId) -> SubscriptionIds = - lists:flatten(maps:values(StreamSubscriptions)), - lists:any(fun (Id) -> Id =:= SubscriptionId end, - SubscriptionIds). + lists:flatten( + maps:values(StreamSubscriptions)), + lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}, Counter) -> - fun (Size) -> - FrameSize = 2 + 2 + 1 + Size, - FrameBeginning = <<FrameSize:32, (?COMMAND_DELIVER):16, - (?VERSION_0):16, SubscriptionId:8/unsigned>>, - Transport:send(S, FrameBeginning), - atomics:add(Counter, 1, Size) + fun(Size) -> + FrameSize = 2 + 2 + 1 + Size, + FrameBeginning = + <<FrameSize:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8/unsigned>>, + Transport:send(S, FrameBeginning), + atomics:add(Counter, 1, Size) end. -send_chunks(Transport, - #consumer{credit = Credit} = State, Counter) -> +send_chunks(Transport, #consumer{credit = Credit} = State, Counter) -> send_chunks(Transport, State, Credit, Counter). -send_chunks(_Transport, #consumer{segment = Segment}, 0, - _Counter) -> +send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) -> {{segment, Segment}, {credit, 0}}; -send_chunks(Transport, - #consumer{segment = Segment} = State, Credit, - Counter) -> - send_chunks(Transport, - State, - Segment, - Credit, - true, - Counter). +send_chunks(Transport, #consumer{segment = Segment} = State, Credit, Counter) -> + send_chunks(Transport, State, Segment, Credit, true, Counter). -send_chunks(_Transport, _State, Segment, 0 = _Credit, - _Retry, _Counter) -> +send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry, _Counter) -> {{segment, Segment}, {credit, 0}}; -send_chunks(Transport, #consumer{socket = S} = State, - Segment, Credit, Retry, Counter) -> - case osiris_log:send_file(S, - Segment, - send_file_callback(Transport, State, Counter)) - of +send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry, Counter) -> + case osiris_log:send_file(S, Segment, send_file_callback(Transport, State, Counter)) of {ok, Segment1} -> - send_chunks(Transport, - State, - Segment1, - Credit - 1, - true, - Counter); + send_chunks(Transport, State, Segment1, Credit - 1, true, Counter); {end_of_stream, Segment1} -> case Retry of true -> timer:sleep(1), - send_chunks(Transport, - State, - Segment1, - Credit, - false, - Counter); + send_chunks(Transport, State, Segment1, Credit, false, Counter); false -> #consumer{member_pid = LocalMember} = State, - osiris:register_offset_listener(LocalMember, - osiris_log:next_offset(Segment1)), + osiris:register_offset_listener(LocalMember, osiris_log:next_offset(Segment1)), {{segment, Segment1}, {credit, Credit}} end end. emit_stats(Connection, ConnectionState) -> - [{_, Pid}, - {_, Recv_oct}, - {_, Send_oct}, - {_, Reductions}] = + [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I = infos(?SIMPLE_METRICS, Connection, ConnectionState), - Infos = infos(?OTHER_METRICS, - Connection, - ConnectionState), + Infos = infos(?OTHER_METRICS, Connection, ConnectionState), rabbit_core_metrics:connection_stats(Pid, Infos), - rabbit_core_metrics:connection_stats(Pid, - Recv_oct, - Send_oct, - Reductions), + rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions), rabbit_event:notify(connection_stats, Infos ++ I), - Connection1 = rabbit_event:reset_stats_timer(Connection, - #stream_connection.stats_timer), + Connection1 = rabbit_event:reset_stats_timer(Connection, #stream_connection.stats_timer), ensure_stats_timer(Connection1). ensure_stats_timer(Connection = #stream_connection{}) -> - rabbit_event:ensure_stats_timer(Connection, - #stream_connection.stats_timer, - emit_stats). + rabbit_event:ensure_stats_timer(Connection, #stream_connection.stats_timer, emit_stats). info(Pid, InfoItems) -> - case InfoItems -- (?INFO_ITEMS) of - [] -> gen_server2:call(Pid, {info, InfoItems}); - UnknownItems -> throw({bad_argument, UnknownItems}) + case InfoItems -- ?INFO_ITEMS of + [] -> + gen_server2:call(Pid, {info, InfoItems}); + UnknownItems -> + throw({bad_argument, UnknownItems}) end. infos(Items, Connection, State) -> [{Item, i(Item, Connection, State)} || Item <- Items]. -i(pid, _, _) -> self(); -i(node, _, _) -> node(); -i(SockStat, - #stream_connection{socket = Sock, - send_file_oct = Counter}, - _) - when SockStat =:= - send_oct -> % Number of bytes sent from the socket. +i(pid, _, _) -> + self(); +i(node, _, _) -> + node(); +i(SockStat, #stream_connection{socket = Sock, send_file_oct = Counter}, _) + when SockStat + =:= send_oct -> % Number of bytes sent from the socket. case rabbit_net:getstat(Sock, [SockStat]) of {ok, [{_, N}]} when is_number(N) -> N + atomics:get(Counter, 1); - _ -> 0 + atomics:get(Counter, 1) + _ -> + 0 + atomics:get(Counter, 1) end; i(SockStat, #stream_connection{socket = Sock}, _) - when SockStat =:= - recv_oct; % Number of bytes received by the socket. - SockStat =:= - recv_cnt; % Number of packets received by the socket. - SockStat =:= - send_cnt; % Number of packets sent from the socket. - SockStat =:= - send_pend -> % Number of bytes waiting to be sent by the socket. + when SockStat + =:= recv_oct; % Number of bytes received by the socket. + SockStat + =:= recv_cnt; % Number of packets received by the socket. + SockStat + =:= send_cnt; % Number of packets sent from the socket. + SockStat + =:= send_pend -> % Number of bytes waiting to be sent by the socket. case rabbit_net:getstat(Sock, [SockStat]) of - {ok, [{_, N}]} when is_number(N) -> N; - _ -> 0 + {ok, [{_, N}]} when is_number(N) -> + N; + _ -> + 0 end; i(reductions, _, _) -> - {reductions, Reductions} = erlang:process_info(self(), - reductions), + {reductions, Reductions} = erlang:process_info(self(), reductions), Reductions; i(garbage_collection, _, _) -> rabbit_misc:get_gc_info(self()); @@ -2468,59 +1970,59 @@ i(name, Connection, ConnectionState) -> i(conn_name, Connection, ConnectionState); i(conn_name, #stream_connection{name = Name}, _) -> Name; -i(port, #stream_connection{port = Port}, _) -> Port; -i(peer_port, #stream_connection{peer_port = PeerPort}, - _) -> +i(port, #stream_connection{port = Port}, _) -> + Port; +i(peer_port, #stream_connection{peer_port = PeerPort}, _) -> PeerPort; -i(host, #stream_connection{host = Host}, _) -> Host; -i(peer_host, #stream_connection{peer_host = PeerHost}, - _) -> +i(host, #stream_connection{host = Host}, _) -> + Host; +i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost; -i(ssl, _, _) -> false; -i(peer_cert_subject, _, _) -> ''; -i(peer_cert_issuer, _, _) -> ''; -i(peer_cert_validity, _, _) -> ''; -i(ssl_protocol, _, _) -> ''; -i(ssl_key_exchange, _, _) -> ''; -i(ssl_cipher, _, _) -> ''; -i(ssl_hash, _, _) -> ''; -i(channels, _, _) -> 0; -i(protocol, _, _) -> {<<"stream">>, ""}; -i(user_who_performed_action, Connection, - ConnectionState) -> +i(ssl, _, _) -> + false; +i(peer_cert_subject, _, _) -> + ''; +i(peer_cert_issuer, _, _) -> + ''; +i(peer_cert_validity, _, _) -> + ''; +i(ssl_protocol, _, _) -> + ''; +i(ssl_key_exchange, _, _) -> + ''; +i(ssl_cipher, _, _) -> + ''; +i(ssl_hash, _, _) -> + ''; +i(channels, _, _) -> + 0; +i(protocol, _, _) -> + {<<"stream">>, ""}; +i(user_who_performed_action, Connection, ConnectionState) -> i(user, Connection, ConnectionState); i(user, #stream_connection{user = U}, _) -> U#user.username; -i(vhost, #stream_connection{virtual_host = VirtualHost}, - _) -> +i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> VirtualHost; -i(subscriptions, _, - #stream_connection_state{consumers = Consumers}) -> +i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers); -i(connection_state, _Connection, - #stream_connection_state{blocked = true}) -> +i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked; -i(connection_state, _Connection, - #stream_connection_state{blocked = false}) -> +i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running; -i(auth_mechanism, - #stream_connection{auth_mechanism = none}, _) -> +i(auth_mechanism, #stream_connection{auth_mechanism = none}, _) -> none; -i(auth_mechanism, - #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> +i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name; -i(heartbeat, #stream_connection{heartbeat = Heartbeat}, - _) -> +i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat; -i(frame_max, #stream_connection{frame_max = FrameMax}, - _) -> +i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax; -i(channel_max, _, _) -> 0; -i(client_properties, - #stream_connection{client_properties = CP}, _) -> +i(channel_max, _, _) -> + 0; +i(client_properties, #stream_connection{client_properties = CP}, _) -> rabbit_misc:to_amqp_table(CP); -i(connected_at, #stream_connection{connected_at = T}, - _) -> +i(connected_at, #stream_connection{connected_at = T}, _) -> T; i(Item, #stream_connection{}, _) -> throw({bad_argument, Item}). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl index ecce4cdf2a..3e36de6fb4 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl @@ -19,7 +19,6 @@ -behaviour(supervisor). -export([start_link/0]). - -export([init/1]). -include("rabbit_stream.hrl"). @@ -28,52 +27,36 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, Listeners} = application:get_env(rabbitmq_stream, - tcp_listeners), - NumTcpAcceptors = application:get_env(rabbitmq_stream, - num_tcp_acceptors, - 10), - {ok, SocketOpts} = application:get_env(rabbitmq_stream, - tcp_listen_options), + {ok, Listeners} = application:get_env(rabbitmq_stream, tcp_listeners), + NumTcpAcceptors = application:get_env(rabbitmq_stream, num_tcp_acceptors, 10), + {ok, SocketOpts} = application:get_env(rabbitmq_stream, tcp_listen_options), Nodes = rabbit_mnesia:cluster_nodes(all), OsirisConf = #{nodes => Nodes}, - ServerConfiguration = #{initial_credits => - application:get_env(rabbitmq_stream, - initial_credits, - ?DEFAULT_INITIAL_CREDITS), - credits_required_for_unblocking => - application:get_env(rabbitmq_stream, - credits_required_for_unblocking, - ?DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING), - frame_max => - application:get_env(rabbit_stream, - frame_max, - ?DEFAULT_FRAME_MAX), - heartbeat => - application:get_env(rabbit_stream, - heartbeat, - ?DEFAULT_HEARTBEAT)}, - StreamManager = #{id => rabbit_stream_manager, - type => worker, - start => - {rabbit_stream_manager, start_link, [OsirisConf]}}, + ServerConfiguration = + #{initial_credits => + application:get_env(rabbitmq_stream, initial_credits, ?DEFAULT_INITIAL_CREDITS), + credits_required_for_unblocking => + application:get_env(rabbitmq_stream, + credits_required_for_unblocking, + ?DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING), + frame_max => application:get_env(rabbit_stream, frame_max, ?DEFAULT_FRAME_MAX), + heartbeat => application:get_env(rabbit_stream, heartbeat, ?DEFAULT_HEARTBEAT)}, + StreamManager = + #{id => rabbit_stream_manager, + type => worker, + start => {rabbit_stream_manager, start_link, [OsirisConf]}}, {ok, {{one_for_all, 10, 10}, - [StreamManager] ++ - listener_specs(fun tcp_listener_spec/1, - [SocketOpts, ServerConfiguration, NumTcpAcceptors], - Listeners)}}. + [StreamManager] + ++ listener_specs(fun tcp_listener_spec/1, + [SocketOpts, ServerConfiguration, NumTcpAcceptors], + Listeners)}}. listener_specs(Fun, Args, Listeners) -> [Fun([Address | Args]) - || Listener <- Listeners, - Address - <- rabbit_networking:tcp_listener_addresses(Listener)]. + || Listener <- Listeners, Address <- rabbit_networking:tcp_listener_addresses(Listener)]. -tcp_listener_spec([Address, - SocketOpts, - Configuration, - NumAcceptors]) -> +tcp_listener_spec([Address, SocketOpts, Configuration, NumAcceptors]) -> rabbit_networking:tcp_listener_spec(rabbit_stream_listener_sup, Address, SocketOpts, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index d7ec2264fd..f01259a8d3 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -17,158 +17,150 @@ -module(rabbit_stream_utils). %% API --export([enforce_correct_stream_name/1, - write_messages/4, - parse_map/2, - auth_mechanisms/1, - auth_mechanism_to_module/2, - check_configure_permitted/3, - check_write_permitted/3, - check_read_permitted/3, - extract_stream_list/2]). +-export([enforce_correct_stream_name/1, write_messages/4, parse_map/2, auth_mechanisms/1, + auth_mechanism_to_module/2, check_configure_permitted/3, check_write_permitted/3, + check_read_permitted/3, extract_stream_list/2]). -define(MAX_PERMISSION_CACHE_SIZE, 12). enforce_correct_stream_name(Name) -> % from rabbit_channel - StrippedName = binary:replace(Name, - [<<"\n">>, <<"\r">>], - <<"">>, - [global]), + StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]), case check_name(StrippedName) of - ok -> {ok, StrippedName}; - error -> error + ok -> + {ok, StrippedName}; + error -> + error end. -check_name(<<"amq.", _/binary>>) -> error; -check_name(<<"">>) -> error; -check_name(_Name) -> ok. +check_name(<<"amq.", _/binary>>) -> + error; +check_name(<<"">>) -> + error; +check_name(_Name) -> + ok. -write_messages(_ClusterLeader, undefined, _PublisherId, - <<>>) -> +write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) -> ok; -write_messages(ClusterLeader, undefined, PublisherId, - <<PublishingId:64, 0:1, MessageSize:31, - Message:MessageSize/binary, Rest/binary>>) -> +write_messages(ClusterLeader, + undefined, + PublisherId, + <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, - undefined, - {PublisherId, PublishingId}, - Message), - write_messages(ClusterLeader, - undefined, - PublisherId, - Rest); -write_messages(ClusterLeader, undefined, PublisherId, - <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, - MessageCount:16, BatchSize:32, Batch:BatchSize/binary, + ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message), + write_messages(ClusterLeader, undefined, PublisherId, Rest); +write_messages(ClusterLeader, + undefined, + PublisherId, + <<PublishingId:64, + 1:1, + CompressionType:3, + _Unused:4, + MessageCount:16, + BatchSize:32, + Batch:BatchSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, - undefined, - {PublisherId, PublishingId}, - {batch, MessageCount, CompressionType, Batch}), - write_messages(ClusterLeader, - undefined, - PublisherId, - Rest); -write_messages(_ClusterLeader, _PublisherRef, - _PublisherId, <<>>) -> + ok = + osiris:write(ClusterLeader, + undefined, + {PublisherId, PublishingId}, + {batch, MessageCount, CompressionType, Batch}), + write_messages(ClusterLeader, undefined, PublisherId, Rest); +write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) -> ok; -write_messages(ClusterLeader, PublisherRef, PublisherId, - <<PublishingId:64, 0:1, MessageSize:31, - Message:MessageSize/binary, Rest/binary>>) -> +write_messages(ClusterLeader, + PublisherRef, + PublisherId, + <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, - PublisherRef, - PublishingId, - Message), - write_messages(ClusterLeader, - PublisherRef, - PublisherId, - Rest); -write_messages(ClusterLeader, PublisherRef, PublisherId, - <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, - MessageCount:16, BatchSize:32, Batch:BatchSize/binary, + ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message), + write_messages(ClusterLeader, PublisherRef, PublisherId, Rest); +write_messages(ClusterLeader, + PublisherRef, + PublisherId, + <<PublishingId:64, + 1:1, + CompressionType:3, + _Unused:4, + MessageCount:16, + BatchSize:32, + Batch:BatchSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, - PublisherRef, - PublishingId, - {batch, MessageCount, CompressionType, Batch}), - write_messages(ClusterLeader, - PublisherRef, - PublisherId, - Rest). + ok = + osiris:write(ClusterLeader, + PublisherRef, + PublishingId, + {batch, MessageCount, CompressionType, Batch}), + write_messages(ClusterLeader, PublisherRef, PublisherId, Rest). -parse_map(<<>>, _Count) -> {#{}, <<>>}; -parse_map(Content, 0) -> {#{}, Content}; +parse_map(<<>>, _Count) -> + {#{}, <<>>}; +parse_map(Content, 0) -> + {#{}, Content}; parse_map(Arguments, Count) -> parse_map(#{}, Arguments, Count). -parse_map(Acc, <<>>, _Count) -> {Acc, <<>>}; -parse_map(Acc, Content, 0) -> {Acc, Content}; +parse_map(Acc, <<>>, _Count) -> + {Acc, <<>>}; +parse_map(Acc, Content, 0) -> + {Acc, Content}; parse_map(Acc, - <<KeySize:16, Key:KeySize/binary, ValueSize:16, - Value:ValueSize/binary, Rest/binary>>, + <<KeySize:16, Key:KeySize/binary, ValueSize:16, Value:ValueSize/binary, Rest/binary>>, Count) -> parse_map(maps:put(Key, Value, Acc), Rest, Count - 1). auth_mechanisms(Sock) -> - {ok, Configured} = application:get_env(rabbit, - auth_mechanisms), + {ok, Configured} = application:get_env(rabbit, auth_mechanisms), [rabbit_data_coercion:to_binary(Name) - || {Name, Module} - <- rabbit_registry:lookup_all(auth_mechanism), + || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism), Module:should_offer(Sock), lists:member(Name, Configured)]. auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> - rabbit_log:warning("Unknown authentication mechanism '~p'~n", - [TypeBin]), + rabbit_log:warning("Unknown authentication mechanism '~p'~n", [TypeBin]), {error, not_found}; T -> - case {lists:member(TypeBin, - rabbit_stream_utils:auth_mechanisms(Sock)), + case {lists:member(TypeBin, rabbit_stream_utils:auth_mechanisms(Sock)), rabbit_registry:lookup_module(auth_mechanism, T)} - of - {true, {ok, Module}} -> {ok, Module}; + of + {true, {ok, Module}} -> + {ok, Module}; _ -> - rabbit_log:warning("Invalid authentication mechanism '~p'~n", - [T]), + rabbit_log:warning("Invalid authentication mechanism '~p'~n", [T]), {error, invalid} end end. check_resource_access(User, Resource, Perm, Context) -> V = {Resource, Context, Perm}, - Cache = case get(permission_cache) of - undefined -> []; - Other -> Other - end, + Cache = + case get(permission_cache) of + undefined -> + []; + Other -> + Other + end, case lists:member(V, Cache) of - true -> ok; + true -> + ok; false -> - try rabbit_access_control:check_resource_access(User, - Resource, - Perm, - Context), - CacheTail = lists:sublist(Cache, - (?MAX_PERMISSION_CACHE_SIZE) - 1), + try + rabbit_access_control:check_resource_access(User, Resource, Perm, Context), + CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1), put(permission_cache, [V | CacheTail]), ok catch - exit:_ -> error + exit:_ -> + error end end. check_configure_permitted(Resource, User, Context) -> - check_resource_access(User, - Resource, - configure, - Context). + check_resource_access(User, Resource, configure, Context). check_write_permitted(Resource, User, Context) -> check_resource_access(User, Resource, write, Context). @@ -176,8 +168,7 @@ check_write_permitted(Resource, User, Context) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). -extract_stream_list(<<>>, Streams) -> Streams; -extract_stream_list(<<Length:16, Stream:Length/binary, - Rest/binary>>, - Streams) -> +extract_stream_list(<<>>, Streams) -> + Streams; +extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) -> extract_stream_list(Rest, [Stream | Streams]). diff --git a/deps/rabbitmq_stream/test/command_SUITE.erl b/deps/rabbitmq_stream/test/command_SUITE.erl index c44c02bfeb..820c878b3b 100644 --- a/deps/rabbitmq_stream/test/command_SUITE.erl +++ b/deps/rabbitmq_stream/test/command_SUITE.erl @@ -10,35 +10,32 @@ -compile([export_all]). -include_lib("common_test/include/ct.hrl"). - -include_lib("eunit/include/eunit.hrl"). - -include_lib("amqp_client/include/amqp_client.hrl"). -include("rabbit_stream.hrl"). --define(COMMAND, - 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand'). +-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand'). -all() -> [{group, non_parallel_tests}]. +all() -> + [{group, non_parallel_tests}]. groups() -> [{non_parallel_tests, [], [merge_defaults, run]}]. init_per_suite(Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, - [{rmq_nodename_suffix, ?MODULE}]), + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, ?MODULE}]), rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config1, - rabbit_ct_broker_helpers:setup_steps()). + rabbit_ct_helpers:run_setup_steps(Config1, rabbit_ct_broker_helpers:setup_steps()). end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, - rabbit_ct_broker_helpers:teardown_steps()). + rabbit_ct_helpers:run_teardown_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_group(_, Config) -> Config. +init_per_group(_, Config) -> + Config. -end_per_group(_, Config) -> Config. +end_per_group(_, Config) -> + Config. init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -47,72 +44,66 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). merge_defaults(_Config) -> - {[<<"conn_name">>], #{verbose := false}} = - (?COMMAND):merge_defaults([], #{}), + {[<<"conn_name">>], #{verbose := false}} = ?COMMAND:merge_defaults([], #{}), {[<<"other_key">>], #{verbose := true}} = - (?COMMAND):merge_defaults([<<"other_key">>], - #{verbose => true}), + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}), {[<<"other_key">>], #{verbose := false}} = - (?COMMAND):merge_defaults([<<"other_key">>], - #{verbose => false}). + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}). run(Config) -> - Node = rabbit_ct_broker_helpers:get_node_config(Config, - 0, - nodename), - Opts = #{node => Node, timeout => 10000, - verbose => false}, + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = + #{node => Node, + timeout => 10000, + verbose => false}, %% No connections - [] = 'Elixir.Enum':to_list((?COMMAND):run([], Opts)), - StreamPort = - rabbit_stream_SUITE:get_stream_port(Config), + [] = + 'Elixir.Enum':to_list( + ?COMMAND:run([], Opts)), + StreamPort = rabbit_stream_SUITE:get_stream_port(Config), S1 = start_stream_connection(StreamPort), ct:sleep(100), [[{conn_name, _}]] = - 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>], - Opts)), + 'Elixir.Enum':to_list( + ?COMMAND:run([<<"conn_name">>], Opts)), S2 = start_stream_connection(StreamPort), ct:sleep(100), [[{conn_name, _}], [{conn_name, _}]] = - 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>], - Opts)), - Port = rabbit_ct_broker_helpers:get_node_config(Config, - 0, - tcp_port_amqp), + 'Elixir.Enum':to_list( + ?COMMAND:run([<<"conn_name">>], Opts)), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), start_amqp_connection(network, Node, Port), %% There are still just two connections [[{conn_name, _}], [{conn_name, _}]] = - 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>], - Opts)), + 'Elixir.Enum':to_list( + ?COMMAND:run([<<"conn_name">>], Opts)), start_amqp_connection(direct, Node, Port), %% Still two MQTT connections, one direct AMQP 0-9-1 connection [[{conn_name, _}], [{conn_name, _}]] = - 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>], - Opts)), + 'Elixir.Enum':to_list( + ?COMMAND:run([<<"conn_name">>], Opts)), %% Verbose returns all keys - Infos = lists:map(fun (El) -> atom_to_binary(El, utf8) - end, - ?INFO_ITEMS), - AllKeys = 'Elixir.Enum':to_list((?COMMAND):run(Infos, - Opts)), - AllKeys = 'Elixir.Enum':to_list((?COMMAND):run([], - Opts#{verbose => true})), + Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS), + AllKeys = + 'Elixir.Enum':to_list( + ?COMMAND:run(Infos, Opts)), + AllKeys = + 'Elixir.Enum':to_list( + ?COMMAND:run([], Opts#{verbose => true})), %% There are two connections [First, _Second] = AllKeys, %% Keys are INFO_ITEMS KeysCount = length(?INFO_ITEMS), KeysCount = length(First), {Keys, _} = lists:unzip(First), - [] = Keys -- (?INFO_ITEMS), - [] = (?INFO_ITEMS) -- Keys, + [] = Keys -- ?INFO_ITEMS, + [] = ?INFO_ITEMS -- Keys, rabbit_stream_SUITE:test_close(S1), rabbit_stream_SUITE:test_close(S2), ok. start_stream_connection(Port) -> - {ok, S} = gen_tcp:connect("localhost", - Port, - [{active, false}, {mode, binary}]), + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), rabbit_stream_SUITE:test_peer_properties(S), rabbit_stream_SUITE:test_authenticate(S), S. diff --git a/deps/rabbitmq_stream/test/config_schema_SUITE.erl b/deps/rabbitmq_stream/test/config_schema_SUITE.erl index ea567b2aa1..a4e2485bdd 100644 --- a/deps/rabbitmq_stream/test/config_schema_SUITE.erl +++ b/deps/rabbitmq_stream/test/config_schema_SUITE.erl @@ -9,7 +9,8 @@ -compile(export_all). -all() -> [run_snippets]. +all() -> + [run_snippets]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -18,25 +19,23 @@ all() -> [run_snippets]. init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:run_setup_steps(Config), - rabbit_ct_config_schema:init_schemas(rabbitmq_stream, - Config1). + rabbit_ct_config_schema:init_schemas(rabbitmq_stream, Config1). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), - Config1 = rabbit_ct_helpers:set_config(Config, - [{rmq_nodename_suffix, Testcase}]), + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, Testcase}]), rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_broker_helpers:setup_steps() + ++ rabbit_ct_client_helpers:setup_steps()). end_per_testcase(Testcase, Config) -> - Config1 = rabbit_ct_helpers:run_steps(Config, - rabbit_ct_client_helpers:teardown_steps() - ++ - rabbit_ct_broker_helpers:teardown_steps()), + Config1 = + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() + ++ rabbit_ct_broker_helpers:teardown_steps()), rabbit_ct_helpers:testcase_finished(Config1, Testcase). %% ------------------------------------------------------------------- @@ -44,11 +43,7 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- run_snippets(Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, - 0, - ?MODULE, - run_snippets1, - [Config]). + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_snippets1, [Config]). run_snippets1(Config) -> rabbit_ct_config_schema:run_snippets(Config). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index ef90ef6a70..0fc8524918 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -22,51 +22,49 @@ -compile(export_all). -all() -> [{group, single_node}, {group, cluster}]. +all() -> + [{group, single_node}, {group, cluster}]. groups() -> - [{single_node, [], [test_stream]}, - {cluster, [], [test_stream, java]}]. + [{single_node, [], [test_stream]}, {cluster, [], [test_stream, java]}]. init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), Config. -end_per_suite(Config) -> Config. +end_per_suite(Config) -> + Config. init_per_group(single_node, Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, - [{rmq_nodes_clustered, false}]), - rabbit_ct_helpers:run_setup_steps(Config1, - rabbit_ct_broker_helpers:setup_steps()); + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]), + rabbit_ct_helpers:run_setup_steps(Config1, rabbit_ct_broker_helpers:setup_steps()); init_per_group(cluster = Group, Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, - [{rmq_nodes_clustered, true}]), - Config2 = rabbit_ct_helpers:set_config(Config1, - [{rmq_nodes_count, 3}, - {rmq_nodename_suffix, Group}, - {tcp_ports_base}]), + Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]), + Config2 = + rabbit_ct_helpers:set_config(Config1, + [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}]), rabbit_ct_helpers:run_setup_steps(Config2, - [fun (StepConfig) -> - rabbit_ct_helpers:merge_app_env(StepConfig, - {aten, - [{poll_interval, - 1000}]}) + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {aten, + [{poll_interval, 1000}]}) end] - ++ - rabbit_ct_broker_helpers:setup_steps()); + ++ rabbit_ct_broker_helpers:setup_steps()); init_per_group(_, Config) -> rabbit_ct_helpers:run_setup_steps(Config). end_per_group(java, Config) -> rabbit_ct_helpers:run_teardown_steps(Config); end_per_group(_, Config) -> - rabbit_ct_helpers:run_steps(Config, - rabbit_ct_broker_helpers:teardown_steps()). + rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(_TestCase, Config) -> Config. +init_per_testcase(_TestCase, Config) -> + Config. -end_per_testcase(_Test, _Config) -> ok. +end_per_testcase(_Test, _Config) -> + ok. test_stream(Config) -> Port = get_stream_port(Config), @@ -79,41 +77,35 @@ java(Config) -> Node1Name = get_node_name(Config, 0), Node2Name = get_node_name(Config, 1), RabbitMqCtl = get_rabbitmqctl(Config), - DataDir = rabbit_ct_helpers:get_config(Config, - data_dir), - MakeResult = rabbit_ct_helpers:make(Config, - DataDir, - ["tests", - {"NODE1_STREAM_PORT=~b", - [StreamPortNode1]}, - {"NODE1_NAME=~p", [Node1Name]}, - {"NODE2_NAME=~p", [Node2Name]}, - {"NODE2_STREAM_PORT=~b", - [StreamPortNode2]}, - {"RABBITMQCTL=~p", [RabbitMqCtl]}]), + DataDir = rabbit_ct_helpers:get_config(Config, data_dir), + MakeResult = + rabbit_ct_helpers:make(Config, + DataDir, + ["tests", + {"NODE1_STREAM_PORT=~b", [StreamPortNode1]}, + {"NODE1_NAME=~p", [Node1Name]}, + {"NODE2_NAME=~p", [Node2Name]}, + {"NODE2_STREAM_PORT=~b", [StreamPortNode2]}, + {"RABBITMQCTL=~p", [RabbitMqCtl]}]), {ok, _} = MakeResult. get_rabbitmqctl(Config) -> rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd). -get_stream_port(Config) -> get_stream_port(Config, 0). +get_stream_port(Config) -> + get_stream_port(Config, 0). get_stream_port(Config, Node) -> - rabbit_ct_broker_helpers:get_node_config(Config, - Node, - tcp_port_stream). + rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream). -get_node_name(Config) -> get_node_name(Config, 0). +get_node_name(Config) -> + get_node_name(Config, 0). get_node_name(Config, Node) -> - rabbit_ct_broker_helpers:get_node_config(Config, - Node, - nodename). + rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename). test_server(Port) -> - {ok, S} = gen_tcp:connect("localhost", - Port, - [{active, false}, {mode, binary}]), + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), test_peer_properties(S), test_authenticate(S), Stream = <<"stream1">>, @@ -132,194 +124,250 @@ test_server(Port) -> ok. test_peer_properties(S) -> - PeerPropertiesFrame = <<(?COMMAND_PEER_PROPERTIES):16, - (?VERSION_0):16, 1:32, 0:32>>, - PeerPropertiesFrameSize = - byte_size(PeerPropertiesFrame), - gen_tcp:send(S, - <<PeerPropertiesFrameSize:32, - PeerPropertiesFrame/binary>>), + PeerPropertiesFrame = <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>, + PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame), + gen_tcp:send(S, <<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>), {ok, - <<_Size:32, (?COMMAND_PEER_PROPERTIES):16, - (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16, + <<_Size:32, + ?COMMAND_PEER_PROPERTIES:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_OK:16, _Rest/binary>>} = gen_tcp:recv(S, 0, 5000). test_authenticate(S) -> - SaslHandshakeFrame = <<(?COMMAND_SASL_HANDSHAKE):16, - (?VERSION_0):16, 1:32>>, + SaslHandshakeFrame = <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>, SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame), - gen_tcp:send(S, - <<SaslHandshakeFrameSize:32, - SaslHandshakeFrame/binary>>), + gen_tcp:send(S, <<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>), Plain = <<"PLAIN">>, AmqPlain = <<"AMQPLAIN">>, {ok, SaslAvailable} = gen_tcp:recv(S, 0, 5000), %% mechanisms order is not deterministic, so checking both orders - ok = case SaslAvailable of - <<31:32, (?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16, - 1:32, (?RESPONSE_CODE_OK):16, 2:32, 5:16, - Plain:5/binary, 8:16, AmqPlain:8/binary>> -> - ok; - <<31:32, (?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16, - 1:32, (?RESPONSE_CODE_OK):16, 2:32, 8:16, - AmqPlain:8/binary, 5:16, Plain:5/binary>> -> - ok; - _ -> failed - end, + ok = + case SaslAvailable of + <<31:32, + ?COMMAND_SASL_HANDSHAKE:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_OK:16, + 2:32, + 5:16, + Plain:5/binary, + 8:16, + AmqPlain:8/binary>> -> + ok; + <<31:32, + ?COMMAND_SASL_HANDSHAKE:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_OK:16, + 2:32, + 8:16, + AmqPlain:8/binary, + 5:16, + Plain:5/binary>> -> + ok; + _ -> + failed + end, Username = <<"guest">>, Password = <<"guest">>, Null = 0, - PlainSasl = <<Null:8, Username/binary, Null:8, - Password/binary>>, + PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>, PlainSaslSize = byte_size(PlainSasl), SaslAuthenticateFrame = - <<(?COMMAND_SASL_AUTHENTICATE):16, (?VERSION_0):16, - 2:32, 5:16, Plain/binary, PlainSaslSize:32, + <<?COMMAND_SASL_AUTHENTICATE:16, + ?VERSION_0:16, + 2:32, + 5:16, + Plain/binary, + PlainSaslSize:32, PlainSasl/binary>>, - SaslAuthenticateFrameSize = - byte_size(SaslAuthenticateFrame), - gen_tcp:send(S, - <<SaslAuthenticateFrameSize:32, - SaslAuthenticateFrame/binary>>), + SaslAuthenticateFrameSize = byte_size(SaslAuthenticateFrame), + gen_tcp:send(S, <<SaslAuthenticateFrameSize:32, SaslAuthenticateFrame/binary>>), {ok, - <<10:32, (?COMMAND_SASL_AUTHENTICATE):16, - (?VERSION_0):16, 2:32, (?RESPONSE_CODE_OK):16, + <<10:32, + ?COMMAND_SASL_AUTHENTICATE:16, + ?VERSION_0:16, + 2:32, + ?RESPONSE_CODE_OK:16, RestTune/binary>>} = gen_tcp:recv(S, 0, 5000), - TuneExpected = <<12:32, (?COMMAND_TUNE):16, - (?VERSION_0):16, (?DEFAULT_FRAME_MAX):32, - (?DEFAULT_HEARTBEAT):32>>, + TuneExpected = + <<12:32, ?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, ?DEFAULT_HEARTBEAT:32>>, case RestTune of - <<>> -> {ok, TuneExpected} = gen_tcp:recv(S, 0, 5000); - TuneReceived -> TuneExpected = TuneReceived + <<>> -> + {ok, TuneExpected} = gen_tcp:recv(S, 0, 5000); + TuneReceived -> + TuneExpected = TuneReceived end, - TuneFrame = <<(?COMMAND_TUNE):16, (?VERSION_0):16, - (?DEFAULT_FRAME_MAX):32, 0:32>>, + TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>, TuneFrameSize = byte_size(TuneFrame), gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>), VirtualHost = <<"/">>, VirtualHostLength = byte_size(VirtualHost), - OpenFrame = <<(?COMMAND_OPEN):16, (?VERSION_0):16, 3:32, - VirtualHostLength:16, VirtualHost/binary>>, + OpenFrame = + <<?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, VirtualHostLength:16, VirtualHost/binary>>, OpenFrameSize = byte_size(OpenFrame), gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>), - {ok, - <<10:32, (?COMMAND_OPEN):16, (?VERSION_0):16, 3:32, - (?RESPONSE_CODE_OK):16>>} = + {ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). test_create_stream(S, Stream) -> StreamSize = byte_size(Stream), - CreateStreamFrame = <<(?COMMAND_CREATE_STREAM):16, - (?VERSION_0):16, 1:32, StreamSize:16, - Stream:StreamSize/binary, 0:32>>, + CreateStreamFrame = + <<?COMMAND_CREATE_STREAM:16, + ?VERSION_0:16, + 1:32, + StreamSize:16, + Stream:StreamSize/binary, + 0:32>>, FrameSize = byte_size(CreateStreamFrame), - gen_tcp:send(S, - <<FrameSize:32, CreateStreamFrame/binary>>), - {ok, - <<_Size:32, (?COMMAND_CREATE_STREAM):16, - (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16>>} = + gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>), + {ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). test_delete_stream(S, Stream) -> StreamSize = byte_size(Stream), - DeleteStreamFrame = <<(?COMMAND_DELETE_STREAM):16, - (?VERSION_0):16, 1:32, StreamSize:16, - Stream:StreamSize/binary>>, + DeleteStreamFrame = + <<?COMMAND_DELETE_STREAM:16, + ?VERSION_0:16, + 1:32, + StreamSize:16, + Stream:StreamSize/binary>>, FrameSize = byte_size(DeleteStreamFrame), - gen_tcp:send(S, - <<FrameSize:32, DeleteStreamFrame/binary>>), + gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>), ResponseFrameSize = 10, {ok, - <<ResponseFrameSize:32, (?COMMAND_DELETE_STREAM):16, - (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16>>} = + <<ResponseFrameSize:32, + ?COMMAND_DELETE_STREAM:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000). test_declare_publisher(S, PublisherId, Stream) -> StreamSize = byte_size(Stream), DeclarePublisherFrame = - <<(?COMMAND_DECLARE_PUBLISHER):16, (?VERSION_0):16, - 1:32, PublisherId:8, + <<?COMMAND_DECLARE_PUBLISHER:16, + ?VERSION_0:16, + 1:32, + PublisherId:8, 0:16, %% empty publisher reference - StreamSize:16, Stream:StreamSize/binary>>, + StreamSize:16, + Stream:StreamSize/binary>>, FrameSize = byte_size(DeclarePublisherFrame), - gen_tcp:send(S, - <<FrameSize:32, DeclarePublisherFrame/binary>>), + gen_tcp:send(S, <<FrameSize:32, DeclarePublisherFrame/binary>>), Res = gen_tcp:recv(S, 0, 5000), {ok, - <<_Size:32, (?COMMAND_DECLARE_PUBLISHER):16, - (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16, + <<_Size:32, + ?COMMAND_DECLARE_PUBLISHER:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res, Rest. test_publish_confirm(S, PublisherId, Body) -> BodySize = byte_size(Body), - PublishFrame = <<(?COMMAND_PUBLISH):16, (?VERSION_0):16, - PublisherId:8, 1:32, 1:64, BodySize:32, - Body:BodySize/binary>>, + PublishFrame = + <<?COMMAND_PUBLISH:16, + ?VERSION_0:16, + PublisherId:8, + 1:32, + 1:64, + BodySize:32, + Body:BodySize/binary>>, FrameSize = byte_size(PublishFrame), gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), {ok, - <<_Size:32, (?COMMAND_PUBLISH_CONFIRM):16, - (?VERSION_0):16, PublisherId:8, 1:32, 1:64>>} = + <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). test_subscribe(S, SubscriptionId, Stream) -> StreamSize = byte_size(Stream), - SubscribeFrame = <<(?COMMAND_SUBSCRIBE):16, - (?VERSION_0):16, 1:32, SubscriptionId:8, StreamSize:16, - Stream:StreamSize/binary, (?OFFSET_TYPE_OFFSET):16, - 0:64, 10:16>>, + SubscribeFrame = + <<?COMMAND_SUBSCRIBE:16, + ?VERSION_0:16, + 1:32, + SubscriptionId:8, + StreamSize:16, + Stream:StreamSize/binary, + ?OFFSET_TYPE_OFFSET:16, + 0:64, + 10:16>>, FrameSize = byte_size(SubscribeFrame), - gen_tcp:send(S, - <<FrameSize:32, SubscribeFrame/binary>>), + gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>), Res = gen_tcp:recv(S, 0, 5000), {ok, - <<_Size:32, (?COMMAND_SUBSCRIBE):16, (?VERSION_0):16, - 1:32, (?RESPONSE_CODE_OK):16, Rest/binary>>} = + <<_Size:32, + ?COMMAND_SUBSCRIBE:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_OK:16, + Rest/binary>>} = Res, Rest. test_deliver(S, Rest, SubscriptionId, Body) -> BodySize = byte_size(Body), Frame = read_frame(S, Rest), - <<58:32, (?COMMAND_DELIVER):16, (?VERSION_0):16, - SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8, 1:16, - 1:32, _Timestamp:64, _Epoch:64, 0:64, _Crc:32, - _DataLength:32, _TrailerLength:32, 0:1, - BodySize:31/unsigned, Body/binary>> = + <<58:32, + ?COMMAND_DELIVER:16, + ?VERSION_0:16, + SubscriptionId:8, + 5:4/unsigned, + 0:4/unsigned, + 0:8, + 1:16, + 1:32, + _Timestamp:64, + _Epoch:64, + 0:64, + _Crc:32, + _DataLength:32, + _TrailerLength:32, + 0:1, + BodySize:31/unsigned, + Body/binary>> = Frame. test_metadata_update_stream_deleted(S, Stream) -> StreamSize = byte_size(Stream), {ok, - <<15:32, (?COMMAND_METADATA_UPDATE):16, (?VERSION_0):16, - (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16, StreamSize:16, + <<15:32, + ?COMMAND_METADATA_UPDATE:16, + ?VERSION_0:16, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, + StreamSize:16, Stream/binary>>} = gen_tcp:recv(S, 0, 5000). test_close(S) -> CloseReason = <<"OK">>, CloseReasonSize = byte_size(CloseReason), - CloseFrame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16, - 1:32, (?RESPONSE_CODE_OK):16, CloseReasonSize:16, - CloseReason/binary>>, + CloseFrame = + <<?COMMAND_CLOSE:16, + ?VERSION_0:16, + 1:32, + ?RESPONSE_CODE_OK:16, + CloseReasonSize:16, + CloseReason/binary>>, CloseFrameSize = byte_size(CloseFrame), - gen_tcp:send(S, - <<CloseFrameSize:32, CloseFrame/binary>>), - {ok, - <<10:32, (?COMMAND_CLOSE):16, (?VERSION_0):16, 1:32, - (?RESPONSE_CODE_OK):16>>} = + gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>), + {ok, <<10:32, ?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000). -wait_for_socket_close(_S, 0) -> not_closed; +wait_for_socket_close(_S, 0) -> + not_closed; wait_for_socket_close(S, Attempt) -> case gen_tcp:recv(S, 0, 1000) of {error, timeout} -> wait_for_socket_close(S, Attempt - 1); - {error, closed} -> closed + {error, closed} -> + closed end. read_frame(S, Buffer) -> @@ -328,8 +376,12 @@ read_frame(S, Buffer) -> {tcp, S, Received} -> Data = <<Buffer/binary, Received/binary>>, case Data of - <<Size:32, _Body:Size/binary>> -> Data; - _ -> read_frame(S, Data) + <<Size:32, _Body:Size/binary>> -> + Data; + _ -> + read_frame(S, Data) end - after 1000 -> inet:setopts(S, [{active, false}]), Buffer + after 1000 -> + inet:setopts(S, [{active, false}]), + Buffer end. |