author      Arnaud Cogoluègnes <acogoluegnes@gmail.com>  2020-12-03 11:42:49 +0100
committer   Arnaud Cogoluègnes <acogoluegnes@gmail.com>  2020-12-03 11:42:49 +0100
commit      d68b4c37ca30231dcec20e36a6ffca1e57a7e320 (patch)
tree        ce98b1d4a6f64d90f87ac10a48691a47446a0bb2
parent      5d8eccbcf439b18a68837a5b174767a22379a932 (diff)
download    rabbitmq-server-git-stream-plugin-formatting.tar.gz

Format stream plugin code with default formatter (branch: stream-plugin-formatting)
-rw-r--r--  deps/rabbitmq_stream/rebar.config                                                              2
-rw-r--r--  deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl    78
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream.erl                                                   111
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl                                     44
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_manager.erl                                           341
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_reader.erl                                            2782
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_sup.erl                                                61
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_utils.erl                                             199
-rw-r--r--  deps/rabbitmq_stream/test/command_SUITE.erl                                                   91
-rw-r--r--  deps/rabbitmq_stream/test/config_schema_SUITE.erl                                             27
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl                                            354
11 files changed, 1763 insertions, 2327 deletions
diff --git a/deps/rabbitmq_stream/rebar.config b/deps/rabbitmq_stream/rebar.config
index d911323332..e5b44cb4eb 100644
--- a/deps/rabbitmq_stream/rebar.config
+++ b/deps/rabbitmq_stream/rebar.config
@@ -2,5 +2,5 @@
{format, [
{files, ["src/*.erl", "test/*.erl"]},
- {formatter, otp_formatter}
+ {formatter, default_formatter}
]}.
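
The only change in this file is the formatter selection for the rebar3_format plugin. For reference, a minimal sketch of the resulting format section after this hunk (reconstructed from the diff; the plugin declaration and any other rebar.config entries are assumed and not shown here):

    %% deps/rabbitmq_stream/rebar.config -- format section after this commit (sketch)
    {format, [
        {files, ["src/*.erl", "test/*.erl"]},   %% format all sources and test suites
        {formatter, default_formatter}          %% previously otp_formatter
    ]}.

With such a configuration the listed files are typically rewritten in place by running rebar3 format from the plugin directory, which is what produces the whitespace-only changes in the remaining files of this commit.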
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl
index 7782e9757e..b3c8ec6031 100644
--- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl
@@ -19,42 +19,34 @@
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
--export([formatter/0,
- scopes/0,
- switches/0,
- aliases/0,
- usage/0,
- usage_additional/0,
- usage_doc_guides/0,
- banner/2,
- validate/2,
- merge_defaults/2,
- run/2,
- output/2,
- description/0,
- help_section/0]).
+-export([formatter/0, scopes/0, switches/0, aliases/0, usage/0, usage_additional/0,
+ usage_doc_guides/0, banner/2, validate/2, merge_defaults/2, run/2, output/2,
+ description/0, help_section/0]).
-formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
+formatter() ->
+ 'Elixir.RabbitMQ.CLI.Formatters.Table'.
-scopes() -> [ctl, diagnostics, streams].
+scopes() ->
+ [ctl, diagnostics, streams].
-switches() -> [{verbose, boolean}].
+switches() ->
+ [{verbose, boolean}].
-aliases() -> [{'V', verbose}].
+aliases() ->
+ [{'V', verbose}].
description() ->
- <<"Lists stream connections on the target "
- "node">>.
+ <<"Lists stream connections on the target node">>.
-help_section() -> {plugin, stream}.
+help_section() ->
+ {plugin, stream}.
validate(Args, _) ->
- case
- 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
- ?INFO_ITEMS)
- of
- {ok, _} -> ok;
- Error -> Error
+ case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, ?INFO_ITEMS) of
+ {ok, _} ->
+ ok;
+ Error ->
+ Error
end.
merge_defaults([], Opts) ->
@@ -62,26 +54,31 @@ merge_defaults([], Opts) ->
merge_defaults(Args, Opts) ->
{Args, maps:merge(#{verbose => false}, Opts)}.
-usage() -> <<"list_stream_connections [<column> ...]">>.
+usage() ->
+ <<"list_stream_connections [<column> ...]">>.
usage_additional() ->
Prefix = <<" must be one of ">>,
- InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS),
- <<", ">>),
+ InfoItems =
+ 'Elixir.Enum':join(
+ lists:usort(?INFO_ITEMS), <<", ">>),
[{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}].
-usage_doc_guides() -> [?STREAM_GUIDE_URL].
+usage_doc_guides() ->
+ [?STREAM_GUIDE_URL].
run(Args,
- #{node := NodeName, timeout := Timeout,
+ #{node := NodeName,
+ timeout := Timeout,
verbose := Verbose}) ->
- InfoKeys = case Verbose of
- true -> ?INFO_ITEMS;
- false ->
- 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
- end,
- Nodes =
- 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
+ InfoKeys =
+ case Verbose of
+ true ->
+ ?INFO_ITEMS;
+ false ->
+ 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
+ end,
+ Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(NodeName,
rabbit_stream,
emit_connection_info_all,
@@ -90,7 +87,8 @@ run(Args,
InfoKeys,
length(Nodes)).
-banner(_, _) -> <<"Listing stream connections ...">>.
+banner(_, _) ->
+ <<"Listing stream connections ...">>.
output(Result, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl
index 96e6c02eba..b3c76d603b 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream.erl
@@ -19,101 +19,88 @@
-behaviour(application).
-export([start/2, host/0, port/0, kill_connection/1]).
-
-export([stop/1]).
-
--export([emit_connection_info_local/3,
- emit_connection_info_all/4,
- list/0]).
+-export([emit_connection_info_local/3, emit_connection_info_all/4, list/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
-start(_Type, _Args) -> rabbit_stream_sup:start_link().
+start(_Type, _Args) ->
+ rabbit_stream_sup:start_link().
host() ->
- case application:get_env(rabbitmq_stream,
- advertised_host,
- undefined)
- of
- undefined -> hostname_from_node();
- Host -> rabbit_data_coercion:to_binary(Host)
+ case application:get_env(rabbitmq_stream, advertised_host, undefined) of
+ undefined ->
+ hostname_from_node();
+ Host ->
+ rabbit_data_coercion:to_binary(Host)
end.
hostname_from_node() ->
- case re:split(rabbit_data_coercion:to_binary(node()),
- "@",
- [{return, binary}, {parts, 2}])
- of
- [_, Hostname] -> Hostname;
+ case re:split(
+ rabbit_data_coercion:to_binary(node()), "@", [{return, binary}, {parts, 2}])
+ of
+ [_, Hostname] ->
+ Hostname;
[_] ->
- rabbit_data_coercion:to_binary(inet:gethostname())
+ rabbit_data_coercion:to_binary(
+ inet:gethostname())
end.
port() ->
- case application:get_env(rabbitmq_stream,
- advertised_port,
- undefined)
- of
- undefined -> port_from_listener();
- Port -> Port
+ case application:get_env(rabbitmq_stream, advertised_port, undefined) of
+ undefined ->
+ port_from_listener();
+ Port ->
+ Port
end.
port_from_listener() ->
Listeners = rabbit_networking:node_listeners(node()),
- Port = lists:foldl(fun (#listener{port = Port,
- protocol = stream},
- _Acc) ->
- Port;
- (_, Acc) -> Acc
- end,
- undefined,
- Listeners),
+ Port =
+ lists:foldl(fun (#listener{port = Port, protocol = stream}, _Acc) ->
+ Port;
+ (_, Acc) ->
+ Acc
+ end,
+ undefined,
+ Listeners),
Port.
-stop(_State) -> ok.
+stop(_State) ->
+ ok.
kill_connection(ConnectionName) ->
- ConnectionNameBin =
- rabbit_data_coercion:to_binary(ConnectionName),
- lists:foreach(fun (ConnectionPid) ->
- ConnectionPid ! {infos, self()},
- receive
- {ConnectionPid,
- #{<<"connection_name">> := ConnectionNameBin}} ->
- exit(ConnectionPid, kill);
- {ConnectionPid, _ClientProperties} -> ok
- after 1000 -> ok
- end
+ ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName),
+ lists:foreach(fun(ConnectionPid) ->
+ ConnectionPid ! {infos, self()},
+ receive
+ {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} ->
+ exit(ConnectionPid, kill);
+ {ConnectionPid, _ClientProperties} -> ok
+ after 1000 -> ok
+ end
end,
pg_local:get_members(rabbit_stream_connections)).
-emit_connection_info_all(Nodes, Items, Ref,
- AggregatorPid) ->
- Pids = [spawn_link(Node,
- rabbit_stream,
- emit_connection_info_local,
- [Items, Ref, AggregatorPid])
- || Node <- Nodes],
+emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
+ Pids =
+ [spawn_link(Node, rabbit_stream, emit_connection_info_local, [Items, Ref, AggregatorPid])
+ || Node <- Nodes],
rabbit_control_misc:await_emitters_termination(Pids),
ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(AggregatorPid,
Ref,
- fun (Pid) ->
- rabbit_stream_reader:info(Pid,
- Items)
+ fun(Pid) ->
+ rabbit_stream_reader:info(Pid, Items)
end,
list()).
list() ->
[Client
- || {_, ListSupPid, _, _}
- <- supervisor2:which_children(rabbit_stream_sup),
- {_, RanchSup, supervisor, _}
- <- supervisor2:which_children(ListSupPid),
- {ranch_conns_sup, ConnSup, _, _}
- <- supervisor:which_children(RanchSup),
+ || {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stream_sup),
+ {_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid),
+ {ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup),
{_, CliSup, _, _} <- supervisor:which_children(ConnSup),
- {rabbit_stream_reader, Client, _, _}
- <- supervisor:which_children(CliSup)].
+ {rabbit_stream_reader, Client, _, _} <- supervisor:which_children(CliSup)].
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl
index d1b13c337d..5d1a2d0968 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_connection_sup.erl
@@ -17,38 +17,33 @@
-module(rabbit_stream_connection_sup).
-behaviour(supervisor2).
-
-behaviour(ranch_protocol).
-include_lib("rabbit_common/include/rabbit.hrl").
-export([start_link/4, start_keepalive_link/0]).
-
-export([init/1]).
start_link(Ref, _Sock, Transport, Opts) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
- {ok, KeepaliveSup} = supervisor2:start_child(SupPid,
- {rabbit_stream_keepalive_sup,
- {rabbit_stream_connection_sup,
- start_keepalive_link,
- []},
- intrinsic,
- infinity,
- supervisor,
- [rabbit_keepalive_sup]}),
- {ok, ReaderPid} = supervisor2:start_child(SupPid,
- {rabbit_stream_reader,
- {rabbit_stream_reader,
- start_link,
- [KeepaliveSup,
- Transport,
- Ref,
- Opts]},
- intrinsic,
- ?WORKER_WAIT,
- worker,
- [rabbit_stream_reader]}),
+ {ok, KeepaliveSup} =
+ supervisor2:start_child(SupPid,
+ {rabbit_stream_keepalive_sup,
+ {rabbit_stream_connection_sup, start_keepalive_link, []},
+ intrinsic,
+ infinity,
+ supervisor,
+ [rabbit_keepalive_sup]}),
+ {ok, ReaderPid} =
+ supervisor2:start_child(SupPid,
+ {rabbit_stream_reader,
+ {rabbit_stream_reader,
+ start_link,
+ [KeepaliveSup, Transport, Ref, Opts]},
+ intrinsic,
+ ?WORKER_WAIT,
+ worker,
+ [rabbit_stream_reader]}),
{ok, SupPid, ReaderPid}.
start_keepalive_link() ->
@@ -56,4 +51,5 @@ start_keepalive_link() ->
%%----------------------------------------------------------------------------
-init([]) -> {ok, {{one_for_all, 0, 1}, []}}.
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 3a4ed00271..2a215d56b2 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -21,150 +21,92 @@
-include_lib("rabbit_common/include/rabbit.hrl").
%% API
--export([init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2]).
-
--export([start_link/1,
- create/4,
- delete/3,
- lookup_leader/2,
- lookup_local_member/2,
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+-export([start_link/1, create/4, delete/3, lookup_leader/2, lookup_local_member/2,
topology/2]).
-record(state, {configuration}).
start_link(Conf) ->
- gen_server:start_link({local, ?MODULE},
- ?MODULE,
- [Conf],
- []).
-
-init([Conf]) -> {ok, #state{configuration = Conf}}.
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []).
--spec create(binary(), binary(),
- #{binary() => binary()}, binary()) -> {ok, map()} |
- {error,
- reference_already_exists} |
- {error, internal_error}.
+init([Conf]) ->
+ {ok, #state{configuration = Conf}}.
+-spec create(binary(), binary(), #{binary() => binary()}, binary()) ->
+ {ok, map()} | {error, reference_already_exists} | {error, internal_error}.
create(VirtualHost, Reference, Arguments, Username) ->
- gen_server:call(?MODULE,
- {create, VirtualHost, Reference, Arguments, Username}).
-
--spec delete(binary(), binary(), binary()) -> {ok,
- deleted} |
- {error, reference_not_found}.
+ gen_server:call(?MODULE, {create, VirtualHost, Reference, Arguments, Username}).
+-spec delete(binary(), binary(), binary()) ->
+ {ok, deleted} | {error, reference_not_found}.
delete(VirtualHost, Reference, Username) ->
- gen_server:call(?MODULE,
- {delete, VirtualHost, Reference, Username}).
-
--spec lookup_leader(binary(), binary()) -> pid() |
- cluster_not_found.
+ gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}).
+-spec lookup_leader(binary(), binary()) -> pid() | cluster_not_found.
lookup_leader(VirtualHost, Stream) ->
- gen_server:call(?MODULE,
- {lookup_leader, VirtualHost, Stream}).
-
--spec lookup_local_member(binary(), binary()) -> {ok,
- pid()} |
- {error, not_found}.
+ gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
+-spec lookup_local_member(binary(), binary()) -> {ok, pid()} | {error, not_found}.
lookup_local_member(VirtualHost, Stream) ->
- gen_server:call(?MODULE,
- {lookup_local_member, VirtualHost, Stream}).
-
--spec topology(binary(), binary()) -> {ok,
- #{leader_node => pid(),
- replica_nodes => [pid()]}} |
- {error, stream_not_found}.
+ gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}).
+-spec topology(binary(), binary()) ->
+ {ok, #{leader_node => pid(), replica_nodes => [pid()]}} |
+ {error, stream_not_found}.
topology(VirtualHost, Stream) ->
- gen_server:call(?MODULE,
- {topology, VirtualHost, Stream}).
+ gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
stream_queue_arguments(Arguments) ->
- stream_queue_arguments([{<<"x-queue-type">>,
- longstr,
- <<"stream">>}],
- Arguments).
+ stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments).
-stream_queue_arguments(ArgumentsAcc, Arguments)
- when map_size(Arguments) =:= 0 ->
+stream_queue_arguments(ArgumentsAcc, Arguments) when map_size(Arguments) =:= 0 ->
ArgumentsAcc;
-stream_queue_arguments(ArgumentsAcc,
- #{<<"max-length-bytes">> := Value} = Arguments) ->
- stream_queue_arguments([{<<"x-max-length-bytes">>,
- long,
- binary_to_integer(Value)}]
- ++ ArgumentsAcc,
+stream_queue_arguments(ArgumentsAcc, #{<<"max-length-bytes">> := Value} = Arguments) ->
+ stream_queue_arguments([{<<"x-max-length-bytes">>, long, binary_to_integer(Value)}]
+ ++ ArgumentsAcc,
maps:remove(<<"max-length-bytes">>, Arguments));
-stream_queue_arguments(ArgumentsAcc,
- #{<<"max-age">> := Value} = Arguments) ->
- stream_queue_arguments([{<<"x-max-age">>,
- longstr,
- Value}]
- ++ ArgumentsAcc,
+stream_queue_arguments(ArgumentsAcc, #{<<"max-age">> := Value} = Arguments) ->
+ stream_queue_arguments([{<<"x-max-age">>, longstr, Value}] ++ ArgumentsAcc,
maps:remove(<<"max-age">>, Arguments));
-stream_queue_arguments(ArgumentsAcc,
- #{<<"max-segment-size">> := Value} = Arguments) ->
- stream_queue_arguments([{<<"x-max-segment-size">>,
- long,
- binary_to_integer(Value)}]
- ++ ArgumentsAcc,
+stream_queue_arguments(ArgumentsAcc, #{<<"max-segment-size">> := Value} = Arguments) ->
+ stream_queue_arguments([{<<"x-max-segment-size">>, long, binary_to_integer(Value)}]
+ ++ ArgumentsAcc,
maps:remove(<<"max-segment-size">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
#{<<"initial-cluster-size">> := Value} = Arguments) ->
- stream_queue_arguments([{<<"x-initial-cluster-size">>,
- long,
- binary_to_integer(Value)}]
- ++ ArgumentsAcc,
+ stream_queue_arguments([{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}]
+ ++ ArgumentsAcc,
maps:remove(<<"initial-cluster-size">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
#{<<"queue-leader-locator">> := Value} = Arguments) ->
- stream_queue_arguments([{<<"x-queue-leader-locator">>,
- longstr,
- Value}]
- ++ ArgumentsAcc,
+ stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Value}] ++ ArgumentsAcc,
maps:remove(<<"queue-leader-locator">>, Arguments));
stream_queue_arguments(ArgumentsAcc, _Arguments) ->
ArgumentsAcc.
-validate_stream_queue_arguments([]) -> ok;
-validate_stream_queue_arguments([{<<"x-initial-cluster-size">>,
- long,
- ClusterSize}
- | _])
+validate_stream_queue_arguments([]) ->
+ ok;
+validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _])
when ClusterSize =< 0 ->
error;
-validate_stream_queue_arguments([{<<"x-queue-leader-locator">>,
- longstr,
- Locator}
- | T]) ->
- case lists:member(Locator,
- [<<"client-local">>, <<"random">>, <<"least-leaders">>])
- of
- true -> validate_stream_queue_arguments(T);
- false -> error
+validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
+ case lists:member(Locator, [<<"client-local">>, <<"random">>, <<"least-leaders">>]) of
+ true ->
+ validate_stream_queue_arguments(T);
+ false ->
+ error
end;
validate_stream_queue_arguments([_ | T]) ->
validate_stream_queue_arguments(T).
-handle_call({create,
- VirtualHost,
- Reference,
- Arguments,
- Username},
- _From, State) ->
- Name = #resource{virtual_host = VirtualHost,
- kind = queue, name = Reference},
- StreamQueueArguments =
- stream_queue_arguments(Arguments),
- case
- validate_stream_queue_arguments(StreamQueueArguments)
- of
+handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Reference},
+ StreamQueueArguments = stream_queue_arguments(Arguments),
+ case validate_stream_queue_arguments(StreamQueueArguments) of
ok ->
Q0 = amqqueue:new(Name,
none,
@@ -175,7 +117,8 @@ handle_call({create,
VirtualHost,
#{user => Username},
rabbit_stream_queue),
- try case rabbit_stream_queue:declare(Q0, node()) of
+ try
+ case rabbit_stream_queue:declare(Q0, node()) of
{new, Q} ->
{reply, {ok, amqqueue:get_type_state(Q)}, State};
{existing, _} ->
@@ -187,151 +130,147 @@ handle_call({create,
end
catch
exit:Error ->
- rabbit_log:info("Error while creating ~p stream, ~p~n",
- [Reference, Error]),
+ rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
{reply, {error, internal_error}, State}
end;
- error -> {reply, {error, validation_failed}, State}
+ error ->
+ {reply, {error, validation_failed}, State}
end;
-handle_call({delete, VirtualHost, Reference, Username},
- _From, State) ->
- Name = #resource{virtual_host = VirtualHost,
- kind = queue, name = Reference},
- rabbit_log:debug("Trying to delete stream ~p~n",
- [Reference]),
+handle_call({delete, VirtualHost, Reference, Username}, _From, State) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Reference},
+ rabbit_log:debug("Trying to delete stream ~p~n", [Reference]),
case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
- rabbit_log:debug("Found queue record ~p, checking if it "
- "is a stream~n",
- [Reference]),
+ rabbit_log:debug("Found queue record ~p, checking if it is a stream~n", [Reference]),
case is_stream_queue(Q) of
true ->
- rabbit_log:debug("Queue record ~p is a stream, trying "
- "to delete it~n",
+ rabbit_log:debug("Queue record ~p is a stream, trying to delete it~n",
[Reference]),
- {ok, _} = rabbit_stream_queue:delete(Q,
- false,
- false,
- Username),
+ {ok, _} = rabbit_stream_queue:delete(Q, false, false, Username),
rabbit_log:debug("Stream ~p deleted~n", [Reference]),
{reply, {ok, deleted}, State};
_ ->
- rabbit_log:debug("Queue record ~p is NOT a stream, returning "
- "error~n",
+ rabbit_log:debug("Queue record ~p is NOT a stream, returning error~n",
[Reference]),
{reply, {error, reference_not_found}, State}
end;
{error, not_found} ->
- rabbit_log:debug("Stream ~p not found, cannot delete it~n",
- [Reference]),
+ rabbit_log:debug("Stream ~p not found, cannot delete it~n", [Reference]),
{reply, {error, reference_not_found}, State}
end;
-handle_call({lookup_leader, VirtualHost, Stream}, _From,
- State) ->
- Name = #resource{virtual_host = VirtualHost,
- kind = queue, name = Stream},
+handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
- #{leader_pid := LeaderPid} =
- amqqueue:get_type_state(Q),
+ #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
% FIXME check if pid is alive in case of stale information
LeaderPid;
- _ -> cluster_not_found
+ _ ->
+ cluster_not_found
end;
- _ -> cluster_not_found
+ _ ->
+ cluster_not_found
end,
{reply, Res, State};
-handle_call({lookup_local_member, VirtualHost, Stream},
- _From, State) ->
- Name = #resource{virtual_host = VirtualHost,
- kind = queue, name = Stream},
+handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
- #{leader_pid := LeaderPid,
- replica_pids := ReplicaPids} =
+ #{leader_pid := LeaderPid, replica_pids := ReplicaPids} =
amqqueue:get_type_state(Q),
- LocalMember = lists:foldl(fun (Pid, Acc) ->
- case node(Pid) =:=
- node()
- of
- true -> Pid;
- false -> Acc
- end
- end,
- undefined,
- [LeaderPid] ++ ReplicaPids),
+ LocalMember =
+ lists:foldl(fun(Pid, Acc) ->
+ case node(Pid) =:= node() of
+ true -> Pid;
+ false -> Acc
+ end
+ end,
+ undefined,
+ [LeaderPid] ++ ReplicaPids),
% FIXME check if pid is alive in case of stale information
case LocalMember of
- undefined -> {error, not_available};
- Pid -> {ok, Pid}
+ undefined ->
+ {error, not_available};
+ Pid ->
+ {ok, Pid}
end;
- _ -> {error, not_found}
+ _ ->
+ {error, not_found}
end;
{error, not_found} ->
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found -> {error, not_found};
- _ -> {error, not_available}
+ not_found ->
+ {error, not_found};
+ _ ->
+ {error, not_available}
end;
- _ -> {error, not_found}
+ _ ->
+ {error, not_found}
end,
{reply, Res, State};
-handle_call({topology, VirtualHost, Stream}, _From,
- State) ->
- Name = #resource{virtual_host = VirtualHost,
- kind = queue, name = Stream},
+handle_call({topology, VirtualHost, Stream}, _From, State) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
QState = amqqueue:get_type_state(Q),
- ProcessAliveFun = fun (Pid) ->
- rpc:call(node(Pid),
- erlang,
- is_process_alive,
- [Pid],
- 10000)
- end,
- LeaderNode = case ProcessAliveFun(maps:get(leader_pid,
- QState))
- of
- true ->
- maps:get(leader_node, QState);
- _ -> undefined
- end,
- ReplicaNodes = lists:foldl(fun (Pid, Acc) ->
- case
- ProcessAliveFun(Pid)
- of
- true ->
- Acc ++
- [node(Pid)];
- _ -> Acc
- end
- end,
- [],
- maps:get(replica_pids,
- QState)),
- {ok,
- #{leader_node => LeaderNode,
- replica_nodes => ReplicaNodes}};
- _ -> {error, stream_not_found}
+ ProcessAliveFun =
+ fun(Pid) ->
+ rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000)
+ end,
+ LeaderNode =
+ case ProcessAliveFun(maps:get(leader_pid, QState)) of
+ true ->
+ maps:get(leader_node, QState);
+ _ ->
+ undefined
+ end,
+ ReplicaNodes =
+ lists:foldl(fun(Pid, Acc) ->
+ case ProcessAliveFun(Pid) of
+ true -> Acc ++ [node(Pid)];
+ _ -> Acc
+ end
+ end,
+ [],
+ maps:get(replica_pids, QState)),
+ {ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}};
+ _ ->
+ {error, stream_not_found}
end;
{error, not_found} ->
case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found -> {error, stream_not_found};
- _ -> {error, stream_not_available}
+ not_found ->
+ {error, stream_not_found};
+ _ ->
+ {error, stream_not_available}
end;
- _ -> {error, stream_not_found}
+ _ ->
+ {error, stream_not_found}
end,
{reply, Res, State};
handle_call(which_children, _From, State) ->
{reply, [], State}.
-handle_cast(_, State) -> {noreply, State}.
+handle_cast(_, State) ->
+ {noreply, State}.
handle_info(Info, State) ->
rabbit_log:info("Received info ~p~n", [Info]),
@@ -339,6 +278,8 @@ handle_info(Info, State) ->
is_stream_queue(Q) ->
case amqqueue:get_type(Q) of
- rabbit_stream_queue -> true;
- _ -> false
+ rabbit_stream_queue ->
+ true;
+ _ ->
+ false
end.
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index c1535c4b7b..31f7af92df 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -21,11 +21,8 @@
-include("rabbit_stream.hrl").
-type stream() :: binary().
-
-type publisher_id() :: byte().
-
-type publisher_reference() :: binary().
-
-type subscription_id() :: byte().
-record(publisher,
@@ -33,7 +30,6 @@
stream :: stream(),
reference :: undefined | publisher_reference(),
leader :: pid()}).
-
-record(consumer,
{socket ::
rabbit_net:socket(), %% ranch_transport:socket(),
@@ -43,12 +39,10 @@
segment :: osiris_log:state(),
credit :: integer(),
stream :: stream()}).
-
-record(stream_connection_state,
{data :: none | binary(),
blocked :: boolean(),
consumers :: #{subscription_id() => #consumer{}}}).
-
-record(stream_connection,
{name :: string(),
%% server host
@@ -66,11 +60,9 @@
publishers ::
#{publisher_id() =>
#publisher{}}, %% FIXME replace with a list (0-255 lookup faster?)
- publisher_to_ids ::
- #{{stream(), publisher_reference()} => publisher_id()},
+ publisher_to_ids :: #{{stream(), publisher_reference()} => publisher_id()},
stream_leaders :: #{stream() => pid()},
- stream_subscriptions ::
- #{stream() => [subscription_id()]},
+ stream_subscriptions :: #{stream() => [subscription_id()]},
credits :: atomics:atomics_ref(),
authentication_state :: atom(),
user :: undefined | #user{},
@@ -84,7 +76,6 @@
monitors = #{} :: #{reference() => stream()},
stats_timer :: reference(),
send_file_oct :: atomics:atomics_ref()}).
-
-record(configuration,
{initial_credits :: integer(),
credits_required_for_unblocking :: integer(),
@@ -93,7 +84,6 @@
-define(RESPONSE_FRAME_SIZE,
10). % 2 (key) + 2 (version) + 4 (correlation ID) + 2 (response code)
-
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -120,19 +110,9 @@
connected_at,
node,
user_who_performed_action]).
-
--define(SIMPLE_METRICS,
- [pid, recv_oct, send_oct, reductions]).
-
+-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS,
- [recv_cnt,
- send_cnt,
- send_pend,
- state,
- channels,
- garbage_collection,
- timeout]).
-
+ [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection, timeout]).
-define(AUTH_NOTIFICATION_INFO_KEYS,
[host,
name,
@@ -151,23 +131,20 @@
-export([start_link/4, init/1, info/2]).
start_link(KeepaliveSup, Transport, Ref, Opts) ->
- Pid = proc_lib:spawn_link(?MODULE,
- init,
- [[KeepaliveSup, Transport, Ref, Opts]]),
+ Pid = proc_lib:spawn_link(?MODULE, init, [[KeepaliveSup, Transport, Ref, Opts]]),
{ok, Pid}.
init([KeepaliveSup,
Transport,
Ref,
#{initial_credits := InitialCredits,
- credits_required_for_unblocking :=
- CreditsRequiredBeforeUnblocking,
- frame_max := FrameMax, heartbeat := Heartbeat}]) ->
+ credits_required_for_unblocking := CreditsRequiredBeforeUnblocking,
+ frame_max := FrameMax,
+ heartbeat := Heartbeat}]) ->
process_flag(trap_exit, true),
- {ok, Sock} = rabbit_networking:handshake(Ref,
- application:get_env(rabbitmq_stream,
- proxy_protocol,
- false)),
+ {ok, Sock} =
+ rabbit_networking:handshake(Ref,
+ application:get_env(rabbitmq_stream, proxy_protocol, false)),
RealSocket = rabbit_net:unwrap_socket(Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
@@ -175,55 +152,52 @@ init([KeepaliveSup,
SendFileOct = atomics:new(1, [{signed, false}]),
atomics:put(SendFileOct, 1, 0),
init_credit(Credits, InitialCredits),
- {PeerHost, PeerPort, Host, Port} = socket_op(Sock,
- fun (S) ->
- rabbit_net:socket_ends(S,
- inbound)
- end),
- Connection = #stream_connection{name =
- rabbit_data_coercion:to_binary(ConnStr),
- host = Host, peer_host = PeerHost,
- port = Port, peer_port = PeerPort,
- connected_at =
- os:system_time(milli_seconds),
- auth_mechanism = none,
- helper_sup = KeepaliveSup,
- socket = RealSocket,
- publishers = #{},
- publisher_to_ids = #{},
- stream_leaders = #{},
- stream_subscriptions = #{},
- credits = Credits,
- authentication_state = none,
- connection_step = tcp_connected,
- frame_max = FrameMax,
- send_file_oct = SendFileOct},
- State = #stream_connection_state{consumers = #{},
- blocked = false, data = none},
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun(S) -> rabbit_net:socket_ends(S, inbound) end),
+ Connection =
+ #stream_connection{name = rabbit_data_coercion:to_binary(ConnStr),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
+ connected_at = os:system_time(milli_seconds),
+ auth_mechanism = none,
+ helper_sup = KeepaliveSup,
+ socket = RealSocket,
+ publishers = #{},
+ publisher_to_ids = #{},
+ stream_leaders = #{},
+ stream_subscriptions = #{},
+ credits = Credits,
+ authentication_state = none,
+ connection_step = tcp_connected,
+ frame_max = FrameMax,
+ send_file_oct = SendFileOct},
+ State =
+ #stream_connection_state{consumers = #{},
+ blocked = false,
+ data = none},
Transport:setopts(RealSocket, [{active, once}]),
listen_loop_pre_auth(Transport,
Connection,
State,
- #configuration{initial_credits =
- InitialCredits,
- credits_required_for_unblocking
- =
+ #configuration{initial_credits = InitialCredits,
+ credits_required_for_unblocking =
CreditsRequiredBeforeUnblocking,
frame_max = FrameMax,
heartbeat = Heartbeat});
{Error, Reason} ->
rabbit_net:fast_close(RealSocket),
- rabbit_log:warning("Closing connection because of ~p ~p~n",
- [Error, Reason])
+ rabbit_log:warning("Closing connection because of ~p ~p~n", [Error, Reason])
end.
socket_op(Sock, Fun) ->
RealSocket = rabbit_net:unwrap_socket(Sock),
case Fun(Sock) of
- {ok, Res} -> Res;
+ {ok, Res} ->
+ Res;
{error, Reason} ->
- rabbit_log:warning("Error during socket operation ~p~n",
- [Reason]),
+ rabbit_log:warning("Error during socket operation ~p~n", [Reason]),
rabbit_net:fast_close(RealSocket),
exit(normal)
end.
@@ -240,41 +214,29 @@ add_credits(CreditReference, Credits) ->
has_credits(CreditReference) ->
atomics:get(CreditReference, 1) > 0.
-has_enough_credits_to_unblock(CreditReference,
- CreditsRequiredForUnblocking) ->
- atomics:get(CreditReference, 1) >
- CreditsRequiredForUnblocking.
+has_enough_credits_to_unblock(CreditReference, CreditsRequiredForUnblocking) ->
+ atomics:get(CreditReference, 1) > CreditsRequiredForUnblocking.
listen_loop_pre_auth(Transport,
- #stream_connection{socket = S} = Connection, State,
- #configuration{frame_max = FrameMax,
- heartbeat = Heartbeat} =
- Configuration) ->
+ #stream_connection{socket = S} = Connection,
+ State,
+ #configuration{frame_max = FrameMax, heartbeat = Heartbeat} = Configuration) ->
{OK, Closed, Error} = Transport:messages(),
%% FIXME introduce timeout to complete the connection opening (after block should be enough)
receive
{OK, S, Data} ->
- #stream_connection{connection_step = ConnectionStep0} =
- Connection,
+ #stream_connection{connection_step = ConnectionStep0} = Connection,
{Connection1, State1} =
- handle_inbound_data_pre_auth(Transport,
- Connection,
- State,
- Data),
+ handle_inbound_data_pre_auth(Transport, Connection, State, Data),
Transport:setopts(S, [{active, once}]),
- #stream_connection{connection_step = ConnectionStep} =
- Connection1,
- rabbit_log:info("Transitioned from ~p to ~p~n",
- [ConnectionStep0, ConnectionStep]),
+ #stream_connection{connection_step = ConnectionStep} = Connection1,
+ rabbit_log:info("Transitioned from ~p to ~p~n", [ConnectionStep0, ConnectionStep]),
case ConnectionStep of
authenticated ->
- TuneFrame = <<(?COMMAND_TUNE):16, (?VERSION_0):16,
- FrameMax:32, Heartbeat:32>>,
+ TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>,
frame(Transport, Connection1, TuneFrame),
listen_loop_pre_auth(Transport,
- Connection1#stream_connection{connection_step
- =
- tuning},
+ Connection1#stream_connection{connection_step = tuning},
State1,
Configuration);
opened ->
@@ -282,8 +244,8 @@ listen_loop_pre_auth(Transport,
% just meant to be able to close the connection remotely
% should be possible once the connections are available in ctl list_connections
pg_local:join(rabbit_stream_connections, self()),
- Connection2 = rabbit_event:init_stats_timer(Connection1,
- #stream_connection.stats_timer),
+ Connection2 =
+ rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer),
Connection3 = ensure_stats_timer(Connection2),
Infos =
augment_infos_with_user_provided_connection_name(infos(?CREATION_EVENT_KEYS,
@@ -293,38 +255,30 @@ listen_loop_pre_auth(Transport,
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
rabbit_networking:register_non_amqp_connection(self()),
- listen_loop_post_auth(Transport,
- Connection3,
- State1,
- Configuration);
- failure -> close(Transport, S);
+ listen_loop_post_auth(Transport, Connection3, State1, Configuration);
+ failure ->
+ close(Transport, S);
_ ->
- listen_loop_pre_auth(Transport,
- Connection1,
- State1,
- Configuration)
+ listen_loop_pre_auth(Transport, Connection1, State1, Configuration)
end;
{Closed, S} ->
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
- rabbit_log:info("Socket error ~p [~w]~n",
- [Reason, S, self()]);
+ rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
M ->
rabbit_log:warning("Unknown message ~p~n", [M]),
close(Transport, S)
end.
augment_infos_with_user_provided_connection_name(Infos,
- #stream_connection{client_properties
- =
+ #stream_connection{client_properties =
ClientProperties}) ->
case ClientProperties of
- #{<<"connection_name">> :=
- UserProvidedConnectionName} ->
- [{user_provided_name, UserProvidedConnectionName}
- | Infos];
- _ -> Infos
+ #{<<"connection_name">> := UserProvidedConnectionName} ->
+ [{user_provided_name, UserProvidedConnectionName} | Infos];
+ _ ->
+ Infos
end.
close(Transport, S) ->
@@ -333,8 +287,7 @@ close(Transport, S) ->
listen_loop_post_auth(Transport,
#stream_connection{socket = S,
- stream_subscriptions =
- StreamSubscriptions,
+ stream_subscriptions = StreamSubscriptions,
credits = Credits,
heartbeater = Heartbeater,
monitors = Monitors,
@@ -342,9 +295,7 @@ listen_loop_post_auth(Transport,
publisher_to_ids = PublisherRefToIds,
send_file_oct = SendFileOct} =
Connection0,
- #stream_connection_state{consumers = Consumers,
- blocked = Blocked} =
- State,
+ #stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
#configuration{credits_required_for_unblocking =
CreditsRequiredForUnblocking} =
Configuration) ->
@@ -353,12 +304,8 @@ listen_loop_post_auth(Transport,
receive
{OK, S, Data} ->
{Connection1, State1} =
- handle_inbound_data_post_auth(Transport,
- Connection,
- State,
- Data),
- #stream_connection{connection_step = Step} =
- Connection1,
+ handle_inbound_data_post_auth(Transport, Connection, State, Data),
+ #stream_connection{connection_step = Step} = Connection1,
case Step of
closing ->
close(Transport, S),
@@ -367,291 +314,196 @@ listen_loop_post_auth(Transport,
close_sent ->
rabbit_log:debug("Transitioned to close_sent ~n"),
Transport:setopts(S, [{active, once}]),
- listen_loop_post_close(Transport,
- Connection1,
- State1,
- Configuration);
+ listen_loop_post_close(Transport, Connection1, State1, Configuration);
_ ->
- State2 = case Blocked of
- true ->
- case has_enough_credits_to_unblock(Credits,
- CreditsRequiredForUnblocking)
- of
- true ->
- Transport:setopts(S,
- [{active,
- once}]),
- ok =
- rabbit_heartbeat:resume_monitor(Heartbeater),
- State1#stream_connection_state{blocked
- =
- false};
- false -> State1
- end;
- false ->
- case has_credits(Credits) of
- true ->
- Transport:setopts(S,
- [{active,
- once}]),
- State1;
- false ->
- ok =
- rabbit_heartbeat:pause_monitor(Heartbeater),
- State1#stream_connection_state{blocked
- =
- true}
- end
- end,
- listen_loop_post_auth(Transport,
- Connection1,
- State2,
- Configuration)
+ State2 =
+ case Blocked of
+ true ->
+ case has_enough_credits_to_unblock(Credits,
+ CreditsRequiredForUnblocking)
+ of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State1#stream_connection_state{blocked = false};
+ false ->
+ State1
+ end;
+ false ->
+ case has_credits(Credits) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ State1;
+ false ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
+ State1#stream_connection_state{blocked = true}
+ end
+ end,
+ listen_loop_post_auth(Transport, Connection1, State2, Configuration)
end;
{'DOWN', MonitorRef, process, _OsirisPid, _Reason} ->
- {Connection1, State1} = case Monitors of
- #{MonitorRef := Stream} ->
- Monitors1 = maps:remove(MonitorRef,
- Monitors),
- C =
- Connection#stream_connection{monitors
- =
- Monitors1},
- case
- clean_state_after_stream_deletion_or_failure(Stream,
- C,
- State)
- of
- {cleaned,
- NewConnection,
- NewState} ->
- StreamSize =
- byte_size(Stream),
- FrameSize = 2 + 2 + 2 + 2 +
- StreamSize,
- Transport:send(S,
- [<<FrameSize:32,
- (?COMMAND_METADATA_UPDATE):16,
- (?VERSION_0):16,
- (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16,
- StreamSize:16,
- Stream/binary>>]),
- {NewConnection, NewState};
- {not_cleaned,
- SameConnection,
- SameState} ->
- {SameConnection, SameState}
- end;
- _ -> {Connection, State}
- end,
- listen_loop_post_auth(Transport,
- Connection1,
- State1,
- Configuration);
+ {Connection1, State1} =
+ case Monitors of
+ #{MonitorRef := Stream} ->
+ Monitors1 = maps:remove(MonitorRef, Monitors),
+ C = Connection#stream_connection{monitors = Monitors1},
+ case clean_state_after_stream_deletion_or_failure(Stream, C, State) of
+ {cleaned, NewConnection, NewState} ->
+ StreamSize = byte_size(Stream),
+ FrameSize = 2 + 2 + 2 + 2 + StreamSize,
+ Transport:send(S,
+ [<<FrameSize:32,
+ ?COMMAND_METADATA_UPDATE:16,
+ ?VERSION_0:16,
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
+ StreamSize:16,
+ Stream/binary>>]),
+ {NewConnection, NewState};
+ {not_cleaned, SameConnection, SameState} ->
+ {SameConnection, SameState}
+ end;
+ _ ->
+ {Connection, State}
+ end,
+ listen_loop_post_auth(Transport, Connection1, State1, Configuration);
{'$gen_cast',
{queue_event,
_QueueResource,
- {osiris_written,
- _QueueResource,
- undefined,
- CorrelationList}}} ->
- {FirstPublisherId, _FirstPublishingId} = lists:nth(1,
- CorrelationList),
+ {osiris_written, _QueueResource, undefined, CorrelationList}}} ->
+ {FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList),
{LastPublisherId, LastPublishingIds, LastCount} =
- lists:foldl(fun ({PublisherId, PublishingId},
- {CurrentPublisherId, PublishingIds, Count}) ->
- case PublisherId of
- CurrentPublisherId ->
- {CurrentPublisherId,
- [PublishingIds,
- <<PublishingId:64>>],
- Count + 1};
- OtherPublisherId ->
- FrameSize = 2 + 2 + 1 + 4 +
- Count * 8,
- %% FIXME enforce max frame size
- %% in practice, this should be necessary only for very large chunks and for very small frame size limits
- Transport:send(S,
- [<<FrameSize:32,
- (?COMMAND_PUBLISH_CONFIRM):16,
- (?VERSION_0):16>>,
- <<CurrentPublisherId:8>>,
- <<Count:32>>,
- PublishingIds]),
- {OtherPublisherId,
- <<PublishingId:64>>,
- 1}
- end
+ lists:foldl(fun({PublisherId, PublishingId},
+ {CurrentPublisherId, PublishingIds, Count}) ->
+ case PublisherId of
+ CurrentPublisherId ->
+ {CurrentPublisherId,
+ [PublishingIds, <<PublishingId:64>>],
+ Count + 1};
+ OtherPublisherId ->
+ FrameSize = 2 + 2 + 1 + 4 + Count * 8,
+ %% FIXME enforce max frame size
+ %% in practice, this should be necessary only for very large chunks and for very small frame size limits
+ Transport:send(S,
+ [<<FrameSize:32,
+ ?COMMAND_PUBLISH_CONFIRM:16,
+ ?VERSION_0:16>>,
+ <<CurrentPublisherId:8>>,
+ <<Count:32>>,
+ PublishingIds]),
+ {OtherPublisherId, <<PublishingId:64>>, 1}
+ end
end,
{FirstPublisherId, <<>>, 0},
CorrelationList),
FrameSize = 2 + 2 + 1 + 4 + LastCount * 8,
Transport:send(S,
- [<<FrameSize:32, (?COMMAND_PUBLISH_CONFIRM):16,
- (?VERSION_0):16>>,
+ [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
<<LastPublisherId:8>>,
<<LastCount:32>>,
LastPublishingIds]),
CorrelationIdCount = length(CorrelationList),
add_credits(Credits, CorrelationIdCount),
- State1 = case Blocked of
- true ->
- case has_enough_credits_to_unblock(Credits,
- CreditsRequiredForUnblocking)
- of
- true ->
- Transport:setopts(S, [{active, once}]),
- ok =
- rabbit_heartbeat:resume_monitor(Heartbeater),
- State#stream_connection_state{blocked =
- false};
- false -> State
- end;
- false -> State
- end,
- listen_loop_post_auth(Transport,
- Connection,
- State1,
- Configuration);
+ State1 =
+ case Blocked of
+ true ->
+ case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State#stream_connection_state{blocked = false};
+ false ->
+ State
+ end;
+ false ->
+ State
+ end,
+ listen_loop_post_auth(Transport, Connection, State1, Configuration);
{'$gen_cast',
{queue_event,
_QueueResource,
- {osiris_written,
- #resource{name = Stream},
- PublisherReference,
- CorrelationList}}} ->
+ {osiris_written, #resource{name = Stream}, PublisherReference, CorrelationList}}} ->
%% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive)
- PublisherId = maps:get({Stream, PublisherReference},
- PublisherRefToIds,
- undefined),
- PubIds = lists:foldl(fun (PublishingId,
- PublishingIds) ->
- [PublishingIds, <<PublishingId:64>>]
- end,
- <<>>,
- CorrelationList),
+ PublisherId = maps:get({Stream, PublisherReference}, PublisherRefToIds, undefined),
+ PubIds =
+ lists:foldl(fun(PublishingId, PublishingIds) -> [PublishingIds, <<PublishingId:64>>]
+ end,
+ <<>>,
+ CorrelationList),
PublishingIdCount = length(CorrelationList),
FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8,
Transport:send(S,
- [<<FrameSize:32, (?COMMAND_PUBLISH_CONFIRM):16,
- (?VERSION_0):16>>,
+ [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
<<PublisherId:8>>,
<<PublishingIdCount:32>>,
PubIds]),
add_credits(Credits, PublishingIdCount),
- State1 = case Blocked of
- true ->
- case has_enough_credits_to_unblock(Credits,
- CreditsRequiredForUnblocking)
- of
- true ->
- Transport:setopts(S, [{active, once}]),
- ok =
- rabbit_heartbeat:resume_monitor(Heartbeater),
- State#stream_connection_state{blocked =
- false};
- false -> State
- end;
- false -> State
- end,
- listen_loop_post_auth(Transport,
- Connection,
- State1,
- Configuration);
+ State1 =
+ case Blocked of
+ true ->
+ case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State#stream_connection_state{blocked = false};
+ false ->
+ State
+ end;
+ false ->
+ State
+ end,
+ listen_loop_post_auth(Transport, Connection, State1, Configuration);
{'$gen_cast',
- {queue_event,
- #resource{name = StreamName},
- {osiris_offset, _QueueResource, -1}}} ->
- rabbit_log:info("received osiris offset event for ~p "
- "with offset ~p~n",
+ {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} ->
+ rabbit_log:info("received osiris offset event for ~p with offset ~p~n",
[StreamName, -1]),
- listen_loop_post_auth(Transport,
- Connection,
- State,
- Configuration);
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
{'$gen_cast',
- {queue_event,
- #resource{name = StreamName},
- {osiris_offset, _QueueResource, Offset}}}
+ {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, Offset}}}
when Offset > -1 ->
- {Connection1, State1} = case maps:get(StreamName,
- StreamSubscriptions,
- undefined)
- of
- undefined ->
- rabbit_log:info("osiris offset event for ~p, but no subscripti"
- "on (leftover messages after unsubscribe?)",
- [StreamName]),
- {Connection, State};
- [] ->
- rabbit_log:info("osiris offset event for ~p, but no registered "
- "consumers!",
- [StreamName]),
- {Connection#stream_connection{stream_subscriptions
- =
- maps:remove(StreamName,
- StreamSubscriptions)},
- State};
- CorrelationIds
- when is_list(CorrelationIds) ->
- Consumers1 = lists:foldl(fun
- (CorrelationId,
- ConsumersAcc) ->
- #{CorrelationId
- :=
- Consumer} =
- ConsumersAcc,
- #consumer{credit
- =
- Credit} =
- Consumer,
- Consumer1 =
- case
- Credit
- of
- 0 ->
- Consumer;
- _ ->
- {{segment,
- Segment1},
- {credit,
- Credit1}} =
- send_chunks(Transport,
- Consumer,
- SendFileOct),
- Consumer#consumer{segment
- =
- Segment1,
- credit
- =
- Credit1}
- end,
- ConsumersAcc#{CorrelationId
- =>
- Consumer1}
- end,
- Consumers,
- CorrelationIds),
- {Connection,
- State#stream_connection_state{consumers
- =
- Consumers1}}
- end,
- listen_loop_post_auth(Transport,
- Connection1,
- State1,
- Configuration);
+ {Connection1, State1} =
+ case maps:get(StreamName, StreamSubscriptions, undefined) of
+ undefined ->
+ rabbit_log:info("osiris offset event for ~p, but no subscription (leftover messages "
+ "after unsubscribe?)",
+ [StreamName]),
+ {Connection, State};
+ [] ->
+ rabbit_log:info("osiris offset event for ~p, but no registered consumers!",
+ [StreamName]),
+ {Connection#stream_connection{stream_subscriptions =
+ maps:remove(StreamName,
+ StreamSubscriptions)},
+ State};
+ CorrelationIds when is_list(CorrelationIds) ->
+ Consumers1 =
+ lists:foldl(fun(CorrelationId, ConsumersAcc) ->
+ #{CorrelationId := Consumer} = ConsumersAcc,
+ #consumer{credit = Credit} = Consumer,
+ Consumer1 =
+ case Credit of
+ 0 -> Consumer;
+ _ ->
+ {{segment, Segment1}, {credit, Credit1}} =
+ send_chunks(Transport,
+ Consumer,
+ SendFileOct),
+ Consumer#consumer{segment = Segment1,
+ credit = Credit1}
+ end,
+ ConsumersAcc#{CorrelationId => Consumer1}
+ end,
+ Consumers,
+ CorrelationIds),
+ {Connection, State#stream_connection_state{consumers = Consumers1}}
+ end,
+ listen_loop_post_auth(Transport, Connection1, State1, Configuration);
heartbeat_send ->
- Frame = <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>,
+ Frame = <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
case catch frame(Transport, Connection, Frame) of
ok ->
- listen_loop_post_auth(Transport,
- Connection,
- State,
- Configuration);
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
Unexpected ->
- rabbit_log:info("Heartbeat send error ~p, closing connection~n",
- [Unexpected]),
+ rabbit_log:info("Heartbeat send error ~p, closing connection~n", [Unexpected]),
C1 = demonitor_all_streams(Connection),
close(Transport, C1)
end;
@@ -661,29 +513,16 @@ listen_loop_post_auth(Transport,
close(Transport, C1);
{infos, From} ->
From ! {self(), ClientProperties},
- listen_loop_post_auth(Transport,
- Connection,
- State,
- Configuration);
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
{'$gen_call', From, info} ->
- gen_server:reply(From,
- infos(?INFO_ITEMS, Connection, State)),
- listen_loop_post_auth(Transport,
- Connection,
- State,
- Configuration);
+ gen_server:reply(From, infos(?INFO_ITEMS, Connection, State)),
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
{'$gen_call', From, {info, Items}} ->
gen_server:reply(From, infos(Items, Connection, State)),
- listen_loop_post_auth(Transport,
- Connection,
- State,
- Configuration);
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
emit_stats ->
Connection1 = emit_stats(Connection, State),
- listen_loop_post_auth(Transport,
- Connection1,
- State,
- Configuration);
+ listen_loop_post_auth(Transport, Connection1, State, Configuration);
{'$gen_cast', {force_event_refresh, Ref}} ->
Infos =
augment_infos_with_user_provided_connection_name(infos(?CREATION_EVENT_KEYS,
@@ -691,18 +530,12 @@ listen_loop_post_auth(Transport,
State),
Connection),
rabbit_event:notify(connection_created, Infos, Ref),
- Connection1 = rabbit_event:init_stats_timer(Connection,
- #stream_connection.stats_timer),
- listen_loop_post_auth(Transport,
- Connection1,
- State,
- Configuration);
+ Connection1 = rabbit_event:init_stats_timer(Connection, #stream_connection.stats_timer),
+ listen_loop_post_auth(Transport, Connection1, State, Configuration);
{'$gen_call', From, {shutdown, Explanation}} ->
% likely closing call from the management plugin
gen_server:reply(From, ok),
- rabbit_log:info("Forcing stream connection ~p closing: "
- "~p~n",
- [self(), Explanation]),
+ rabbit_log:info("Forcing stream connection ~p closing: ~p~n", [self(), Explanation]),
demonitor_all_streams(Connection),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State),
@@ -718,19 +551,16 @@ listen_loop_post_auth(Transport,
demonitor_all_streams(Connection),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State),
- rabbit_log:info("Socket error ~p [~w]~n",
- [Reason, S, self()]);
+ rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
M ->
rabbit_log:warning("Unknown message ~p~n", [M]),
%% FIXME send close
- listen_loop_post_auth(Transport,
- Connection,
- State,
- Configuration)
+ listen_loop_post_auth(Transport, Connection, State, Configuration)
end.
listen_loop_post_close(Transport,
- #stream_connection{socket = S} = Connection, State,
+ #stream_connection{socket = S} = Connection,
+ State,
Configuration) ->
{OK, Closed, Error} = Transport:messages(),
%% FIXME demonitor streams
@@ -739,12 +569,8 @@ listen_loop_post_close(Transport,
{OK, S, Data} ->
Transport:setopts(S, [{active, once}]),
{Connection1, State1} =
- handle_inbound_data_post_close(Transport,
- Connection,
- State,
- Data),
- #stream_connection{connection_step = Step} =
- Connection1,
+ handle_inbound_data_post_close(Transport, Connection, State, Data),
+ #stream_connection{connection_step = Step} = Connection1,
case Step of
closing_done ->
rabbit_log:debug("Received close confirmation from client"),
@@ -753,10 +579,7 @@ listen_loop_post_close(Transport,
notify_connection_closed(Connection1, State1);
_ ->
Transport:setopts(S, [{active, once}]),
- listen_loop_post_close(Transport,
- Connection1,
- State1,
- Configuration)
+ listen_loop_post_close(Transport, Connection1, State1, Configuration)
end;
{Closed, S} ->
rabbit_networking:unregister_non_amqp_connection(self()),
@@ -764,42 +587,24 @@ listen_loop_post_close(Transport,
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
- rabbit_log:info("Socket error ~p [~w]~n",
- [Reason, S, self()]),
+ rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]),
close(Transport, S),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State);
M ->
- rabbit_log:warning("Ignored message on closing ~p~n",
- [M])
+ rabbit_log:warning("Ignored message on closing ~p~n", [M])
end.
-handle_inbound_data_pre_auth(Transport, Connection,
- State, Rest) ->
- handle_inbound_data(Transport,
- Connection,
- State,
- Rest,
- fun handle_frame_pre_auth/5).
+handle_inbound_data_pre_auth(Transport, Connection, State, Rest) ->
+ handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_pre_auth/5).
-handle_inbound_data_post_auth(Transport, Connection,
- State, Rest) ->
- handle_inbound_data(Transport,
- Connection,
- State,
- Rest,
- fun handle_frame_post_auth/5).
+handle_inbound_data_post_auth(Transport, Connection, State, Rest) ->
+ handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_post_auth/5).
-handle_inbound_data_post_close(Transport, Connection,
- State, Rest) ->
- handle_inbound_data(Transport,
- Connection,
- State,
- Rest,
- fun handle_frame_post_close/5).
+handle_inbound_data_post_close(Transport, Connection, State, Rest) ->
+ handle_inbound_data(Transport, Connection, State, Rest, fun handle_frame_post_close/5).
-handle_inbound_data(_Transport, Connection, State, <<>>,
- _HandleFrameFun) ->
+handle_inbound_data(_Transport, Connection, State, <<>>, _HandleFrameFun) ->
{Connection, State};
handle_inbound_data(Transport,
#stream_connection{frame_max = FrameMax} = Connection,
@@ -809,35 +614,32 @@ handle_inbound_data(Transport,
when FrameMax /= 0 andalso Size > FrameMax - 4 ->
CloseReason = <<"frame too large">>,
CloseReasonLength = byte_size(CloseReason),
- CloseFrame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16,
- 1:32, (?RESPONSE_CODE_FRAME_TOO_LARGE):16,
- CloseReasonLength:16,
- CloseReason:CloseReasonLength/binary>>,
+ CloseFrame =
+ <<?COMMAND_CLOSE:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_FRAME_TOO_LARGE:16,
+ CloseReasonLength:16,
+ CloseReason:CloseReasonLength/binary>>,
frame(Transport, Connection, CloseFrame),
- {Connection#stream_connection{connection_step =
- close_sent},
- State};
-handle_inbound_data(Transport, Connection,
+ {Connection#stream_connection{connection_step = close_sent}, State};
+handle_inbound_data(Transport,
+ Connection,
#stream_connection_state{data = none} = State,
<<Size:32, Frame:Size/binary, Rest/bits>>,
HandleFrameFun) ->
- {Connection1, State1, Rest1} = HandleFrameFun(Transport,
- Connection,
- State,
- Frame,
- Rest),
- handle_inbound_data(Transport,
- Connection1,
- State1,
- Rest1,
- HandleFrameFun);
-handle_inbound_data(_Transport, Connection,
- #stream_connection_state{data = none} = State, Data,
+ {Connection1, State1, Rest1} = HandleFrameFun(Transport, Connection, State, Frame, Rest),
+ handle_inbound_data(Transport, Connection1, State1, Rest1, HandleFrameFun);
+handle_inbound_data(_Transport,
+ Connection,
+ #stream_connection_state{data = none} = State,
+ Data,
_HandleFrameFun) ->
- {Connection,
- State#stream_connection_state{data = Data}};
-handle_inbound_data(Transport, Connection,
- #stream_connection_state{data = Leftover} = State, Data,
+ {Connection, State#stream_connection_state{data = Data}};
+handle_inbound_data(Transport,
+ Connection,
+ #stream_connection_state{data = Leftover} = State,
+ Data,
HandleFrameFun) ->
State1 = State#stream_connection_state{data = none},
%% FIXME avoid concatenation to avoid a new binary allocation
@@ -850,92 +652,93 @@ handle_inbound_data(Transport, Connection,
generate_publishing_error_details(Acc, _Code, <<>>) ->
Acc;
-generate_publishing_error_details(Acc, Code,
- <<PublishingId:64, MessageSize:32,
+generate_publishing_error_details(Acc,
+ Code,
+ <<PublishingId:64,
+ MessageSize:32,
_Message:MessageSize/binary,
Rest/binary>>) ->
- generate_publishing_error_details(<<Acc/binary,
- PublishingId:64, Code:16>>,
- Code,
- Rest).
+ generate_publishing_error_details(<<Acc/binary, PublishingId:64, Code:16>>, Code, Rest).
handle_frame_pre_auth(Transport,
- #stream_connection{socket = S} = Connection, State,
- <<(?COMMAND_PEER_PROPERTIES):16, (?VERSION_0):16,
- CorrelationId:32, ClientPropertiesCount:32,
+ #stream_connection{socket = S} = Connection,
+ State,
+ <<?COMMAND_PEER_PROPERTIES:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ClientPropertiesCount:32,
ClientPropertiesFrame/binary>>,
Rest) ->
{ClientProperties, _} =
- rabbit_stream_utils:parse_map(ClientPropertiesFrame,
- ClientPropertiesCount),
- {ok, Product} = application:get_key(rabbit,
- description),
+ rabbit_stream_utils:parse_map(ClientPropertiesFrame, ClientPropertiesCount),
+ {ok, Product} = application:get_key(rabbit, description),
{ok, Version} = application:get_key(rabbit, vsn),
%% Get any configuration-specified server properties
- RawConfigServerProps = application:get_env(rabbit,
- server_properties,
- []),
- ConfigServerProperties = lists:foldl(fun ({K, V},
- Acc) ->
- maps:put(rabbit_data_coercion:to_binary(K),
- V,
- Acc)
- end,
- #{},
- RawConfigServerProps),
- ServerProperties = maps:merge(ConfigServerProperties,
- #{<<"product">> => Product,
- <<"version">> => Version,
- <<"cluster_name">> =>
- rabbit_nodes:cluster_name(),
- <<"platform">> =>
- rabbit_misc:platform_and_version(),
- <<"copyright">> => ?COPYRIGHT_MESSAGE,
- <<"information">> => ?INFORMATION_MESSAGE}),
+ RawConfigServerProps = application:get_env(rabbit, server_properties, []),
+ ConfigServerProperties =
+ lists:foldl(fun({K, V}, Acc) ->
+ maps:put(
+ rabbit_data_coercion:to_binary(K), V, Acc)
+ end,
+ #{},
+ RawConfigServerProps),
+ ServerProperties =
+ maps:merge(ConfigServerProperties,
+ #{<<"product">> => Product,
+ <<"version">> => Version,
+ <<"cluster_name">> => rabbit_nodes:cluster_name(),
+ <<"platform">> => rabbit_misc:platform_and_version(),
+ <<"copyright">> => ?COPYRIGHT_MESSAGE,
+ <<"information">> => ?INFORMATION_MESSAGE}),
ServerPropertiesCount = map_size(ServerProperties),
- ServerPropertiesFragment = maps:fold(fun (K, V, Acc) ->
- Key =
- rabbit_data_coercion:to_binary(K),
- Value =
- rabbit_data_coercion:to_binary(V),
- KeySize = byte_size(Key),
- ValueSize = byte_size(Value),
- <<Acc/binary, KeySize:16,
- Key:KeySize/binary,
- ValueSize:16,
- Value:ValueSize/binary>>
- end,
- <<>>,
- ServerProperties),
- Frame = <<(?COMMAND_PEER_PROPERTIES):16,
- (?VERSION_0):16, CorrelationId:32,
- (?RESPONSE_CODE_OK):16, ServerPropertiesCount:32,
- ServerPropertiesFragment/binary>>,
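+    %% each property is serialized as a 16-bit length-prefixed key followed by a
+    %% 16-bit length-prefixed value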
+ ServerPropertiesFragment =
+ maps:fold(fun(K, V, Acc) ->
+ Key = rabbit_data_coercion:to_binary(K),
+ Value = rabbit_data_coercion:to_binary(V),
+ KeySize = byte_size(Key),
+ ValueSize = byte_size(Value),
+ <<Acc/binary,
+ KeySize:16,
+ Key:KeySize/binary,
+ ValueSize:16,
+ Value:ValueSize/binary>>
+ end,
+ <<>>,
+ ServerProperties),
+ Frame =
+ <<?COMMAND_PEER_PROPERTIES:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ?RESPONSE_CODE_OK:16,
+ ServerPropertiesCount:32,
+ ServerPropertiesFragment/binary>>,
FrameSize = byte_size(Frame),
Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]),
- {Connection#stream_connection{client_properties =
- ClientProperties,
- authentication_state =
- peer_properties_exchanged},
+ {Connection#stream_connection{client_properties = ClientProperties,
+ authentication_state = peer_properties_exchanged},
State,
Rest};
handle_frame_pre_auth(Transport,
- #stream_connection{socket = S} = Connection, State,
- <<(?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16,
- CorrelationId:32>>,
+ #stream_connection{socket = S} = Connection,
+ State,
+ <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, CorrelationId:32>>,
Rest) ->
Mechanisms = rabbit_stream_utils:auth_mechanisms(S),
- MechanismsFragment = lists:foldl(fun (M, Acc) ->
- Size = byte_size(M),
- <<Acc/binary, Size:16,
- M:Size/binary>>
- end,
- <<>>,
- Mechanisms),
+ MechanismsFragment =
+ lists:foldl(fun(M, Acc) ->
+ Size = byte_size(M),
+ <<Acc/binary, Size:16, M:Size/binary>>
+ end,
+ <<>>,
+ Mechanisms),
MechanismsCount = length(Mechanisms),
- Frame = <<(?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16,
- CorrelationId:32, (?RESPONSE_CODE_OK):16,
- MechanismsCount:32, MechanismsFragment/binary>>,
+ Frame =
+ <<?COMMAND_SASL_HANDSHAKE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ?RESPONSE_CODE_OK:16,
+ MechanismsCount:32,
+ MechanismsFragment/binary>>,
FrameSize = byte_size(Frame),
Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]),
{Connection, State, Rest};
@@ -945,278 +748,223 @@ handle_frame_pre_auth(Transport,
host = Host} =
Connection0,
State,
- <<(?COMMAND_SASL_AUTHENTICATE):16, (?VERSION_0):16,
- CorrelationId:32, MechanismLength:16,
- Mechanism:MechanismLength/binary, SaslFragment/binary>>,
+ <<?COMMAND_SASL_AUTHENTICATE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ MechanismLength:16,
+ Mechanism:MechanismLength/binary,
+ SaslFragment/binary>>,
Rest) ->
- SaslBin = case SaslFragment of
- <<(-1):32/signed>> -> <<>>;
- <<SaslBinaryLength:32,
- SaslBinary:SaslBinaryLength/binary>> ->
- SaslBinary
- end,
- {Connection1, Rest1} = case
- rabbit_stream_utils:auth_mechanism_to_module(Mechanism,
- S)
- of
- {ok, AuthMechanism} ->
- AuthState = case AuthState0 of
- none ->
- AuthMechanism:init(S);
- AS -> AS
- end,
- RemoteAddress =
- list_to_binary(inet:ntoa(Host)),
- C1 =
- Connection0#stream_connection{auth_mechanism
- =
- {Mechanism,
- AuthMechanism}},
- {C2, FrameFragment} = case
- AuthMechanism:handle_response(SaslBin,
- AuthState)
- of
- {refused,
- Username,
- Msg,
- Args} ->
- rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
- Username,
- stream),
- auth_fail(Username,
- Msg,
- Args,
- C1,
- State),
- rabbit_log:warning(Msg,
- Args),
- {C1#stream_connection{connection_step
- =
- failure},
- <<(?RESPONSE_AUTHENTICATION_FAILURE):16>>};
- {protocol_error,
- Msg,
- Args} ->
- rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
- <<>>,
- stream),
- notify_auth_result(none,
- user_authentication_failure,
- [{error,
- rabbit_misc:format(Msg,
- Args)}],
- C1,
- State),
- rabbit_log:warning(Msg,
- Args),
- {C1#stream_connection{connection_step
- =
- failure},
- <<(?RESPONSE_SASL_ERROR):16>>};
- {challenge,
- Challenge,
- AuthState1} ->
- rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress,
- <<>>,
- stream),
- ChallengeSize =
- byte_size(Challenge),
- {C1#stream_connection{authentication_state
- =
- AuthState1,
- connection_step
- =
- authenticating},
- <<(?RESPONSE_SASL_CHALLENGE):16,
- ChallengeSize:32,
- Challenge/binary>>};
- {ok,
- User =
- #user{username
- =
- Username}} ->
- case
- rabbit_access_control:check_user_loopback(Username,
- S)
- of
- ok ->
- rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress,
- Username,
- stream),
- notify_auth_result(Username,
- user_authentication_success,
- [],
- C1,
- State),
- {C1#stream_connection{authentication_state
- =
- done,
- user
- =
- User,
- connection_step
- =
- authenticated},
- <<(?RESPONSE_CODE_OK):16>>};
- not_allowed ->
- rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
- Username,
- stream),
- rabbit_log:warning("User '~s' can only connect via localhost~n",
- [Username]),
- {C1#stream_connection{connection_step
- =
- failure},
- <<(?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK):16>>}
- end
- end,
- Frame = <<(?COMMAND_SASL_AUTHENTICATE):16,
- (?VERSION_0):16, CorrelationId:32,
- FrameFragment/binary>>,
- frame(Transport, C1, Frame),
- {C2, Rest};
- {error, _} ->
- Frame = <<(?COMMAND_SASL_AUTHENTICATE):16,
- (?VERSION_0):16, CorrelationId:32,
- (?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED):16>>,
- frame(Transport, Connection0, Frame),
- {Connection0#stream_connection{connection_step
- =
- failure},
- Rest}
- end,
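+    %% a signed length of -1 denotes an absent SASL payload and yields an empty SaslBin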
+ SaslBin =
+ case SaslFragment of
+ <<(-1):32/signed>> ->
+ <<>>;
+ <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> ->
+ SaslBinary
+ end,
+ {Connection1, Rest1} =
+ case rabbit_stream_utils:auth_mechanism_to_module(Mechanism, S) of
+ {ok, AuthMechanism} ->
+ AuthState =
+ case AuthState0 of
+ none ->
+ AuthMechanism:init(S);
+ AS ->
+ AS
+ end,
+ RemoteAddress = list_to_binary(inet:ntoa(Host)),
+ C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}},
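+            %% the SASL outcome below decides the next connection step and the
+            %% response code returned to the client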
+ {C2, FrameFragment} =
+ case AuthMechanism:handle_response(SaslBin, AuthState) of
+ {refused, Username, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
+ Username,
+ stream),
+ auth_fail(Username, Msg, Args, C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure},
+ <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
+ {protocol_error, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
+ notify_auth_result(none,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ C1,
+ State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure},
+ <<?RESPONSE_SASL_ERROR:16>>};
+ {challenge, Challenge, AuthState1} ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
+ ChallengeSize = byte_size(Challenge),
+ {C1#stream_connection{authentication_state = AuthState1,
+ connection_step = authenticating},
+ <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>};
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, S) of
+ ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress,
+ Username,
+ stream),
+ notify_auth_result(Username,
+ user_authentication_success,
+ [],
+ C1,
+ State),
+ {C1#stream_connection{authentication_state = done,
+ user = User,
+ connection_step = authenticated},
+ <<?RESPONSE_CODE_OK:16>>};
+ not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
+ Username,
+ stream),
+ rabbit_log:warning("User '~s' can only connect via localhost~n",
+ [Username]),
+ {C1#stream_connection{connection_step = failure},
+ <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
+ end
+ end,
+ Frame =
+ <<?COMMAND_SASL_AUTHENTICATE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ FrameFragment/binary>>,
+ frame(Transport, C1, Frame),
+ {C2, Rest};
+ {error, _} ->
+ Frame =
+ <<?COMMAND_SASL_AUTHENTICATE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
+ frame(Transport, Connection0, Frame),
+ {Connection0#stream_connection{connection_step = failure}, Rest}
+ end,
{Connection1, State, Rest1};
handle_frame_pre_auth(_Transport,
- #stream_connection{helper_sup = SupPid, socket = Sock,
+ #stream_connection{helper_sup = SupPid,
+ socket = Sock,
name = ConnectionName} =
Connection,
State,
- <<(?COMMAND_TUNE):16, (?VERSION_0):16, FrameMax:32,
- Heartbeat:32>>,
+ <<?COMMAND_TUNE:16, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>,
Rest) ->
- rabbit_log:info("Tuning response ~p ~p ~n",
- [FrameMax, Heartbeat]),
+ rabbit_log:info("Tuning response ~p ~p ~n", [FrameMax, Heartbeat]),
Parent = self(),
%% sending a message to the main process so the heartbeat frame is sent from this main process
%% otherwise heartbeat frames can interleave with chunk delivery
%% (chunk delivery is made of 2 calls on the socket, one for the header and one send_file for the chunk,
%% we don't want a heartbeat frame to sneak in in-between)
- SendFun = fun () ->
- Parent ! heartbeat_send,
- ok
- end,
- ReceiveFun = fun () -> Parent ! heartbeat_timeout end,
- Heartbeater = rabbit_heartbeat:start(SupPid,
- Sock,
- ConnectionName,
- Heartbeat,
- SendFun,
- Heartbeat,
- ReceiveFun),
+ SendFun =
+ fun() ->
+ Parent ! heartbeat_send,
+ ok
+ end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
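+    %% the heartbeater calls SendFun on send inactivity and ReceiveFun on receive inactivity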
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid,
+ Sock,
+ ConnectionName,
+ Heartbeat,
+ SendFun,
+ Heartbeat,
+ ReceiveFun),
{Connection#stream_connection{connection_step = tuned,
- frame_max = FrameMax, heartbeat = Heartbeat,
+ frame_max = FrameMax,
+ heartbeat = Heartbeat,
heartbeater = Heartbeater},
State,
Rest};
handle_frame_pre_auth(Transport,
- #stream_connection{user = User, socket = S} =
- Connection,
+ #stream_connection{user = User, socket = S} = Connection,
State,
- <<(?COMMAND_OPEN):16, (?VERSION_0):16, CorrelationId:32,
+ <<?COMMAND_OPEN:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
VirtualHostLength:16,
VirtualHost:VirtualHostLength/binary>>,
Rest) ->
%% FIXME enforce connection limit (see rabbit_reader:is_over_connection_limit/2)
- {Connection1, Frame} = try
- rabbit_access_control:check_vhost_access(User,
- VirtualHost,
- {socket,
- S},
- #{}),
- F = <<(?COMMAND_OPEN):16, (?VERSION_0):16,
- CorrelationId:32, (?RESPONSE_CODE_OK):16>>,
- %% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2)
- {Connection#stream_connection{connection_step =
- opened,
- virtual_host =
- VirtualHost},
- F}
- catch
- exit:_ ->
- Fr = <<(?COMMAND_OPEN):16, (?VERSION_0):16,
- CorrelationId:32,
- (?RESPONSE_VHOST_ACCESS_FAILURE):16>>,
- {Connection#stream_connection{connection_step
- = failure},
- Fr}
- end,
+ {Connection1, Frame} =
+ try
+ rabbit_access_control:check_vhost_access(User, VirtualHost, {socket, S}, #{}),
+ F = <<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>,
+ %% FIXME check if vhost is alive (see rabbit_reader:is_vhost_alive/2)
+ {Connection#stream_connection{connection_step = opened, virtual_host = VirtualHost}, F}
+ catch
+ exit:_ ->
+ Fr = <<?COMMAND_OPEN:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ?RESPONSE_VHOST_ACCESS_FAILURE:16>>,
+ {Connection#stream_connection{connection_step = failure}, Fr}
+ end,
frame(Transport, Connection1, Frame),
{Connection1, State, Rest};
-handle_frame_pre_auth(_Transport, Connection, State,
- <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>, Rest) ->
+handle_frame_pre_auth(_Transport,
+ Connection,
+ State,
+ <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ Rest) ->
rabbit_log:info("Received heartbeat frame pre auth~n"),
{Connection, State, Rest};
-handle_frame_pre_auth(_Transport, Connection, State,
- Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, closing connection.~n",
- [Frame, Rest]),
- {Connection#stream_connection{connection_step =
- failure},
- State,
- Rest}.
+handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) ->
+ rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", [Frame, Rest]),
+ {Connection#stream_connection{connection_step = failure}, State, Rest}.
-auth_fail(Username, Msg, Args, Connection,
- ConnectionState) ->
+auth_fail(Username, Msg, Args, Connection, ConnectionState) ->
notify_auth_result(Username,
user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}],
Connection,
ConnectionState).
-notify_auth_result(Username, AuthResult, ExtraProps,
- Connection, ConnectionState) ->
- EventProps = [{connection_type, network},
- {name,
- case Username of
- none -> '';
- _ -> Username
- end}]
- ++
- [case Item of
- name ->
- {connection_name,
- i(name, Connection, ConnectionState)};
- _ -> {Item, i(Item, Connection, ConnectionState)}
- end
- || Item <- ?AUTH_NOTIFICATION_INFO_KEYS]
- ++ ExtraProps,
- rabbit_event:notify(AuthResult,
- [P || {_, V} = P <- EventProps, V =/= '']).
+notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState) ->
+ EventProps =
+ [{connection_type, network},
+ {name,
+ case Username of
+ none ->
+ '';
+ _ ->
+ Username
+ end}]
+ ++ [case Item of
+ name ->
+ {connection_name, i(name, Connection, ConnectionState)};
+ _ ->
+ {Item, i(Item, Connection, ConnectionState)}
+ end
+ || Item <- ?AUTH_NOTIFICATION_INFO_KEYS]
+ ++ ExtraProps,
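+    %% empty values are filtered out so the auth event only carries set properties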
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
handle_frame_post_auth(Transport,
#stream_connection{virtual_host = VirtualHost,
- user = User, publishers = Publishers0,
+ user = User,
+ publishers = Publishers0,
publisher_to_ids = RefIds0} =
Connection0,
State,
- <<(?COMMAND_DECLARE_PUBLISHER):16, (?VERSION_0):16,
- CorrelationId:32, PublisherId:8, ReferenceSize:16,
- Reference:ReferenceSize/binary, StreamSize:16,
+ <<?COMMAND_DECLARE_PUBLISHER:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ PublisherId:8,
+ ReferenceSize:16,
+ Reference:ReferenceSize/binary,
+ StreamSize:16,
Stream:StreamSize/binary>>,
Rest) ->
- case
- rabbit_stream_utils:check_write_permitted(#resource{name
- = Stream,
- kind = queue,
- virtual_host =
- VirtualHost},
- User,
- #{})
- of
+ case rabbit_stream_utils:check_write_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ User,
+ #{})
+ of
ok ->
- case {maps:is_key(PublisherId, Publishers0),
- maps:is_key({Stream, Reference}, RefIds0)}
- of
+ case {maps:is_key(PublisherId, Publishers0), maps:is_key({Stream, Reference}, RefIds0)}
+ of
{false, false} ->
case lookup_leader(Stream, Connection0) of
cluster_not_found ->
@@ -1227,36 +975,29 @@ handle_frame_post_auth(Transport,
?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
{Connection0, State, Rest};
{ClusterLeader,
- #stream_connection{publishers = Publishers0,
- publisher_to_ids = RefIds0} =
+ #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} =
Connection1} ->
- {PublisherReference, RefIds1} = case Reference of
- <<"">> ->
- {undefined,
- RefIds0};
- _ ->
- {Reference,
- RefIds0#{{Stream,
- Reference}
- =>
- PublisherId}}
- end,
- Publisher = #publisher{publisher_id = PublisherId,
- stream = Stream,
- reference =
- PublisherReference,
- leader = ClusterLeader},
+ {PublisherReference, RefIds1} =
+ case Reference of
+ <<"">> ->
+ {undefined, RefIds0};
+ _ ->
+ {Reference, RefIds0#{{Stream, Reference} => PublisherId}}
+ end,
+ Publisher =
+ #publisher{publisher_id = PublisherId,
+ stream = Stream,
+ reference = PublisherReference,
+ leader = ClusterLeader},
response(Transport,
Connection0,
?COMMAND_DECLARE_PUBLISHER,
CorrelationId,
?RESPONSE_CODE_OK),
{Connection1#stream_connection{publishers =
- Publishers0#{PublisherId
- =>
+ Publishers0#{PublisherId =>
Publisher},
- publisher_to_ids =
- RefIds1},
+ publisher_to_ids = RefIds1},
State,
Rest}
end;
@@ -1277,25 +1018,21 @@ handle_frame_post_auth(Transport,
{Connection0, State, Rest}
end;
handle_frame_post_auth(Transport,
- #stream_connection{publishers = Publishers,
- publisher_to_ids = PubToIds} =
+ #stream_connection{publishers = Publishers, publisher_to_ids = PubToIds} =
Connection0,
State,
- <<(?COMMAND_DELETE_PUBLISHER):16, (?VERSION_0):16,
- CorrelationId:32, PublisherId:8>>,
+ <<?COMMAND_DELETE_PUBLISHER:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ PublisherId:8>>,
Rest) ->
case Publishers of
- #{PublisherId :=
- #publisher{stream = Stream, reference = Ref}} ->
- Connection1 = Connection0#stream_connection{publishers =
- maps:remove(PublisherId,
- Publishers),
- publisher_to_ids =
- maps:remove({Stream,
- Ref},
- PubToIds)},
- Connection2 = maybe_clean_connection_from_stream(Stream,
- Connection1),
+ #{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
+ Connection1 =
+ Connection0#stream_connection{publishers = maps:remove(PublisherId, Publishers),
+ publisher_to_ids =
+ maps:remove({Stream, Ref}, PubToIds)},
+ Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
response(Transport,
Connection1,
?COMMAND_DELETE_PUBLISHER,
@@ -1311,91 +1048,91 @@ handle_frame_post_auth(Transport,
{Connection0, State, Rest}
end;
handle_frame_post_auth(Transport,
- #stream_connection{socket = S, credits = Credits,
+ #stream_connection{socket = S,
+ credits = Credits,
virtual_host = VirtualHost,
user = User,
publishers = Publishers} =
Connection,
State,
- <<(?COMMAND_PUBLISH):16, (?VERSION_0):16,
- PublisherId:8/unsigned, MessageCount:32,
+ <<?COMMAND_PUBLISH:16,
+ ?VERSION_0:16,
+ PublisherId:8/unsigned,
+ MessageCount:32,
Messages/binary>>,
Rest) ->
case Publishers of
#{PublisherId := Publisher} ->
- #publisher{stream = Stream, reference = Reference,
+ #publisher{stream = Stream,
+ reference = Reference,
leader = Leader} =
Publisher,
- case
- rabbit_stream_utils:check_write_permitted(#resource{name
- =
- Stream,
- kind =
- queue,
- virtual_host
- =
- VirtualHost},
- User,
- #{})
- of
+ case rabbit_stream_utils:check_write_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ User,
+ #{})
+ of
ok ->
- rabbit_stream_utils:write_messages(Leader,
- Reference,
- PublisherId,
- Messages),
+ rabbit_stream_utils:write_messages(Leader, Reference, PublisherId, Messages),
sub_credits(Credits, MessageCount),
{Connection, State, Rest};
error ->
FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
- Details = generate_publishing_error_details(<<>>,
- ?RESPONSE_CODE_ACCESS_REFUSED,
- Messages),
+ Details =
+ generate_publishing_error_details(<<>>,
+ ?RESPONSE_CODE_ACCESS_REFUSED,
+ Messages),
Transport:send(S,
- [<<FrameSize:32, (?COMMAND_PUBLISH_ERROR):16,
- (?VERSION_0):16, PublisherId:8,
- MessageCount:32, Details/binary>>]),
+ [<<FrameSize:32,
+ ?COMMAND_PUBLISH_ERROR:16,
+ ?VERSION_0:16,
+ PublisherId:8,
+ MessageCount:32,
+ Details/binary>>]),
{Connection, State, Rest}
end;
_ ->
FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
- Details = generate_publishing_error_details(<<>>,
- ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST,
- Messages),
+ Details =
+ generate_publishing_error_details(<<>>,
+ ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST,
+ Messages),
Transport:send(S,
- [<<FrameSize:32, (?COMMAND_PUBLISH_ERROR):16,
- (?VERSION_0):16, PublisherId:8, MessageCount:32,
+ [<<FrameSize:32,
+ ?COMMAND_PUBLISH_ERROR:16,
+ ?VERSION_0:16,
+ PublisherId:8,
+ MessageCount:32,
Details/binary>>]),
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport,
#stream_connection{socket = Socket,
- stream_subscriptions =
- StreamSubscriptions,
+ stream_subscriptions = StreamSubscriptions,
virtual_host = VirtualHost,
user = User,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<(?COMMAND_SUBSCRIBE):16, (?VERSION_0):16,
- CorrelationId:32, SubscriptionId:8/unsigned,
- StreamSize:16, Stream:StreamSize/binary,
- OffsetType:16/signed, OffsetAndCredit/binary>>,
+ <<?COMMAND_SUBSCRIBE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ SubscriptionId:8/unsigned,
+ StreamSize:16,
+ Stream:StreamSize/binary,
+ OffsetType:16/signed,
+ OffsetAndCredit/binary>>,
Rest) ->
%% FIXME check the max number of subs is not reached already
- case
- rabbit_stream_utils:check_read_permitted(#resource{name
- = Stream,
- kind = queue,
- virtual_host =
- VirtualHost},
- User,
- #{})
- of
+ case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ User,
+ #{})
+ of
ok ->
- case
- rabbit_stream_manager:lookup_local_member(VirtualHost,
- Stream)
- of
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
{error, not_available} ->
response(Transport,
Connection,
@@ -1411,9 +1148,7 @@ handle_frame_post_auth(Transport,
?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
{Connection, State, Rest};
{ok, LocalMemberPid} ->
- case subscription_exists(StreamSubscriptions,
- SubscriptionId)
- of
+ case subscription_exists(StreamSubscriptions, SubscriptionId) of
true ->
response(Transport,
Connection,
@@ -1422,79 +1157,53 @@ handle_frame_post_auth(Transport,
?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS),
{Connection, State, Rest};
false ->
- {OffsetSpec, Credit} = case OffsetType of
- ?OFFSET_TYPE_FIRST ->
- <<Crdt:16>> =
- OffsetAndCredit,
- {first, Crdt};
- ?OFFSET_TYPE_LAST ->
- <<Crdt:16>> =
- OffsetAndCredit,
- {last, Crdt};
- ?OFFSET_TYPE_NEXT ->
- <<Crdt:16>> =
- OffsetAndCredit,
- {next, Crdt};
- ?OFFSET_TYPE_OFFSET ->
- <<Offset:64/unsigned,
- Crdt:16>> =
- OffsetAndCredit,
- {Offset, Crdt};
- ?OFFSET_TYPE_TIMESTAMP ->
- <<Timestamp:64/signed,
- Crdt:16>> =
- OffsetAndCredit,
- {{timestamp,
- Timestamp},
- Crdt}
- end,
- {ok, Segment} = osiris:init_reader(LocalMemberPid,
- OffsetSpec),
- ConsumerState = #consumer{member_pid =
- LocalMemberPid,
- offset = OffsetSpec,
- subscription_id =
- SubscriptionId,
- socket = Socket,
- segment = Segment,
- credit = Credit,
- stream = Stream},
- Connection1 = maybe_monitor_stream(LocalMemberPid,
- Stream,
- Connection),
- response_ok(Transport,
- Connection,
- ?COMMAND_SUBSCRIBE,
- CorrelationId),
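+                               %% first/last/next carry only a 16-bit credit; offset and
+                               %% timestamp specs carry a 64-bit value followed by the credit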
+ {OffsetSpec, Credit} =
+ case OffsetType of
+ ?OFFSET_TYPE_FIRST ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {first, Crdt};
+ ?OFFSET_TYPE_LAST ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {last, Crdt};
+ ?OFFSET_TYPE_NEXT ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {next, Crdt};
+ ?OFFSET_TYPE_OFFSET ->
+ <<Offset:64/unsigned, Crdt:16>> = OffsetAndCredit,
+ {Offset, Crdt};
+ ?OFFSET_TYPE_TIMESTAMP ->
+ <<Timestamp:64/signed, Crdt:16>> = OffsetAndCredit,
+ {{timestamp, Timestamp}, Crdt}
+ end,
+ {ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec),
+ ConsumerState =
+ #consumer{member_pid = LocalMemberPid,
+ offset = OffsetSpec,
+ subscription_id = SubscriptionId,
+ socket = Socket,
+ segment = Segment,
+ credit = Credit,
+ stream = Stream},
+ Connection1 = maybe_monitor_stream(LocalMemberPid, Stream, Connection),
+ response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
{{segment, Segment1}, {credit, Credit1}} =
- send_chunks(Transport,
- ConsumerState,
- SendFileOct),
- Consumers1 = Consumers#{SubscriptionId =>
- ConsumerState#consumer{segment
- =
- Segment1,
- credit
- =
- Credit1}},
- StreamSubscriptions1 = case StreamSubscriptions of
- #{Stream :=
- SubscriptionIds} ->
- StreamSubscriptions#{Stream
- =>
- [SubscriptionId]
- ++
- SubscriptionIds};
- _ ->
- StreamSubscriptions#{Stream
- =>
- [SubscriptionId]}
- end,
- {Connection1#stream_connection{stream_subscriptions
- =
+ send_chunks(Transport, ConsumerState, SendFileOct),
+ Consumers1 =
+ Consumers#{SubscriptionId =>
+ ConsumerState#consumer{segment = Segment1,
+ credit = Credit1}},
+ StreamSubscriptions1 =
+ case StreamSubscriptions of
+ #{Stream := SubscriptionIds} ->
+ StreamSubscriptions#{Stream =>
+ [SubscriptionId]
+ ++ SubscriptionIds};
+ _ ->
+ StreamSubscriptions#{Stream => [SubscriptionId]}
+ end,
+ {Connection1#stream_connection{stream_subscriptions =
StreamSubscriptions1},
- State#stream_connection_state{consumers =
- Consumers1},
+ State#stream_connection_state{consumers = Consumers1},
Rest}
end
end;
@@ -1507,16 +1216,14 @@ handle_frame_post_auth(Transport,
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport,
- #stream_connection{stream_subscriptions =
- StreamSubscriptions} =
- Connection,
+ #stream_connection{stream_subscriptions = StreamSubscriptions} = Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<(?COMMAND_UNSUBSCRIBE):16, (?VERSION_0):16,
- CorrelationId:32, SubscriptionId:8/unsigned>>,
+ <<?COMMAND_UNSUBSCRIBE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ SubscriptionId:8/unsigned>>,
Rest) ->
- case subscription_exists(StreamSubscriptions,
- SubscriptionId)
- of
+ case subscription_exists(StreamSubscriptions, SubscriptionId) of
false ->
response(Transport,
Connection,
@@ -1527,95 +1234,72 @@ handle_frame_post_auth(Transport,
true ->
#{SubscriptionId := Consumer} = Consumers,
Stream = Consumer#consumer.stream,
- #{Stream := SubscriptionsForThisStream} =
- StreamSubscriptions,
- SubscriptionsForThisStream1 =
- lists:delete(SubscriptionId,
- SubscriptionsForThisStream),
- StreamSubscriptions1 = case
- length(SubscriptionsForThisStream1)
- of
- 0 ->
- % no more subscription for this stream
- maps:remove(Stream,
- StreamSubscriptions);
- _ ->
- StreamSubscriptions#{Stream =>
- SubscriptionsForThisStream1}
- end,
- Connection1 =
- Connection#stream_connection{stream_subscriptions =
- StreamSubscriptions1},
+ #{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
+ SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream),
+ StreamSubscriptions1 =
+ case length(SubscriptionsForThisStream1) of
+ 0 ->
+                              % no more subscriptions for this stream
+ maps:remove(Stream, StreamSubscriptions);
+ _ ->
+ StreamSubscriptions#{Stream => SubscriptionsForThisStream1}
+ end,
+ Connection1 = Connection#stream_connection{stream_subscriptions = StreamSubscriptions1},
Consumers1 = maps:remove(SubscriptionId, Consumers),
- Connection2 = maybe_clean_connection_from_stream(Stream,
- Connection1),
- response_ok(Transport,
- Connection,
- ?COMMAND_SUBSCRIBE,
- CorrelationId),
- {Connection2,
- State#stream_connection_state{consumers = Consumers1},
- Rest}
+ Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
+ response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
+ {Connection2, State#stream_connection_state{consumers = Consumers1}, Rest}
end;
handle_frame_post_auth(Transport,
- #stream_connection{socket = S,
- send_file_oct = SendFileOct} =
- Connection,
+ #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<(?COMMAND_CREDIT):16, (?VERSION_0):16,
- SubscriptionId:8/unsigned, Credit:16/signed>>,
+ <<?COMMAND_CREDIT:16,
+ ?VERSION_0:16,
+ SubscriptionId:8/unsigned,
+ Credit:16/signed>>,
Rest) ->
case Consumers of
#{SubscriptionId := Consumer} ->
#consumer{credit = AvailableCredit} = Consumer,
{{segment, Segment1}, {credit, Credit1}} =
- send_chunks(Transport,
- Consumer,
- AvailableCredit + Credit,
- SendFileOct),
- Consumer1 = Consumer#consumer{segment = Segment1,
- credit = Credit1},
+ send_chunks(Transport, Consumer, AvailableCredit + Credit, SendFileOct),
+ Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1},
{Connection,
- State#stream_connection_state{consumers =
- Consumers#{SubscriptionId =>
- Consumer1}},
+ State#stream_connection_state{consumers = Consumers#{SubscriptionId => Consumer1}},
Rest};
_ ->
- rabbit_log:warning("Giving credit to unknown subscription: "
- "~p~n",
- [SubscriptionId]),
- Frame = <<(?COMMAND_CREDIT):16, (?VERSION_0):16,
- (?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST):16,
- SubscriptionId:8>>,
+ rabbit_log:warning("Giving credit to unknown subscription: ~p~n", [SubscriptionId]),
+ Frame =
+ <<?COMMAND_CREDIT:16,
+ ?VERSION_0:16,
+ ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16,
+ SubscriptionId:8>>,
FrameSize = byte_size(Frame),
Transport:send(S, [<<FrameSize:32>>, Frame]),
{Connection, State, Rest}
end;
handle_frame_post_auth(_Transport,
- #stream_connection{virtual_host = VirtualHost,
- user = User} =
- Connection,
+ #stream_connection{virtual_host = VirtualHost, user = User} = Connection,
State,
- <<(?COMMAND_COMMIT_OFFSET):16, (?VERSION_0):16,
- _CorrelationId:32, ReferenceSize:16,
- Reference:ReferenceSize/binary, StreamSize:16,
- Stream:StreamSize/binary, Offset:64>>,
+ <<?COMMAND_COMMIT_OFFSET:16,
+ ?VERSION_0:16,
+ _CorrelationId:32,
+ ReferenceSize:16,
+ Reference:ReferenceSize/binary,
+ StreamSize:16,
+ Stream:StreamSize/binary,
+ Offset:64>>,
Rest) ->
- case
- rabbit_stream_utils:check_write_permitted(#resource{name
- = Stream,
- kind = queue,
- virtual_host =
- VirtualHost},
- User,
- #{})
- of
+ case rabbit_stream_utils:check_write_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ User,
+ #{})
+ of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
- rabbit_log:info("Could not find leader to commit offset "
- "on ~p~n",
- [Stream]),
+ rabbit_log:info("Could not find leader to commit offset on ~p~n", [Stream]),
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
{Connection, State, Rest};
{ClusterLeader, Connection1} ->
@@ -1624,8 +1308,7 @@ handle_frame_post_auth(_Transport,
end;
error ->
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
- rabbit_log:info("Not authorized to commit offset on ~p~n",
- [Stream]),
+ rabbit_log:info("Not authorized to commit offset on ~p~n", [Stream]),
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport,
@@ -1634,48 +1317,40 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
- <<(?COMMAND_QUERY_OFFSET):16, (?VERSION_0):16,
- CorrelationId:32, ReferenceSize:16,
- Reference:ReferenceSize/binary, StreamSize:16,
+ <<?COMMAND_QUERY_OFFSET:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ReferenceSize:16,
+ Reference:ReferenceSize/binary,
+ StreamSize:16,
Stream:StreamSize/binary>>,
Rest) ->
- FrameSize = (?RESPONSE_FRAME_SIZE) + 8,
- {ResponseCode, Offset} = case
- rabbit_stream_utils:check_read_permitted(#resource{name
- =
- Stream,
- kind
- =
- queue,
- virtual_host
- =
- VirtualHost},
- User,
- #{})
- of
- ok ->
- case
- rabbit_stream_manager:lookup_local_member(VirtualHost,
- Stream)
- of
- {error, not_found} ->
- {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST,
- 0};
- {ok, LocalMemberPid} ->
- {?RESPONSE_CODE_OK,
- case
- osiris:read_tracking(LocalMemberPid,
- Reference)
- of
- undefined -> 0;
- Offt -> Offt
- end}
- end;
- error -> {?RESPONSE_CODE_ACCESS_REFUSED, 0}
- end,
+ FrameSize = ?RESPONSE_FRAME_SIZE + 8,
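+    %% resolve the tracked offset, defaulting to 0 when the stream is unknown or access is refused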
+ {ResponseCode, Offset} =
+ case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ User,
+ #{})
+ of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK,
+ case osiris:read_tracking(LocalMemberPid, Reference) of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
Transport:send(S,
- [<<FrameSize:32, (?COMMAND_QUERY_OFFSET):16,
- (?VERSION_0):16>>,
+ [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
<<Offset:64>>]),
@@ -1686,94 +1361,72 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
- <<(?COMMAND_QUERY_PUBLISHER_SEQUENCE):16,
- (?VERSION_0):16, CorrelationId:32, ReferenceSize:16,
- Reference:ReferenceSize/binary, StreamSize:16,
+ <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ReferenceSize:16,
+ Reference:ReferenceSize/binary,
+ StreamSize:16,
Stream:StreamSize/binary>>,
Rest) ->
- FrameSize = (?RESPONSE_FRAME_SIZE) + 8,
- {ResponseCode, Sequence} = case
- rabbit_stream_utils:check_read_permitted(#resource{name
- =
- Stream,
- kind
- =
- queue,
- virtual_host
- =
- VirtualHost},
- User,
- #{})
- of
- ok ->
- case
- rabbit_stream_manager:lookup_local_member(VirtualHost,
- Stream)
- of
- {error, not_found} ->
- {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST,
- 0};
- {ok, LocalMemberPid} ->
- {?RESPONSE_CODE_OK,
- case
- osiris:fetch_writer_seq(LocalMemberPid,
- Reference)
- of
- undefined -> 0;
- Offt -> Offt
- end}
- end;
- error -> {?RESPONSE_CODE_ACCESS_REFUSED, 0}
- end,
+ FrameSize = ?RESPONSE_FRAME_SIZE + 8,
+ {ResponseCode, Sequence} =
+ case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ User,
+ #{})
+ of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK,
+ case osiris:fetch_writer_seq(LocalMemberPid, Reference) of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
Transport:send(S,
- [<<FrameSize:32, (?COMMAND_QUERY_PUBLISHER_SEQUENCE):16,
- (?VERSION_0):16>>,
+ [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
<<Sequence:64>>]),
{Connection, State, Rest};
handle_frame_post_auth(Transport,
#stream_connection{virtual_host = VirtualHost,
- user =
- #user{username = Username} =
- User} =
+ user = #user{username = Username} = User} =
Connection,
State,
- <<(?COMMAND_CREATE_STREAM):16, (?VERSION_0):16,
- CorrelationId:32, StreamSize:16,
- Stream:StreamSize/binary, ArgumentsCount:32,
+ <<?COMMAND_CREATE_STREAM:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ StreamSize:16,
+ Stream:StreamSize/binary,
+ ArgumentsCount:32,
ArgumentsBinary/binary>>,
Rest) ->
- case
- rabbit_stream_utils:enforce_correct_stream_name(Stream)
- of
+ case rabbit_stream_utils:enforce_correct_stream_name(Stream) of
{ok, StreamName} ->
- {Arguments, _Rest} =
- rabbit_stream_utils:parse_map(ArgumentsBinary,
- ArgumentsCount),
- case
- rabbit_stream_utils:check_configure_permitted(#resource{name
- =
- StreamName,
- kind =
- queue,
- virtual_host
- =
- VirtualHost},
- User,
- #{})
- of
+ {Arguments, _Rest} = rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount),
+ case rabbit_stream_utils:check_configure_permitted(#resource{name = StreamName,
+ kind = queue,
+ virtual_host =
+ VirtualHost},
+ User,
+ #{})
+ of
ok ->
- case rabbit_stream_manager:create(VirtualHost,
- StreamName,
- Arguments,
- Username)
- of
- {ok,
- #{leader_pid := LeaderPid,
- replica_pids := ReturnedReplicas}} ->
- rabbit_log:info("Created cluster with leader ~p and replicas "
- "~p~n",
+ case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username)
+ of
+ {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
+ rabbit_log:info("Created cluster with leader ~p and replicas ~p~n",
[LeaderPid, ReturnedReplicas]),
response_ok(Transport,
Connection,
@@ -1821,59 +1474,42 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(Transport,
#stream_connection{socket = S,
virtual_host = VirtualHost,
- user =
- #user{username = Username} =
- User} =
+ user = #user{username = Username} = User} =
Connection,
State,
- <<(?COMMAND_DELETE_STREAM):16, (?VERSION_0):16,
- CorrelationId:32, StreamSize:16,
+ <<?COMMAND_DELETE_STREAM:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ StreamSize:16,
Stream:StreamSize/binary>>,
Rest) ->
- case
- rabbit_stream_utils:check_configure_permitted(#resource{name
- = Stream,
- kind = queue,
- virtual_host =
- VirtualHost},
- User,
- #{})
- of
+ case rabbit_stream_utils:check_configure_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ User,
+ #{})
+ of
ok ->
- case rabbit_stream_manager:delete(VirtualHost,
- Stream,
- Username)
- of
+ case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
{ok, deleted} ->
- response_ok(Transport,
- Connection,
- ?COMMAND_DELETE_STREAM,
- CorrelationId),
- {Connection1, State1} = case
- clean_state_after_stream_deletion_or_failure(Stream,
- Connection,
- State)
- of
- {cleaned,
- NewConnection,
- NewState} ->
- StreamSize =
- byte_size(Stream),
- FrameSize = 2 + 2 + 2 + 2 +
- StreamSize,
- Transport:send(S,
- [<<FrameSize:32,
- (?COMMAND_METADATA_UPDATE):16,
- (?VERSION_0):16,
- (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16,
- StreamSize:16,
- Stream/binary>>]),
- {NewConnection, NewState};
- {not_cleaned,
- SameConnection,
- SameState} ->
- {SameConnection, SameState}
- end,
+ response_ok(Transport, Connection, ?COMMAND_DELETE_STREAM, CorrelationId),
+ {Connection1, State1} =
+ case clean_state_after_stream_deletion_or_failure(Stream, Connection, State)
+ of
+ {cleaned, NewConnection, NewState} ->
+ StreamSize = byte_size(Stream),
+ FrameSize = 2 + 2 + 2 + 2 + StreamSize,
+ Transport:send(S,
+ [<<FrameSize:32,
+ ?COMMAND_METADATA_UPDATE:16,
+ ?VERSION_0:16,
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
+ StreamSize:16,
+ Stream/binary>>]),
+ {NewConnection, NewState};
+ {not_cleaned, SameConnection, SameState} ->
+ {SameConnection, SameState}
+ end,
{Connection1, State1, Rest};
{error, reference_not_found} ->
response(Transport,
@@ -1892,571 +1528,437 @@ handle_frame_post_auth(Transport,
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport,
- #stream_connection{socket = S,
- virtual_host = VirtualHost} =
- Connection,
+ #stream_connection{socket = S, virtual_host = VirtualHost} = Connection,
State,
- <<(?COMMAND_METADATA):16, (?VERSION_0):16,
- CorrelationId:32, StreamCount:32,
+ <<?COMMAND_METADATA:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ StreamCount:32,
BinaryStreams/binary>>,
Rest) ->
- Streams =
- rabbit_stream_utils:extract_stream_list(BinaryStreams,
- []),
+ Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []),
%% get the nodes involved in the streams
- NodesMap = lists:foldl(fun (Stream, Acc) ->
- case
- rabbit_stream_manager:topology(VirtualHost,
- Stream)
- of
- {ok,
- #{leader_node := undefined,
- replica_nodes := ReplicaNodes}} ->
- lists:foldl(fun (ReplicaNode,
- NodesAcc) ->
- maps:put(ReplicaNode,
- ok,
- NodesAcc)
- end,
- Acc,
- ReplicaNodes);
- {ok,
- #{leader_node := LeaderNode,
- replica_nodes := ReplicaNodes}} ->
- Acc1 = maps:put(LeaderNode, ok, Acc),
- lists:foldl(fun (ReplicaNode,
- NodesAcc) ->
- maps:put(ReplicaNode,
- ok,
- NodesAcc)
- end,
- Acc1,
- ReplicaNodes);
- {error, _} -> Acc
- end
- end,
- #{},
- Streams),
+ NodesMap =
+ lists:foldl(fun(Stream, Acc) ->
+ case rabbit_stream_manager:topology(VirtualHost, Stream) of
+ {ok, #{leader_node := undefined, replica_nodes := ReplicaNodes}} ->
+ lists:foldl(fun(ReplicaNode, NodesAcc) ->
+ maps:put(ReplicaNode, ok, NodesAcc)
+ end,
+ Acc,
+ ReplicaNodes);
+ {ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} ->
+ Acc1 = maps:put(LeaderNode, ok, Acc),
+ lists:foldl(fun(ReplicaNode, NodesAcc) ->
+ maps:put(ReplicaNode, ok, NodesAcc)
+ end,
+ Acc1,
+ ReplicaNodes);
+ {error, _} -> Acc
+ end
+ end,
+ #{},
+ Streams),
Nodes = maps:keys(NodesMap),
- {NodesInfo, _} = lists:foldl(fun (Node, {Acc, Index}) ->
- Host = rpc:call(Node,
- rabbit_stream,
- host,
- []),
- Port = rpc:call(Node,
- rabbit_stream,
- port,
- []),
- case {is_binary(Host),
- is_integer(Port)}
- of
- {true, true} ->
- {Acc#{Node =>
- {{index, Index},
- {host, Host},
- {port, Port}}},
- Index + 1};
- _ ->
- rabbit_log:warning("Error when retrieving broker metadata: "
- "~p ~p~n",
- [Host,
- Port]),
- {Acc, Index}
- end
- end,
- {#{}, 0},
- Nodes),
+ {NodesInfo, _} =
+ lists:foldl(fun(Node, {Acc, Index}) ->
+ Host = rpc:call(Node, rabbit_stream, host, []),
+ Port = rpc:call(Node, rabbit_stream, port, []),
+ case {is_binary(Host), is_integer(Port)} of
+ {true, true} ->
+ {Acc#{Node => {{index, Index}, {host, Host}, {port, Port}}},
+ Index + 1};
+ _ ->
+ rabbit_log:warning("Error when retrieving broker metadata: ~p ~p~n",
+ [Host, Port]),
+ {Acc, Index}
+ end
+ end,
+ {#{}, 0},
+ Nodes),
BrokersCount = map_size(NodesInfo),
- BrokersBin = maps:fold(fun (_K,
- {{index, Index}, {host, Host}, {port, Port}},
- Acc) ->
- HostLength = byte_size(Host),
- <<Acc/binary, Index:16, HostLength:16,
- Host:HostLength/binary, Port:32>>
- end,
- <<BrokersCount:32>>,
- NodesInfo),
- MetadataBin = lists:foldl(fun (Stream, Acc) ->
- StreamLength = byte_size(Stream),
- case
- rabbit_stream_manager:topology(VirtualHost,
- Stream)
- of
- {error, stream_not_found} ->
- <<Acc/binary, StreamLength:16,
- Stream:StreamLength/binary,
- (?RESPONSE_CODE_STREAM_DOES_NOT_EXIST):16,
- (-1):16, 0:32>>;
- {error, stream_not_available} ->
- <<Acc/binary, StreamLength:16,
- Stream:StreamLength/binary,
- (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16,
- (-1):16, 0:32>>;
- {ok,
- #{leader_node := LeaderNode,
- replica_nodes := Replicas}} ->
- LeaderIndex = case NodesInfo of
- #{LeaderNode :=
- NodeInfo} ->
- {{index,
- LeaderIdx},
- {host, _},
- {port,
- _}} =
- NodeInfo,
- LeaderIdx;
- _ -> -1
- end,
- {ReplicasBinary, ReplicasCount} =
- lists:foldl(fun (Replica,
- {Bin,
- Count}) ->
- case
- NodesInfo
- of
- #{Replica
- :=
- NI} ->
- {{index,
- ReplicaIndex},
- {host,
- _},
- {port,
- _}} =
- NI,
- {<<Bin/binary,
- ReplicaIndex:16>>,
- Count
- +
- 1};
- _ ->
- {Bin,
- Count}
- end
- end,
- {<<>>, 0},
- Replicas),
- <<Acc/binary, StreamLength:16,
- Stream:StreamLength/binary,
- (?RESPONSE_CODE_OK):16,
- LeaderIndex:16,
- ReplicasCount:32,
- ReplicasBinary/binary>>
- end
- end,
- <<StreamCount:32>>,
- Streams),
- Frame = <<(?COMMAND_METADATA):16, (?VERSION_0):16,
- CorrelationId:32, BrokersBin/binary,
- MetadataBin/binary>>,
+ BrokersBin =
+ maps:fold(fun(_K, {{index, Index}, {host, Host}, {port, Port}}, Acc) ->
+ HostLength = byte_size(Host),
+ <<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>>
+ end,
+ <<BrokersCount:32>>,
+ NodesInfo),
+ MetadataBin =
+ lists:foldl(fun(Stream, Acc) ->
+ StreamLength = byte_size(Stream),
+ case rabbit_stream_manager:topology(VirtualHost, Stream) of
+ {error, stream_not_found} ->
+ <<Acc/binary,
+ StreamLength:16,
+ Stream:StreamLength/binary,
+ ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16,
+ (-1):16,
+ 0:32>>;
+ {error, stream_not_available} ->
+ <<Acc/binary,
+ StreamLength:16,
+ Stream:StreamLength/binary,
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
+ (-1):16,
+ 0:32>>;
+ {ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} ->
+ LeaderIndex =
+ case NodesInfo of
+ #{LeaderNode := NodeInfo} ->
+ {{index, LeaderIdx}, {host, _}, {port, _}} = NodeInfo,
+ LeaderIdx;
+ _ -> -1
+ end,
+ {ReplicasBinary, ReplicasCount} =
+ lists:foldl(fun(Replica, {Bin, Count}) ->
+ case NodesInfo of
+ #{Replica := NI} ->
+ {{index, ReplicaIndex},
+ {host, _},
+ {port, _}} =
+ NI,
+ {<<Bin/binary, ReplicaIndex:16>>,
+ Count + 1};
+ _ -> {Bin, Count}
+ end
+ end,
+ {<<>>, 0},
+ Replicas),
+ <<Acc/binary,
+ StreamLength:16,
+ Stream:StreamLength/binary,
+ ?RESPONSE_CODE_OK:16,
+ LeaderIndex:16,
+ ReplicasCount:32,
+ ReplicasBinary/binary>>
+ end
+ end,
+ <<StreamCount:32>>,
+ Streams),
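+    %% the metadata frame lists brokers first; each stream then references its leader
+    %% and replicas by broker index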
+ Frame =
+ <<?COMMAND_METADATA:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ BrokersBin/binary,
+ MetadataBin/binary>>,
FrameSize = byte_size(Frame),
Transport:send(S, <<FrameSize:32, Frame/binary>>),
{Connection, State, Rest};
-handle_frame_post_auth(Transport, Connection, State,
- <<(?COMMAND_CLOSE):16, (?VERSION_0):16,
- CorrelationId:32, ClosingCode:16,
+handle_frame_post_auth(Transport,
+ Connection,
+ State,
+ <<?COMMAND_CLOSE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ClosingCode:16,
ClosingReasonLength:16,
ClosingReason:ClosingReasonLength/binary>>,
_Rest) ->
- rabbit_log:info("Received close command ~p ~p~n",
- [ClosingCode, ClosingReason]),
- Frame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16,
- CorrelationId:32, (?RESPONSE_CODE_OK):16>>,
+ rabbit_log:info("Received close command ~p ~p~n", [ClosingCode, ClosingReason]),
+ Frame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>,
frame(Transport, Connection, Frame),
- {Connection#stream_connection{connection_step =
- closing},
+ {Connection#stream_connection{connection_step = closing},
State,
<<>>}; %% we ignore any subsequent frames
-handle_frame_post_auth(_Transport, Connection, State,
- <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>, Rest) ->
+handle_frame_post_auth(_Transport,
+ Connection,
+ State,
+ <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ Rest) ->
rabbit_log:info("Received heartbeat frame post auth~n"),
{Connection, State, Rest};
-handle_frame_post_auth(Transport, Connection, State,
- Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, sending close command.~n",
- [Frame, Rest]),
+handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
+ rabbit_log:warning("unknown frame ~p ~p, sending close command.~n", [Frame, Rest]),
CloseReason = <<"unknown frame">>,
CloseReasonLength = byte_size(CloseReason),
- CloseFrame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16,
- 1:32, (?RESPONSE_CODE_UNKNOWN_FRAME):16,
- CloseReasonLength:16,
- CloseReason:CloseReasonLength/binary>>,
+ CloseFrame =
+ <<?COMMAND_CLOSE:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_UNKNOWN_FRAME:16,
+ CloseReasonLength:16,
+ CloseReason:CloseReasonLength/binary>>,
frame(Transport, Connection, CloseFrame),
- {Connection#stream_connection{connection_step =
- close_sent},
- State,
- Rest}.
+ {Connection#stream_connection{connection_step = close_sent}, State, Rest}.
-notify_connection_closed(#stream_connection{name =
- Name} =
- Connection,
- ConnectionState) ->
+notify_connection_closed(#stream_connection{name = Name} = Connection, ConnectionState) ->
rabbit_core_metrics:connection_closed(self()),
- ClientProperties = i(client_properties,
- Connection,
- ConnectionState),
- EventProperties = [{name, Name},
- {pid, self()},
- {node, node()},
- {client_properties, ClientProperties}],
+ ClientProperties = i(client_properties, Connection, ConnectionState),
+ EventProperties =
+ [{name, Name}, {pid, self()}, {node, node()}, {client_properties, ClientProperties}],
rabbit_event:notify(connection_closed,
augment_infos_with_user_provided_connection_name(EventProperties,
Connection)).
-handle_frame_post_close(_Transport, Connection, State,
- <<(?COMMAND_CLOSE):16, (?VERSION_0):16,
- _CorrelationId:32, _ResponseCode:16>>,
+handle_frame_post_close(_Transport,
+ Connection,
+ State,
+ <<?COMMAND_CLOSE:16, ?VERSION_0:16, _CorrelationId:32, _ResponseCode:16>>,
Rest) ->
rabbit_log:info("Received close confirmation~n"),
- {Connection#stream_connection{connection_step =
- closing_done},
- State,
- Rest};
-handle_frame_post_close(_Transport, Connection, State,
- <<(?COMMAND_HEARTBEAT):16, (?VERSION_0):16>>, Rest) ->
+ {Connection#stream_connection{connection_step = closing_done}, State, Rest};
+handle_frame_post_close(_Transport,
+ Connection,
+ State,
+ <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ Rest) ->
rabbit_log:info("Received heartbeat frame post close~n"),
{Connection, State, Rest};
-handle_frame_post_close(_Transport, Connection, State,
- Frame, Rest) ->
- rabbit_log:warning("ignored frame on close ~p ~p.~n",
- [Frame, Rest]),
+handle_frame_post_close(_Transport, Connection, State, Frame, Rest) ->
+ rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]),
{Connection, State, Rest}.
clean_state_after_stream_deletion_or_failure(Stream,
- #stream_connection{stream_subscriptions
- =
+ #stream_connection{stream_subscriptions =
StreamSubscriptions,
- publishers =
- Publishers,
- publisher_to_ids
- =
- PublisherToIds,
- stream_leaders =
- Leaders} =
+ publishers = Publishers,
+ publisher_to_ids = PublisherToIds,
+ stream_leaders = Leaders} =
C0,
- #stream_connection_state{consumers
- =
- Consumers} =
+ #stream_connection_state{consumers = Consumers} =
S0) ->
- {SubscriptionsCleaned, C1, S1} = case
- stream_has_subscriptions(Stream, C0)
- of
- true ->
- #{Stream := SubscriptionIds} =
- StreamSubscriptions,
- {true,
- C0#stream_connection{stream_subscriptions
- =
- maps:remove(Stream,
- StreamSubscriptions)},
- S0#stream_connection_state{consumers
- =
- maps:without(SubscriptionIds,
- Consumers)}};
- false -> {false, C0, S0}
- end,
- {PublishersCleaned, C2, S2} = case
- stream_has_publishers(Stream, C1)
- of
- true ->
- {PurgedPubs, PurgedPubToIds} =
- maps:fold(fun (PubId,
- #publisher{stream =
- S,
- reference
- =
- Ref},
- {Pubs,
- PubToIds}) ->
- case S of
- Stream ->
- {maps:remove(PubId,
- Pubs),
- maps:remove({Stream,
- Ref},
- PubToIds)};
- _ ->
- {Pubs,
- PubToIds}
- end
- end,
- {Publishers,
- PublisherToIds},
- Publishers),
- {true,
- C1#stream_connection{publishers =
- PurgedPubs,
- publisher_to_ids
- =
- PurgedPubToIds},
- S1};
- false -> {false, C1, S1}
- end,
- {LeadersCleaned, Leaders1} = case Leaders of
- #{Stream := _} ->
- {true, maps:remove(Stream, Leaders)};
- _ -> {false, Leaders}
- end,
- case SubscriptionsCleaned orelse
- PublishersCleaned orelse LeadersCleaned
- of
+ {SubscriptionsCleaned, C1, S1} =
+ case stream_has_subscriptions(Stream, C0) of
+ true ->
+ #{Stream := SubscriptionIds} = StreamSubscriptions,
+ {true,
+ C0#stream_connection{stream_subscriptions =
+ maps:remove(Stream, StreamSubscriptions)},
+ S0#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}};
+ false ->
+ {false, C0, S0}
+ end,
+ {PublishersCleaned, C2, S2} =
+ case stream_has_publishers(Stream, C1) of
+ true ->
+ {PurgedPubs, PurgedPubToIds} =
+ maps:fold(fun(PubId,
+ #publisher{stream = S, reference = Ref},
+ {Pubs, PubToIds}) ->
+ case S of
+ Stream ->
+ {maps:remove(PubId, Pubs),
+ maps:remove({Stream, Ref}, PubToIds)};
+ _ -> {Pubs, PubToIds}
+ end
+ end,
+ {Publishers, PublisherToIds},
+ Publishers),
+ {true,
+ C1#stream_connection{publishers = PurgedPubs, publisher_to_ids = PurgedPubToIds},
+ S1};
+ false ->
+ {false, C1, S1}
+ end,
+ {LeadersCleaned, Leaders1} =
+ case Leaders of
+ #{Stream := _} ->
+ {true, maps:remove(Stream, Leaders)};
+ _ ->
+ {false, Leaders}
+ end,
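+    %% demonitor the stream only if this connection actually held subscriptions,
+    %% publishers, or a cached leader for it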
+ case SubscriptionsCleaned orelse PublishersCleaned orelse LeadersCleaned of
true ->
C3 = demonitor_stream(Stream, C2),
- {cleaned,
- C3#stream_connection{stream_leaders = Leaders1},
- S2};
+ {cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2};
false ->
- {not_cleaned,
- C2#stream_connection{stream_leaders = Leaders1},
- S2}
+ {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
end.
lookup_leader(Stream,
- #stream_connection{stream_leaders = StreamLeaders,
- virtual_host = VirtualHost} =
+ #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} =
Connection) ->
case maps:get(Stream, StreamLeaders, undefined) of
undefined ->
case lookup_leader_from_manager(VirtualHost, Stream) of
- cluster_not_found -> cluster_not_found;
+ cluster_not_found ->
+ cluster_not_found;
LeaderPid ->
- Connection1 = maybe_monitor_stream(LeaderPid,
- Stream,
- Connection),
+ Connection1 = maybe_monitor_stream(LeaderPid, Stream, Connection),
{LeaderPid,
Connection1#stream_connection{stream_leaders =
- StreamLeaders#{Stream =>
- LeaderPid}}}
+ StreamLeaders#{Stream => LeaderPid}}}
end;
- LeaderPid -> {LeaderPid, Connection}
+ LeaderPid ->
+ {LeaderPid, Connection}
end.
lookup_leader_from_manager(VirtualHost, Stream) ->
- rabbit_stream_manager:lookup_leader(VirtualHost,
- Stream).
+ rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
maybe_clean_connection_from_stream(Stream,
- #stream_connection{stream_leaders =
- Leaders} =
- Connection0) ->
- Connection1 = case {stream_has_publishers(Stream,
- Connection0),
- stream_has_subscriptions(Stream, Connection0)}
- of
- {false, false} -> demonitor_stream(Stream, Connection0);
- _ -> Connection0
- end,
- Connection1#stream_connection{stream_leaders =
- maps:remove(Stream, Leaders)}.
-
-maybe_monitor_stream(Pid, Stream,
- #stream_connection{monitors = Monitors} = Connection) ->
+ #stream_connection{stream_leaders = Leaders} = Connection0) ->
+ Connection1 =
+ case {stream_has_publishers(Stream, Connection0),
+ stream_has_subscriptions(Stream, Connection0)}
+ of
+ {false, false} ->
+ demonitor_stream(Stream, Connection0);
+ _ ->
+ Connection0
+ end,
+ Connection1#stream_connection{stream_leaders = maps:remove(Stream, Leaders)}.
+
+maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) ->
case lists:member(Stream, maps:values(Monitors)) of
- true -> Connection;
+ true ->
+ Connection;
false ->
MonitorRef = monitor(process, Pid),
- Connection#stream_connection{monitors =
- maps:put(MonitorRef,
- Stream,
- Monitors)}
+ Connection#stream_connection{monitors = maps:put(MonitorRef, Stream, Monitors)}
end.
-demonitor_stream(Stream,
- #stream_connection{monitors = Monitors0} =
- Connection) ->
- Monitors = maps:fold(fun (MonitorRef, Strm, Acc) ->
- case Strm of
- Stream ->
- demonitor(MonitorRef, [flush]),
- Acc;
- _ -> maps:put(MonitorRef, Strm, Acc)
- end
- end,
- #{},
- Monitors0),
+demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) ->
+ Monitors =
+ maps:fold(fun(MonitorRef, Strm, Acc) ->
+ case Strm of
+ Stream ->
+ demonitor(MonitorRef, [flush]),
+ Acc;
+ _ -> maps:put(MonitorRef, Strm, Acc)
+ end
+ end,
+ #{},
+ Monitors0),
Connection#stream_connection{monitors = Monitors}.
stream_has_subscriptions(Stream,
- #stream_connection{stream_subscriptions =
- Subscriptions}) ->
+ #stream_connection{stream_subscriptions = Subscriptions}) ->
case Subscriptions of
- #{Stream := StreamSubscriptions}
- when length(StreamSubscriptions) > 0 ->
+ #{Stream := StreamSubscriptions} when length(StreamSubscriptions) > 0 ->
true;
- _ -> false
+ _ ->
+ false
end.
-stream_has_publishers(Stream,
- #stream_connection{publishers = Publishers}) ->
- lists:any(fun (#publisher{stream = S}) ->
- case S of
- Stream -> true;
- _ -> false
- end
+stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) ->
+ lists:any(fun(#publisher{stream = S}) ->
+ case S of
+ Stream -> true;
+ _ -> false
+ end
end,
maps:values(Publishers)).
-demonitor_all_streams(#stream_connection{monitors =
- Monitors} =
- Connection) ->
- lists:foreach(fun (MonitorRef) ->
- demonitor(MonitorRef, [flush])
- end,
- maps:keys(Monitors)),
+demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) ->
+ lists:foreach(fun(MonitorRef) -> demonitor(MonitorRef, [flush]) end, maps:keys(Monitors)),
Connection#stream_connection{monitors = #{}}.
-frame(Transport, #stream_connection{socket = S},
- Frame) ->
+frame(Transport, #stream_connection{socket = S}, Frame) ->
FrameSize = byte_size(Frame),
Transport:send(S, [<<FrameSize:32>>, Frame]).
-response_ok(Transport, State, CommandId,
- CorrelationId) ->
- response(Transport,
- State,
- CommandId,
- CorrelationId,
- ?RESPONSE_CODE_OK).
+response_ok(Transport, State, CommandId, CorrelationId) ->
+ response(Transport, State, CommandId, CorrelationId, ?RESPONSE_CODE_OK).
-response(Transport, #stream_connection{socket = S},
- CommandId, CorrelationId, ResponseCode) ->
+response(Transport,
+ #stream_connection{socket = S},
+ CommandId,
+ CorrelationId,
+ ResponseCode) ->
Transport:send(S,
- [<<(?RESPONSE_FRAME_SIZE):32, CommandId:16,
- (?VERSION_0):16>>,
+ [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>]).
-subscription_exists(StreamSubscriptions,
- SubscriptionId) ->
+subscription_exists(StreamSubscriptions, SubscriptionId) ->
SubscriptionIds =
- lists:flatten(maps:values(StreamSubscriptions)),
- lists:any(fun (Id) -> Id =:= SubscriptionId end,
- SubscriptionIds).
+ lists:flatten(
+ maps:values(StreamSubscriptions)),
+ lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
send_file_callback(Transport,
#consumer{socket = S, subscription_id = SubscriptionId},
Counter) ->
- fun (Size) ->
- FrameSize = 2 + 2 + 1 + Size,
- FrameBeginning = <<FrameSize:32, (?COMMAND_DELIVER):16,
- (?VERSION_0):16, SubscriptionId:8/unsigned>>,
- Transport:send(S, FrameBeginning),
- atomics:add(Counter, 1, Size)
+ fun(Size) ->
+ FrameSize = 2 + 2 + 1 + Size,
+ FrameBeginning =
+ <<FrameSize:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8/unsigned>>,
+ Transport:send(S, FrameBeginning),
+ atomics:add(Counter, 1, Size)
end.
-send_chunks(Transport,
- #consumer{credit = Credit} = State, Counter) ->
+send_chunks(Transport, #consumer{credit = Credit} = State, Counter) ->
send_chunks(Transport, State, Credit, Counter).
-send_chunks(_Transport, #consumer{segment = Segment}, 0,
- _Counter) ->
+send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) ->
{{segment, Segment}, {credit, 0}};
-send_chunks(Transport,
- #consumer{segment = Segment} = State, Credit,
- Counter) ->
- send_chunks(Transport,
- State,
- Segment,
- Credit,
- true,
- Counter).
+send_chunks(Transport, #consumer{segment = Segment} = State, Credit, Counter) ->
+ send_chunks(Transport, State, Segment, Credit, true, Counter).
-send_chunks(_Transport, _State, Segment, 0 = _Credit,
- _Retry, _Counter) ->
+send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry, _Counter) ->
{{segment, Segment}, {credit, 0}};
-send_chunks(Transport, #consumer{socket = S} = State,
- Segment, Credit, Retry, Counter) ->
- case osiris_log:send_file(S,
- Segment,
- send_file_callback(Transport, State, Counter))
- of
+send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry, Counter) ->
+ case osiris_log:send_file(S, Segment, send_file_callback(Transport, State, Counter)) of
{ok, Segment1} ->
- send_chunks(Transport,
- State,
- Segment1,
- Credit - 1,
- true,
- Counter);
+ send_chunks(Transport, State, Segment1, Credit - 1, true, Counter);
{end_of_stream, Segment1} ->
case Retry of
true ->
timer:sleep(1),
- send_chunks(Transport,
- State,
- Segment1,
- Credit,
- false,
- Counter);
+ send_chunks(Transport, State, Segment1, Credit, false, Counter);
false ->
#consumer{member_pid = LocalMember} = State,
- osiris:register_offset_listener(LocalMember,
- osiris_log:next_offset(Segment1)),
+ osiris:register_offset_listener(LocalMember, osiris_log:next_offset(Segment1)),
{{segment, Segment1}, {credit, Credit}}
end
end.
emit_stats(Connection, ConnectionState) ->
- [{_, Pid},
- {_, Recv_oct},
- {_, Send_oct},
- {_, Reductions}] =
+ [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] =
I = infos(?SIMPLE_METRICS, Connection, ConnectionState),
- Infos = infos(?OTHER_METRICS,
- Connection,
- ConnectionState),
+ Infos = infos(?OTHER_METRICS, Connection, ConnectionState),
rabbit_core_metrics:connection_stats(Pid, Infos),
- rabbit_core_metrics:connection_stats(Pid,
- Recv_oct,
- Send_oct,
- Reductions),
+ rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
rabbit_event:notify(connection_stats, Infos ++ I),
- Connection1 = rabbit_event:reset_stats_timer(Connection,
- #stream_connection.stats_timer),
+ Connection1 = rabbit_event:reset_stats_timer(Connection, #stream_connection.stats_timer),
ensure_stats_timer(Connection1).
ensure_stats_timer(Connection = #stream_connection{}) ->
- rabbit_event:ensure_stats_timer(Connection,
- #stream_connection.stats_timer,
- emit_stats).
+ rabbit_event:ensure_stats_timer(Connection, #stream_connection.stats_timer, emit_stats).
info(Pid, InfoItems) ->
- case InfoItems -- (?INFO_ITEMS) of
- [] -> gen_server2:call(Pid, {info, InfoItems});
- UnknownItems -> throw({bad_argument, UnknownItems})
+ case InfoItems -- ?INFO_ITEMS of
+ [] ->
+ gen_server2:call(Pid, {info, InfoItems});
+ UnknownItems ->
+ throw({bad_argument, UnknownItems})
end.
infos(Items, Connection, State) ->
[{Item, i(Item, Connection, State)} || Item <- Items].
-i(pid, _, _) -> self();
-i(node, _, _) -> node();
-i(SockStat,
- #stream_connection{socket = Sock,
- send_file_oct = Counter},
- _)
- when SockStat =:=
- send_oct -> % Number of bytes sent from the socket.
+i(pid, _, _) ->
+ self();
+i(node, _, _) ->
+ node();
+i(SockStat, #stream_connection{socket = Sock, send_file_oct = Counter}, _)
+ when SockStat
+ =:= send_oct -> % Number of bytes sent from the socket.
case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{_, N}]} when is_number(N) ->
N + atomics:get(Counter, 1);
- _ -> 0 + atomics:get(Counter, 1)
+ _ ->
+ 0 + atomics:get(Counter, 1)
end;
i(SockStat, #stream_connection{socket = Sock}, _)
- when SockStat =:=
- recv_oct; % Number of bytes received by the socket.
- SockStat =:=
- recv_cnt; % Number of packets received by the socket.
- SockStat =:=
- send_cnt; % Number of packets sent from the socket.
- SockStat =:=
- send_pend -> % Number of bytes waiting to be sent by the socket.
+ when SockStat
+ =:= recv_oct; % Number of bytes received by the socket.
+ SockStat
+ =:= recv_cnt; % Number of packets received by the socket.
+ SockStat
+ =:= send_cnt; % Number of packets sent from the socket.
+ SockStat
+ =:= send_pend -> % Number of bytes waiting to be sent by the socket.
case rabbit_net:getstat(Sock, [SockStat]) of
- {ok, [{_, N}]} when is_number(N) -> N;
- _ -> 0
+ {ok, [{_, N}]} when is_number(N) ->
+ N;
+ _ ->
+ 0
end;
i(reductions, _, _) ->
- {reductions, Reductions} = erlang:process_info(self(),
- reductions),
+ {reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions;
i(garbage_collection, _, _) ->
rabbit_misc:get_gc_info(self());
@@ -2468,59 +1970,59 @@ i(name, Connection, ConnectionState) ->
i(conn_name, Connection, ConnectionState);
i(conn_name, #stream_connection{name = Name}, _) ->
Name;
-i(port, #stream_connection{port = Port}, _) -> Port;
-i(peer_port, #stream_connection{peer_port = PeerPort},
- _) ->
+i(port, #stream_connection{port = Port}, _) ->
+ Port;
+i(peer_port, #stream_connection{peer_port = PeerPort}, _) ->
PeerPort;
-i(host, #stream_connection{host = Host}, _) -> Host;
-i(peer_host, #stream_connection{peer_host = PeerHost},
- _) ->
+i(host, #stream_connection{host = Host}, _) ->
+ Host;
+i(peer_host, #stream_connection{peer_host = PeerHost}, _) ->
PeerHost;
-i(ssl, _, _) -> false;
-i(peer_cert_subject, _, _) -> '';
-i(peer_cert_issuer, _, _) -> '';
-i(peer_cert_validity, _, _) -> '';
-i(ssl_protocol, _, _) -> '';
-i(ssl_key_exchange, _, _) -> '';
-i(ssl_cipher, _, _) -> '';
-i(ssl_hash, _, _) -> '';
-i(channels, _, _) -> 0;
-i(protocol, _, _) -> {<<"stream">>, ""};
-i(user_who_performed_action, Connection,
- ConnectionState) ->
+i(ssl, _, _) ->
+ false;
+i(peer_cert_subject, _, _) ->
+ '';
+i(peer_cert_issuer, _, _) ->
+ '';
+i(peer_cert_validity, _, _) ->
+ '';
+i(ssl_protocol, _, _) ->
+ '';
+i(ssl_key_exchange, _, _) ->
+ '';
+i(ssl_cipher, _, _) ->
+ '';
+i(ssl_hash, _, _) ->
+ '';
+i(channels, _, _) ->
+ 0;
+i(protocol, _, _) ->
+ {<<"stream">>, ""};
+i(user_who_performed_action, Connection, ConnectionState) ->
i(user, Connection, ConnectionState);
i(user, #stream_connection{user = U}, _) ->
U#user.username;
-i(vhost, #stream_connection{virtual_host = VirtualHost},
- _) ->
+i(vhost, #stream_connection{virtual_host = VirtualHost}, _) ->
VirtualHost;
-i(subscriptions, _,
- #stream_connection_state{consumers = Consumers}) ->
+i(subscriptions, _, #stream_connection_state{consumers = Consumers}) ->
maps:size(Consumers);
-i(connection_state, _Connection,
- #stream_connection_state{blocked = true}) ->
+i(connection_state, _Connection, #stream_connection_state{blocked = true}) ->
blocked;
-i(connection_state, _Connection,
- #stream_connection_state{blocked = false}) ->
+i(connection_state, _Connection, #stream_connection_state{blocked = false}) ->
running;
-i(auth_mechanism,
- #stream_connection{auth_mechanism = none}, _) ->
+i(auth_mechanism, #stream_connection{auth_mechanism = none}, _) ->
none;
-i(auth_mechanism,
- #stream_connection{auth_mechanism = {Name, _Mod}}, _) ->
+i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) ->
Name;
-i(heartbeat, #stream_connection{heartbeat = Heartbeat},
- _) ->
+i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) ->
Heartbeat;
-i(frame_max, #stream_connection{frame_max = FrameMax},
- _) ->
+i(frame_max, #stream_connection{frame_max = FrameMax}, _) ->
FrameMax;
-i(channel_max, _, _) -> 0;
-i(client_properties,
- #stream_connection{client_properties = CP}, _) ->
+i(channel_max, _, _) ->
+ 0;
+i(client_properties, #stream_connection{client_properties = CP}, _) ->
rabbit_misc:to_amqp_table(CP);
-i(connected_at, #stream_connection{connected_at = T},
- _) ->
+i(connected_at, #stream_connection{connected_at = T}, _) ->
T;
i(Item, #stream_connection{}, _) ->
throw({bad_argument, Item}).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl
index ecce4cdf2a..3e36de6fb4 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl
@@ -19,7 +19,6 @@
-behaviour(supervisor).
-export([start_link/0]).
-
-export([init/1]).
-include("rabbit_stream.hrl").
@@ -28,52 +27,36 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- {ok, Listeners} = application:get_env(rabbitmq_stream,
- tcp_listeners),
- NumTcpAcceptors = application:get_env(rabbitmq_stream,
- num_tcp_acceptors,
- 10),
- {ok, SocketOpts} = application:get_env(rabbitmq_stream,
- tcp_listen_options),
+ {ok, Listeners} = application:get_env(rabbitmq_stream, tcp_listeners),
+ NumTcpAcceptors = application:get_env(rabbitmq_stream, num_tcp_acceptors, 10),
+ {ok, SocketOpts} = application:get_env(rabbitmq_stream, tcp_listen_options),
Nodes = rabbit_mnesia:cluster_nodes(all),
OsirisConf = #{nodes => Nodes},
- ServerConfiguration = #{initial_credits =>
- application:get_env(rabbitmq_stream,
- initial_credits,
- ?DEFAULT_INITIAL_CREDITS),
- credits_required_for_unblocking =>
- application:get_env(rabbitmq_stream,
- credits_required_for_unblocking,
- ?DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING),
- frame_max =>
- application:get_env(rabbit_stream,
- frame_max,
- ?DEFAULT_FRAME_MAX),
- heartbeat =>
- application:get_env(rabbit_stream,
- heartbeat,
- ?DEFAULT_HEARTBEAT)},
- StreamManager = #{id => rabbit_stream_manager,
- type => worker,
- start =>
- {rabbit_stream_manager, start_link, [OsirisConf]}},
+ ServerConfiguration =
+ #{initial_credits =>
+ application:get_env(rabbitmq_stream, initial_credits, ?DEFAULT_INITIAL_CREDITS),
+ credits_required_for_unblocking =>
+ application:get_env(rabbitmq_stream,
+ credits_required_for_unblocking,
+ ?DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING),
+ frame_max => application:get_env(rabbit_stream, frame_max, ?DEFAULT_FRAME_MAX),
+ heartbeat => application:get_env(rabbit_stream, heartbeat, ?DEFAULT_HEARTBEAT)},
+ StreamManager =
+ #{id => rabbit_stream_manager,
+ type => worker,
+ start => {rabbit_stream_manager, start_link, [OsirisConf]}},
{ok,
{{one_for_all, 10, 10},
- [StreamManager] ++
- listener_specs(fun tcp_listener_spec/1,
- [SocketOpts, ServerConfiguration, NumTcpAcceptors],
- Listeners)}}.
+ [StreamManager]
+ ++ listener_specs(fun tcp_listener_spec/1,
+ [SocketOpts, ServerConfiguration, NumTcpAcceptors],
+ Listeners)}}.
listener_specs(Fun, Args, Listeners) ->
[Fun([Address | Args])
- || Listener <- Listeners,
- Address
- <- rabbit_networking:tcp_listener_addresses(Listener)].
+ || Listener <- Listeners, Address <- rabbit_networking:tcp_listener_addresses(Listener)].
-tcp_listener_spec([Address,
- SocketOpts,
- Configuration,
- NumAcceptors]) ->
+tcp_listener_spec([Address, SocketOpts, Configuration, NumAcceptors]) ->
rabbit_networking:tcp_listener_spec(rabbit_stream_listener_sup,
Address,
SocketOpts,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index d7ec2264fd..f01259a8d3 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -17,158 +17,150 @@
-module(rabbit_stream_utils).
%% API
--export([enforce_correct_stream_name/1,
- write_messages/4,
- parse_map/2,
- auth_mechanisms/1,
- auth_mechanism_to_module/2,
- check_configure_permitted/3,
- check_write_permitted/3,
- check_read_permitted/3,
- extract_stream_list/2]).
+-export([enforce_correct_stream_name/1, write_messages/4, parse_map/2, auth_mechanisms/1,
+ auth_mechanism_to_module/2, check_configure_permitted/3, check_write_permitted/3,
+ check_read_permitted/3, extract_stream_list/2]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
enforce_correct_stream_name(Name) ->
% from rabbit_channel
- StrippedName = binary:replace(Name,
- [<<"\n">>, <<"\r">>],
- <<"">>,
- [global]),
+ StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]),
case check_name(StrippedName) of
- ok -> {ok, StrippedName};
- error -> error
+ ok ->
+ {ok, StrippedName};
+ error ->
+ error
end.
-check_name(<<"amq.", _/binary>>) -> error;
-check_name(<<"">>) -> error;
-check_name(_Name) -> ok.
+check_name(<<"amq.", _/binary>>) ->
+ error;
+check_name(<<"">>) ->
+ error;
+check_name(_Name) ->
+ ok.
-write_messages(_ClusterLeader, undefined, _PublisherId,
- <<>>) ->
+write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) ->
ok;
-write_messages(ClusterLeader, undefined, PublisherId,
- <<PublishingId:64, 0:1, MessageSize:31,
- Message:MessageSize/binary, Rest/binary>>) ->
+write_messages(ClusterLeader,
+ undefined,
+ PublisherId,
+ <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader,
- undefined,
- {PublisherId, PublishingId},
- Message),
- write_messages(ClusterLeader,
- undefined,
- PublisherId,
- Rest);
-write_messages(ClusterLeader, undefined, PublisherId,
- <<PublishingId:64, 1:1, CompressionType:3, _Unused:4,
- MessageCount:16, BatchSize:32, Batch:BatchSize/binary,
+ ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message),
+ write_messages(ClusterLeader, undefined, PublisherId, Rest);
+write_messages(ClusterLeader,
+ undefined,
+ PublisherId,
+ <<PublishingId:64,
+ 1:1,
+ CompressionType:3,
+ _Unused:4,
+ MessageCount:16,
+ BatchSize:32,
+ Batch:BatchSize/binary,
Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader,
- undefined,
- {PublisherId, PublishingId},
- {batch, MessageCount, CompressionType, Batch}),
- write_messages(ClusterLeader,
- undefined,
- PublisherId,
- Rest);
-write_messages(_ClusterLeader, _PublisherRef,
- _PublisherId, <<>>) ->
+ ok =
+ osiris:write(ClusterLeader,
+ undefined,
+ {PublisherId, PublishingId},
+ {batch, MessageCount, CompressionType, Batch}),
+ write_messages(ClusterLeader, undefined, PublisherId, Rest);
+write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) ->
ok;
-write_messages(ClusterLeader, PublisherRef, PublisherId,
- <<PublishingId:64, 0:1, MessageSize:31,
- Message:MessageSize/binary, Rest/binary>>) ->
+write_messages(ClusterLeader,
+ PublisherRef,
+ PublisherId,
+ <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader,
- PublisherRef,
- PublishingId,
- Message),
- write_messages(ClusterLeader,
- PublisherRef,
- PublisherId,
- Rest);
-write_messages(ClusterLeader, PublisherRef, PublisherId,
- <<PublishingId:64, 1:1, CompressionType:3, _Unused:4,
- MessageCount:16, BatchSize:32, Batch:BatchSize/binary,
+ ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
+ write_messages(ClusterLeader, PublisherRef, PublisherId, Rest);
+write_messages(ClusterLeader,
+ PublisherRef,
+ PublisherId,
+ <<PublishingId:64,
+ 1:1,
+ CompressionType:3,
+ _Unused:4,
+ MessageCount:16,
+ BatchSize:32,
+ Batch:BatchSize/binary,
Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader,
- PublisherRef,
- PublishingId,
- {batch, MessageCount, CompressionType, Batch}),
- write_messages(ClusterLeader,
- PublisherRef,
- PublisherId,
- Rest).
+ ok =
+ osiris:write(ClusterLeader,
+ PublisherRef,
+ PublishingId,
+ {batch, MessageCount, CompressionType, Batch}),
+ write_messages(ClusterLeader, PublisherRef, PublisherId, Rest).
-parse_map(<<>>, _Count) -> {#{}, <<>>};
-parse_map(Content, 0) -> {#{}, Content};
+parse_map(<<>>, _Count) ->
+ {#{}, <<>>};
+parse_map(Content, 0) ->
+ {#{}, Content};
parse_map(Arguments, Count) ->
parse_map(#{}, Arguments, Count).
-parse_map(Acc, <<>>, _Count) -> {Acc, <<>>};
-parse_map(Acc, Content, 0) -> {Acc, Content};
+parse_map(Acc, <<>>, _Count) ->
+ {Acc, <<>>};
+parse_map(Acc, Content, 0) ->
+ {Acc, Content};
parse_map(Acc,
- <<KeySize:16, Key:KeySize/binary, ValueSize:16,
- Value:ValueSize/binary, Rest/binary>>,
+ <<KeySize:16, Key:KeySize/binary, ValueSize:16, Value:ValueSize/binary, Rest/binary>>,
Count) ->
parse_map(maps:put(Key, Value, Acc), Rest, Count - 1).
auth_mechanisms(Sock) ->
- {ok, Configured} = application:get_env(rabbit,
- auth_mechanisms),
+ {ok, Configured} = application:get_env(rabbit, auth_mechanisms),
[rabbit_data_coercion:to_binary(Name)
- || {Name, Module}
- <- rabbit_registry:lookup_all(auth_mechanism),
+ || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
Module:should_offer(Sock),
lists:member(Name, Configured)].
auth_mechanism_to_module(TypeBin, Sock) ->
case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
- rabbit_log:warning("Unknown authentication mechanism '~p'~n",
- [TypeBin]),
+ rabbit_log:warning("Unknown authentication mechanism '~p'~n", [TypeBin]),
{error, not_found};
T ->
- case {lists:member(TypeBin,
- rabbit_stream_utils:auth_mechanisms(Sock)),
+ case {lists:member(TypeBin, rabbit_stream_utils:auth_mechanisms(Sock)),
rabbit_registry:lookup_module(auth_mechanism, T)}
- of
- {true, {ok, Module}} -> {ok, Module};
+ of
+ {true, {ok, Module}} ->
+ {ok, Module};
_ ->
- rabbit_log:warning("Invalid authentication mechanism '~p'~n",
- [T]),
+ rabbit_log:warning("Invalid authentication mechanism '~p'~n", [T]),
{error, invalid}
end
end.
check_resource_access(User, Resource, Perm, Context) ->
V = {Resource, Context, Perm},
- Cache = case get(permission_cache) of
- undefined -> [];
- Other -> Other
- end,
+ Cache =
+ case get(permission_cache) of
+ undefined ->
+ [];
+ Other ->
+ Other
+ end,
case lists:member(V, Cache) of
- true -> ok;
+ true ->
+ ok;
false ->
- try rabbit_access_control:check_resource_access(User,
- Resource,
- Perm,
- Context),
- CacheTail = lists:sublist(Cache,
- (?MAX_PERMISSION_CACHE_SIZE) - 1),
+ try
+ rabbit_access_control:check_resource_access(User, Resource, Perm, Context),
+ CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
put(permission_cache, [V | CacheTail]),
ok
catch
- exit:_ -> error
+ exit:_ ->
+ error
end
end.
check_configure_permitted(Resource, User, Context) ->
- check_resource_access(User,
- Resource,
- configure,
- Context).
+ check_resource_access(User, Resource, configure, Context).
check_write_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, write, Context).
@@ -176,8 +168,7 @@ check_write_permitted(Resource, User, Context) ->
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).
-extract_stream_list(<<>>, Streams) -> Streams;
-extract_stream_list(<<Length:16, Stream:Length/binary,
- Rest/binary>>,
- Streams) ->
+extract_stream_list(<<>>, Streams) ->
+ Streams;
+extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) ->
extract_stream_list(Rest, [Stream | Streams]).
diff --git a/deps/rabbitmq_stream/test/command_SUITE.erl b/deps/rabbitmq_stream/test/command_SUITE.erl
index c44c02bfeb..820c878b3b 100644
--- a/deps/rabbitmq_stream/test/command_SUITE.erl
+++ b/deps/rabbitmq_stream/test/command_SUITE.erl
@@ -10,35 +10,32 @@
-compile([export_all]).
-include_lib("common_test/include/ct.hrl").
-
-include_lib("eunit/include/eunit.hrl").
-
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_stream.hrl").
--define(COMMAND,
- 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
+-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
-all() -> [{group, non_parallel_tests}].
+all() ->
+ [{group, non_parallel_tests}].
groups() ->
[{non_parallel_tests, [], [merge_defaults, run]}].
init_per_suite(Config) ->
- Config1 = rabbit_ct_helpers:set_config(Config,
- [{rmq_nodename_suffix, ?MODULE}]),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, ?MODULE}]),
rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps()).
+ rabbit_ct_helpers:run_setup_steps(Config1, rabbit_ct_broker_helpers:setup_steps()).
end_per_suite(Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config,
- rabbit_ct_broker_helpers:teardown_steps()).
+ rabbit_ct_helpers:run_teardown_steps(Config, rabbit_ct_broker_helpers:teardown_steps()).
-init_per_group(_, Config) -> Config.
+init_per_group(_, Config) ->
+ Config.
-end_per_group(_, Config) -> Config.
+end_per_group(_, Config) ->
+ Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
@@ -47,72 +44,66 @@ end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
merge_defaults(_Config) ->
- {[<<"conn_name">>], #{verbose := false}} =
- (?COMMAND):merge_defaults([], #{}),
+ {[<<"conn_name">>], #{verbose := false}} = ?COMMAND:merge_defaults([], #{}),
{[<<"other_key">>], #{verbose := true}} =
- (?COMMAND):merge_defaults([<<"other_key">>],
- #{verbose => true}),
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}),
{[<<"other_key">>], #{verbose := false}} =
- (?COMMAND):merge_defaults([<<"other_key">>],
- #{verbose => false}).
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
run(Config) ->
- Node = rabbit_ct_broker_helpers:get_node_config(Config,
- 0,
- nodename),
- Opts = #{node => Node, timeout => 10000,
- verbose => false},
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Opts =
+ #{node => Node,
+ timeout => 10000,
+ verbose => false},
%% No connections
- [] = 'Elixir.Enum':to_list((?COMMAND):run([], Opts)),
- StreamPort =
- rabbit_stream_SUITE:get_stream_port(Config),
+ [] =
+ 'Elixir.Enum':to_list(
+ ?COMMAND:run([], Opts)),
+ StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
S1 = start_stream_connection(StreamPort),
ct:sleep(100),
[[{conn_name, _}]] =
- 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>],
- Opts)),
+ 'Elixir.Enum':to_list(
+ ?COMMAND:run([<<"conn_name">>], Opts)),
S2 = start_stream_connection(StreamPort),
ct:sleep(100),
[[{conn_name, _}], [{conn_name, _}]] =
- 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>],
- Opts)),
- Port = rabbit_ct_broker_helpers:get_node_config(Config,
- 0,
- tcp_port_amqp),
+ 'Elixir.Enum':to_list(
+ ?COMMAND:run([<<"conn_name">>], Opts)),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
start_amqp_connection(network, Node, Port),
%% There are still just two connections
[[{conn_name, _}], [{conn_name, _}]] =
- 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>],
- Opts)),
+ 'Elixir.Enum':to_list(
+ ?COMMAND:run([<<"conn_name">>], Opts)),
start_amqp_connection(direct, Node, Port),
 %% Still two stream connections, one direct AMQP 0-9-1 connection
[[{conn_name, _}], [{conn_name, _}]] =
- 'Elixir.Enum':to_list((?COMMAND):run([<<"conn_name">>],
- Opts)),
+ 'Elixir.Enum':to_list(
+ ?COMMAND:run([<<"conn_name">>], Opts)),
%% Verbose returns all keys
- Infos = lists:map(fun (El) -> atom_to_binary(El, utf8)
- end,
- ?INFO_ITEMS),
- AllKeys = 'Elixir.Enum':to_list((?COMMAND):run(Infos,
- Opts)),
- AllKeys = 'Elixir.Enum':to_list((?COMMAND):run([],
- Opts#{verbose => true})),
+ Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
+ AllKeys =
+ 'Elixir.Enum':to_list(
+ ?COMMAND:run(Infos, Opts)),
+ AllKeys =
+ 'Elixir.Enum':to_list(
+ ?COMMAND:run([], Opts#{verbose => true})),
%% There are two connections
[First, _Second] = AllKeys,
%% Keys are INFO_ITEMS
KeysCount = length(?INFO_ITEMS),
KeysCount = length(First),
{Keys, _} = lists:unzip(First),
- [] = Keys -- (?INFO_ITEMS),
- [] = (?INFO_ITEMS) -- Keys,
+ [] = Keys -- ?INFO_ITEMS,
+ [] = ?INFO_ITEMS -- Keys,
rabbit_stream_SUITE:test_close(S1),
rabbit_stream_SUITE:test_close(S2),
ok.
start_stream_connection(Port) ->
- {ok, S} = gen_tcp:connect("localhost",
- Port,
- [{active, false}, {mode, binary}]),
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
rabbit_stream_SUITE:test_peer_properties(S),
rabbit_stream_SUITE:test_authenticate(S),
S.
diff --git a/deps/rabbitmq_stream/test/config_schema_SUITE.erl b/deps/rabbitmq_stream/test/config_schema_SUITE.erl
index ea567b2aa1..a4e2485bdd 100644
--- a/deps/rabbitmq_stream/test/config_schema_SUITE.erl
+++ b/deps/rabbitmq_stream/test/config_schema_SUITE.erl
@@ -9,7 +9,8 @@
-compile(export_all).
-all() -> [run_snippets].
+all() ->
+ [run_snippets].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@@ -18,25 +19,23 @@ all() -> [run_snippets].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:run_setup_steps(Config),
- rabbit_ct_config_schema:init_schemas(rabbitmq_stream,
- Config1).
+ rabbit_ct_config_schema:init_schemas(rabbitmq_stream, Config1).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
- Config1 = rabbit_ct_helpers:set_config(Config,
- [{rmq_nodename_suffix, Testcase}]),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, Testcase}]),
rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
+ rabbit_ct_broker_helpers:setup_steps()
+ ++ rabbit_ct_client_helpers:setup_steps()).
end_per_testcase(Testcase, Config) ->
- Config1 = rabbit_ct_helpers:run_steps(Config,
- rabbit_ct_client_helpers:teardown_steps()
- ++
- rabbit_ct_broker_helpers:teardown_steps()),
+ Config1 =
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps()
+ ++ rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
@@ -44,11 +43,7 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------
run_snippets(Config) ->
- ok = rabbit_ct_broker_helpers:rpc(Config,
- 0,
- ?MODULE,
- run_snippets1,
- [Config]).
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_snippets1, [Config]).
run_snippets1(Config) ->
rabbit_ct_config_schema:run_snippets(Config).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index ef90ef6a70..0fc8524918 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -22,51 +22,49 @@
-compile(export_all).
-all() -> [{group, single_node}, {group, cluster}].
+all() ->
+ [{group, single_node}, {group, cluster}].
groups() ->
- [{single_node, [], [test_stream]},
- {cluster, [], [test_stream, java]}].
+ [{single_node, [], [test_stream]}, {cluster, [], [test_stream, java]}].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config.
-end_per_suite(Config) -> Config.
+end_per_suite(Config) ->
+ Config.
init_per_group(single_node, Config) ->
- Config1 = rabbit_ct_helpers:set_config(Config,
- [{rmq_nodes_clustered, false}]),
- rabbit_ct_helpers:run_setup_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps());
+ Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
+ rabbit_ct_helpers:run_setup_steps(Config1, rabbit_ct_broker_helpers:setup_steps());
init_per_group(cluster = Group, Config) ->
- Config1 = rabbit_ct_helpers:set_config(Config,
- [{rmq_nodes_clustered, true}]),
- Config2 = rabbit_ct_helpers:set_config(Config1,
- [{rmq_nodes_count, 3},
- {rmq_nodename_suffix, Group},
- {tcp_ports_base}]),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]),
+ Config2 =
+ rabbit_ct_helpers:set_config(Config1,
+ [{rmq_nodes_count, 3},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
rabbit_ct_helpers:run_setup_steps(Config2,
- [fun (StepConfig) ->
- rabbit_ct_helpers:merge_app_env(StepConfig,
- {aten,
- [{poll_interval,
- 1000}]})
+ [fun(StepConfig) ->
+ rabbit_ct_helpers:merge_app_env(StepConfig,
+ {aten,
+ [{poll_interval, 1000}]})
end]
- ++
- rabbit_ct_broker_helpers:setup_steps());
+ ++ rabbit_ct_broker_helpers:setup_steps());
init_per_group(_, Config) ->
rabbit_ct_helpers:run_setup_steps(Config).
end_per_group(java, Config) ->
rabbit_ct_helpers:run_teardown_steps(Config);
end_per_group(_, Config) ->
- rabbit_ct_helpers:run_steps(Config,
- rabbit_ct_broker_helpers:teardown_steps()).
+ rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()).
-init_per_testcase(_TestCase, Config) -> Config.
+init_per_testcase(_TestCase, Config) ->
+ Config.
-end_per_testcase(_Test, _Config) -> ok.
+end_per_testcase(_Test, _Config) ->
+ ok.
test_stream(Config) ->
Port = get_stream_port(Config),
@@ -79,41 +77,35 @@ java(Config) ->
Node1Name = get_node_name(Config, 0),
Node2Name = get_node_name(Config, 1),
RabbitMqCtl = get_rabbitmqctl(Config),
- DataDir = rabbit_ct_helpers:get_config(Config,
- data_dir),
- MakeResult = rabbit_ct_helpers:make(Config,
- DataDir,
- ["tests",
- {"NODE1_STREAM_PORT=~b",
- [StreamPortNode1]},
- {"NODE1_NAME=~p", [Node1Name]},
- {"NODE2_NAME=~p", [Node2Name]},
- {"NODE2_STREAM_PORT=~b",
- [StreamPortNode2]},
- {"RABBITMQCTL=~p", [RabbitMqCtl]}]),
+ DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
+ MakeResult =
+ rabbit_ct_helpers:make(Config,
+ DataDir,
+ ["tests",
+ {"NODE1_STREAM_PORT=~b", [StreamPortNode1]},
+ {"NODE1_NAME=~p", [Node1Name]},
+ {"NODE2_NAME=~p", [Node2Name]},
+ {"NODE2_STREAM_PORT=~b", [StreamPortNode2]},
+ {"RABBITMQCTL=~p", [RabbitMqCtl]}]),
{ok, _} = MakeResult.
get_rabbitmqctl(Config) ->
rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd).
-get_stream_port(Config) -> get_stream_port(Config, 0).
+get_stream_port(Config) ->
+ get_stream_port(Config, 0).
get_stream_port(Config, Node) ->
- rabbit_ct_broker_helpers:get_node_config(Config,
- Node,
- tcp_port_stream).
+ rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream).
-get_node_name(Config) -> get_node_name(Config, 0).
+get_node_name(Config) ->
+ get_node_name(Config, 0).
get_node_name(Config, Node) ->
- rabbit_ct_broker_helpers:get_node_config(Config,
- Node,
- nodename).
+ rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename).
test_server(Port) ->
- {ok, S} = gen_tcp:connect("localhost",
- Port,
- [{active, false}, {mode, binary}]),
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
test_peer_properties(S),
test_authenticate(S),
Stream = <<"stream1">>,
@@ -132,194 +124,250 @@ test_server(Port) ->
ok.
test_peer_properties(S) ->
- PeerPropertiesFrame = <<(?COMMAND_PEER_PROPERTIES):16,
- (?VERSION_0):16, 1:32, 0:32>>,
- PeerPropertiesFrameSize =
- byte_size(PeerPropertiesFrame),
- gen_tcp:send(S,
- <<PeerPropertiesFrameSize:32,
- PeerPropertiesFrame/binary>>),
+ PeerPropertiesFrame = <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>,
+ PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame),
+ gen_tcp:send(S, <<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>),
{ok,
- <<_Size:32, (?COMMAND_PEER_PROPERTIES):16,
- (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16,
+ <<_Size:32,
+ ?COMMAND_PEER_PROPERTIES:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_OK:16,
_Rest/binary>>} =
gen_tcp:recv(S, 0, 5000).
test_authenticate(S) ->
- SaslHandshakeFrame = <<(?COMMAND_SASL_HANDSHAKE):16,
- (?VERSION_0):16, 1:32>>,
+ SaslHandshakeFrame = <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>,
SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame),
- gen_tcp:send(S,
- <<SaslHandshakeFrameSize:32,
- SaslHandshakeFrame/binary>>),
+ gen_tcp:send(S, <<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>),
Plain = <<"PLAIN">>,
AmqPlain = <<"AMQPLAIN">>,
{ok, SaslAvailable} = gen_tcp:recv(S, 0, 5000),
%% mechanisms order is not deterministic, so checking both orders
- ok = case SaslAvailable of
- <<31:32, (?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16,
- 1:32, (?RESPONSE_CODE_OK):16, 2:32, 5:16,
- Plain:5/binary, 8:16, AmqPlain:8/binary>> ->
- ok;
- <<31:32, (?COMMAND_SASL_HANDSHAKE):16, (?VERSION_0):16,
- 1:32, (?RESPONSE_CODE_OK):16, 2:32, 8:16,
- AmqPlain:8/binary, 5:16, Plain:5/binary>> ->
- ok;
- _ -> failed
- end,
+ ok =
+ case SaslAvailable of
+ <<31:32,
+ ?COMMAND_SASL_HANDSHAKE:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_OK:16,
+ 2:32,
+ 5:16,
+ Plain:5/binary,
+ 8:16,
+ AmqPlain:8/binary>> ->
+ ok;
+ <<31:32,
+ ?COMMAND_SASL_HANDSHAKE:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_OK:16,
+ 2:32,
+ 8:16,
+ AmqPlain:8/binary,
+ 5:16,
+ Plain:5/binary>> ->
+ ok;
+ _ ->
+ failed
+ end,
Username = <<"guest">>,
Password = <<"guest">>,
Null = 0,
- PlainSasl = <<Null:8, Username/binary, Null:8,
- Password/binary>>,
+ PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
PlainSaslSize = byte_size(PlainSasl),
SaslAuthenticateFrame =
- <<(?COMMAND_SASL_AUTHENTICATE):16, (?VERSION_0):16,
- 2:32, 5:16, Plain/binary, PlainSaslSize:32,
+ <<?COMMAND_SASL_AUTHENTICATE:16,
+ ?VERSION_0:16,
+ 2:32,
+ 5:16,
+ Plain/binary,
+ PlainSaslSize:32,
PlainSasl/binary>>,
- SaslAuthenticateFrameSize =
- byte_size(SaslAuthenticateFrame),
- gen_tcp:send(S,
- <<SaslAuthenticateFrameSize:32,
- SaslAuthenticateFrame/binary>>),
+ SaslAuthenticateFrameSize = byte_size(SaslAuthenticateFrame),
+ gen_tcp:send(S, <<SaslAuthenticateFrameSize:32, SaslAuthenticateFrame/binary>>),
{ok,
- <<10:32, (?COMMAND_SASL_AUTHENTICATE):16,
- (?VERSION_0):16, 2:32, (?RESPONSE_CODE_OK):16,
+ <<10:32,
+ ?COMMAND_SASL_AUTHENTICATE:16,
+ ?VERSION_0:16,
+ 2:32,
+ ?RESPONSE_CODE_OK:16,
RestTune/binary>>} =
gen_tcp:recv(S, 0, 5000),
- TuneExpected = <<12:32, (?COMMAND_TUNE):16,
- (?VERSION_0):16, (?DEFAULT_FRAME_MAX):32,
- (?DEFAULT_HEARTBEAT):32>>,
+ TuneExpected =
+ <<12:32, ?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, ?DEFAULT_HEARTBEAT:32>>,
case RestTune of
- <<>> -> {ok, TuneExpected} = gen_tcp:recv(S, 0, 5000);
- TuneReceived -> TuneExpected = TuneReceived
+ <<>> ->
+ {ok, TuneExpected} = gen_tcp:recv(S, 0, 5000);
+ TuneReceived ->
+ TuneExpected = TuneReceived
end,
- TuneFrame = <<(?COMMAND_TUNE):16, (?VERSION_0):16,
- (?DEFAULT_FRAME_MAX):32, 0:32>>,
+ TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>,
TuneFrameSize = byte_size(TuneFrame),
gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>),
VirtualHost = <<"/">>,
VirtualHostLength = byte_size(VirtualHost),
- OpenFrame = <<(?COMMAND_OPEN):16, (?VERSION_0):16, 3:32,
- VirtualHostLength:16, VirtualHost/binary>>,
+ OpenFrame =
+ <<?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, VirtualHostLength:16, VirtualHost/binary>>,
OpenFrameSize = byte_size(OpenFrame),
gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>),
- {ok,
- <<10:32, (?COMMAND_OPEN):16, (?VERSION_0):16, 3:32,
- (?RESPONSE_CODE_OK):16>>} =
+ {ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
test_create_stream(S, Stream) ->
StreamSize = byte_size(Stream),
- CreateStreamFrame = <<(?COMMAND_CREATE_STREAM):16,
- (?VERSION_0):16, 1:32, StreamSize:16,
- Stream:StreamSize/binary, 0:32>>,
+ CreateStreamFrame =
+ <<?COMMAND_CREATE_STREAM:16,
+ ?VERSION_0:16,
+ 1:32,
+ StreamSize:16,
+ Stream:StreamSize/binary,
+ 0:32>>,
FrameSize = byte_size(CreateStreamFrame),
- gen_tcp:send(S,
- <<FrameSize:32, CreateStreamFrame/binary>>),
- {ok,
- <<_Size:32, (?COMMAND_CREATE_STREAM):16,
- (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16>>} =
+ gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>),
+ {ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
test_delete_stream(S, Stream) ->
StreamSize = byte_size(Stream),
- DeleteStreamFrame = <<(?COMMAND_DELETE_STREAM):16,
- (?VERSION_0):16, 1:32, StreamSize:16,
- Stream:StreamSize/binary>>,
+ DeleteStreamFrame =
+ <<?COMMAND_DELETE_STREAM:16,
+ ?VERSION_0:16,
+ 1:32,
+ StreamSize:16,
+ Stream:StreamSize/binary>>,
FrameSize = byte_size(DeleteStreamFrame),
- gen_tcp:send(S,
- <<FrameSize:32, DeleteStreamFrame/binary>>),
+ gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>),
ResponseFrameSize = 10,
{ok,
- <<ResponseFrameSize:32, (?COMMAND_DELETE_STREAM):16,
- (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16>>} =
+ <<ResponseFrameSize:32,
+ ?COMMAND_DELETE_STREAM:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 4 + 10, 5000).
test_declare_publisher(S, PublisherId, Stream) ->
StreamSize = byte_size(Stream),
DeclarePublisherFrame =
- <<(?COMMAND_DECLARE_PUBLISHER):16, (?VERSION_0):16,
- 1:32, PublisherId:8,
+ <<?COMMAND_DECLARE_PUBLISHER:16,
+ ?VERSION_0:16,
+ 1:32,
+ PublisherId:8,
0:16, %% empty publisher reference
- StreamSize:16, Stream:StreamSize/binary>>,
+ StreamSize:16,
+ Stream:StreamSize/binary>>,
FrameSize = byte_size(DeclarePublisherFrame),
- gen_tcp:send(S,
- <<FrameSize:32, DeclarePublisherFrame/binary>>),
+ gen_tcp:send(S, <<FrameSize:32, DeclarePublisherFrame/binary>>),
Res = gen_tcp:recv(S, 0, 5000),
{ok,
- <<_Size:32, (?COMMAND_DECLARE_PUBLISHER):16,
- (?VERSION_0):16, 1:32, (?RESPONSE_CODE_OK):16,
+ <<_Size:32,
+ ?COMMAND_DECLARE_PUBLISHER:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_OK:16,
Rest/binary>>} =
Res,
Rest.
test_publish_confirm(S, PublisherId, Body) ->
BodySize = byte_size(Body),
- PublishFrame = <<(?COMMAND_PUBLISH):16, (?VERSION_0):16,
- PublisherId:8, 1:32, 1:64, BodySize:32,
- Body:BodySize/binary>>,
+ PublishFrame =
+ <<?COMMAND_PUBLISH:16,
+ ?VERSION_0:16,
+ PublisherId:8,
+ 1:32,
+ 1:64,
+ BodySize:32,
+ Body:BodySize/binary>>,
FrameSize = byte_size(PublishFrame),
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
{ok,
- <<_Size:32, (?COMMAND_PUBLISH_CONFIRM):16,
- (?VERSION_0):16, PublisherId:8, 1:32, 1:64>>} =
+ <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64>>} =
gen_tcp:recv(S, 0, 5000).
test_subscribe(S, SubscriptionId, Stream) ->
StreamSize = byte_size(Stream),
- SubscribeFrame = <<(?COMMAND_SUBSCRIBE):16,
- (?VERSION_0):16, 1:32, SubscriptionId:8, StreamSize:16,
- Stream:StreamSize/binary, (?OFFSET_TYPE_OFFSET):16,
- 0:64, 10:16>>,
+ SubscribeFrame =
+ <<?COMMAND_SUBSCRIBE:16,
+ ?VERSION_0:16,
+ 1:32,
+ SubscriptionId:8,
+ StreamSize:16,
+ Stream:StreamSize/binary,
+ ?OFFSET_TYPE_OFFSET:16,
+ 0:64,
+ 10:16>>,
FrameSize = byte_size(SubscribeFrame),
- gen_tcp:send(S,
- <<FrameSize:32, SubscribeFrame/binary>>),
+ gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>),
Res = gen_tcp:recv(S, 0, 5000),
{ok,
- <<_Size:32, (?COMMAND_SUBSCRIBE):16, (?VERSION_0):16,
- 1:32, (?RESPONSE_CODE_OK):16, Rest/binary>>} =
+ <<_Size:32,
+ ?COMMAND_SUBSCRIBE:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_OK:16,
+ Rest/binary>>} =
Res,
Rest.
test_deliver(S, Rest, SubscriptionId, Body) ->
BodySize = byte_size(Body),
Frame = read_frame(S, Rest),
- <<58:32, (?COMMAND_DELIVER):16, (?VERSION_0):16,
- SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8, 1:16,
- 1:32, _Timestamp:64, _Epoch:64, 0:64, _Crc:32,
- _DataLength:32, _TrailerLength:32, 0:1,
- BodySize:31/unsigned, Body/binary>> =
+ <<58:32,
+ ?COMMAND_DELIVER:16,
+ ?VERSION_0:16,
+ SubscriptionId:8,
+ 5:4/unsigned,
+ 0:4/unsigned,
+ 0:8,
+ 1:16,
+ 1:32,
+ _Timestamp:64,
+ _Epoch:64,
+ 0:64,
+ _Crc:32,
+ _DataLength:32,
+ _TrailerLength:32,
+ 0:1,
+ BodySize:31/unsigned,
+ Body/binary>> =
Frame.
test_metadata_update_stream_deleted(S, Stream) ->
StreamSize = byte_size(Stream),
{ok,
- <<15:32, (?COMMAND_METADATA_UPDATE):16, (?VERSION_0):16,
- (?RESPONSE_CODE_STREAM_NOT_AVAILABLE):16, StreamSize:16,
+ <<15:32,
+ ?COMMAND_METADATA_UPDATE:16,
+ ?VERSION_0:16,
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
+ StreamSize:16,
Stream/binary>>} =
gen_tcp:recv(S, 0, 5000).
test_close(S) ->
CloseReason = <<"OK">>,
CloseReasonSize = byte_size(CloseReason),
- CloseFrame = <<(?COMMAND_CLOSE):16, (?VERSION_0):16,
- 1:32, (?RESPONSE_CODE_OK):16, CloseReasonSize:16,
- CloseReason/binary>>,
+ CloseFrame =
+ <<?COMMAND_CLOSE:16,
+ ?VERSION_0:16,
+ 1:32,
+ ?RESPONSE_CODE_OK:16,
+ CloseReasonSize:16,
+ CloseReason/binary>>,
CloseFrameSize = byte_size(CloseFrame),
- gen_tcp:send(S,
- <<CloseFrameSize:32, CloseFrame/binary>>),
- {ok,
- <<10:32, (?COMMAND_CLOSE):16, (?VERSION_0):16, 1:32,
- (?RESPONSE_CODE_OK):16>>} =
+ gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>),
+ {ok, <<10:32, ?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
-wait_for_socket_close(_S, 0) -> not_closed;
+wait_for_socket_close(_S, 0) ->
+ not_closed;
wait_for_socket_close(S, Attempt) ->
case gen_tcp:recv(S, 0, 1000) of
{error, timeout} ->
wait_for_socket_close(S, Attempt - 1);
- {error, closed} -> closed
+ {error, closed} ->
+ closed
end.
read_frame(S, Buffer) ->
@@ -328,8 +376,12 @@ read_frame(S, Buffer) ->
{tcp, S, Received} ->
Data = <<Buffer/binary, Received/binary>>,
case Data of
- <<Size:32, _Body:Size/binary>> -> Data;
- _ -> read_frame(S, Data)
+ <<Size:32, _Body:Size/binary>> ->
+ Data;
+ _ ->
+ read_frame(S, Data)
end
- after 1000 -> inet:setopts(S, [{active, false}]), Buffer
+ after 1000 ->
+ inet:setopts(S, [{active, false}]),
+ Buffer
end.