summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stream/src
diff options
context:
space:
mode:
author: kjnilsson <knilsson@pivotal.io> 2021-04-30 17:27:59 +0100
committer: kjnilsson <knilsson@pivotal.io> 2021-04-30 17:27:59 +0100
commit: 0e8474abb7b6fd1479dbc8097c2278a1adf83381 (patch)
tree: 9021001e3a1f4132c039d63cda3d81e0f4195703 /deps/rabbitmq_stream/src
parent: 56253b4876659fce00b4db70d9920cc075fef460 (diff)
download: rabbitmq-server-git-stream-refactoring.tar.gz
move parsing into core module (branch: stream-refactoring)
Diffstat (limited to 'deps/rabbitmq_stream/src')
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_core.erl347
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl401
3 files changed, 487 insertions, 263 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_core.erl b/deps/rabbitmq_stream/src/rabbit_stream_core.erl
index 665ee3c4b9..400d74131e 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_core.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_core.erl
@@ -4,16 +4,20 @@
-export([
init/1,
incoming_data/2,
- frame/1
+ frame/1,
+ parse_command/1
]).
%% holds static or rarely changing fields
-record(cfg, {}).
-record(?MODULE, {cfg :: #cfg{},
+ frames = [] :: [iodata()],
%% partial data
- partial_frame :: undefined |
- {ExpectedLength :: non_neg_integer(), iolist()}
+ data :: undefined |
+ %% this is only if the binary is smaller than 4 bytes
+ binary() |
+ {RemainingBytes :: non_neg_integer(), iodata()}
}).
-opaque state() :: #?MODULE{}.
@@ -67,6 +71,7 @@
{credit, subscription_id(), Credit :: non_neg_integer()} |
{metadata_update, stream_name(), response_code()} |
heartbeat |
+ {tune, FrameMax :: non_neg_integer(), HeartBeat :: non_neg_integer()} |
{request, correlation_id(),
{declare_publisher, publisher_id(), writer_ref(), stream_name()} |
{query_publisher_sequence, writer_ref(), stream_name()} |
@@ -83,7 +88,6 @@
sasl_handshake |
%% TODO: look into
{sasl_authenticate, Mechanism :: binary(), SaslFragment :: binary()} |
- {tune, FrameMax :: non_neg_integer(), HeartBeat :: non_neg_integer()} |
{open, VirtualHost :: binary()} |
{close, Code :: non_neg_integer(), Reason :: binary()} |
{route, RoutingKey :: binary(), SuperStream :: binary()} |
@@ -111,7 +115,7 @@
{peer_properties, response_code(), #{binary() => binary()}} |
{sasl_handshake, response_code(), Mechanisms :: [binary()]} |
%% either response code or sasl fragment
- {sasl_authenticate, response_code() | binary()} |
+ {sasl_authenticate, response_code(), Challenge :: binary()} |
{tune, FrameMax :: non_neg_integer(), HeartBeat :: non_neg_integer()} |
%% TODO should route return a list of routed streams?
{route, response_code(), stream_name()} |
@@ -120,12 +124,50 @@
-spec init(term()) -> state().
init(_) ->
- #?MODULE{}.
+ #?MODULE{cfg = #cfg{}}.
+%% returns frames
-spec incoming_data(binary(), state()) ->
{state(), [command()]}.
-incoming_data(_, State) ->
- {State, []}.
+%% TODO: check max frame size
+incoming_data(<<>>, #?MODULE{frames = Frames} = State) ->
+ {State#?MODULE{frames = []}, parse_frames(Frames)};
+incoming_data(<<Size:32/unsigned, Frame:Size/binary, Rem/binary>>,
+ #?MODULE{frames = Frames,
+ data = undefined} = State) ->
+ incoming_data(Rem, State#?MODULE{frames = [Frame | Frames],
+ data = undefined});
+incoming_data(<<Size:32/unsigned, Rem/binary>>,
+ #?MODULE{frames = Frames,
+ data = undefined} = State) ->
+ %% not enough data to complete frame, stash and await more data
+ {State#?MODULE{frames = [],
+ data = {Size, Rem}},
+ parse_frames(Frames)};
+incoming_data(Data,
+ #?MODULE{frames = Frames,
+ data = {Size, Partial}} = State) ->
+ case Data of
+ <<Data:Size/binary, Rem/binary>> ->
+ incoming_data(Rem, State#?MODULE{frames = [append_data(Partial, [Data])
+ | Frames],
+ data = undefined});
+ Rem ->
+ {State#?MODULE{frames = [],
+ data = {Size - byte_size(Rem),
+ append_data(Partial, Rem)}},
+ parse_frames(Frames)}
+ end;
+incoming_data(Data, #?MODULE{data = Partial} = State)
+ when is_binary(Partial) ->
+ incoming_data(<<Partial/binary, Data/binary>>,
+ State#?MODULE{data = undefined}).
+
+parse_frames(Frames) ->
+ lists:foldl(
+ fun (Frame, Acc) ->
+ [parse_command(Frame) | Acc]
+ end, [], Frames).
-spec frame(command()) -> iodata().
frame({publish_confirm, PublisherId, PublishingIds}) ->
@@ -144,11 +186,300 @@ frame({publish_confirm, PublisherId, PublishingIds}) ->
frame(_Command) ->
[].
+append_data(Prev, Data) when is_binary(Prev) ->
+ [Prev, Data];
+append_data(Prev, Data) when is_list(Prev) ->
+ Prev ++ [Data].
+
wrap_in_frame(IOData) ->
Size = iolist_size(IOData),
[<<Size:32>> | IOData].
+parse_command(<<?REQUEST:1, _:15, _/binary>> = Bin) ->
+ parse_request(Bin);
+parse_command(<<?RESPONSE:1, _:15, _/binary>> = Bin) ->
+ parse_response(Bin);
+parse_command(Data) when is_list(Data) ->
+ %% TODO: most commands are rare or small and likely to be a single
+ %% binary, however publish and delivery should be parsed from the
+ %% iodata rather than turned into a binary
+ parse_command(iolist_to_binary(Data)).
+
+-define(STRING(Size, Str), Size:16, Str:Size/binary).
+
+-spec parse_request(binary()) -> command().
+parse_request(<<?REQUEST:1, ?COMMAND_PUBLISH:15, ?VERSION_1:16,
+ PublisherId:8/unsigned, MessageCount:32, Messages/binary>>) ->
+ {publish, PublisherId, MessageCount, Messages};
+parse_request(<<?REQUEST:1, ?COMMAND_PUBLISH_CONFIRM:15, ?VERSION_1:16,
+ PublisherId:8, _Count:32, PublishingIds/binary>>) ->
+ {publish_confirm, PublisherId, list_of_longs(PublishingIds)};
+parse_request(<<?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_1:16,
+ SubscriptionId:8, Chunk/binary>>) ->
+ {deliver, SubscriptionId, Chunk};
+parse_request(<<?REQUEST:1, ?COMMAND_CREDIT:15, ?VERSION_1:16,
+ SubscriptionId:8, Credit:16/signed>>) ->
+ {credit, SubscriptionId, Credit};
+parse_request(<<?REQUEST:1, ?COMMAND_PUBLISH_ERROR:15, ?VERSION_1:16,
+ PublisherId:8, _Count:32, DetailsBin/binary>>) ->
+ %% TODO: change protocol to match
+ [{_, ErrCode} | _] = Details = list_of_longcodes(DetailsBin),
+ {PublishingIds, _} = lists:unzip(Details),
+ {publish_error, PublisherId, ErrCode, PublishingIds};
+parse_request(<<?REQUEST:1, ?COMMAND_METADATA_UPDATE:15, ?VERSION_1:16,
+ ResponseCode:16, StreamSize:16, Stream:StreamSize/binary>>) ->
+ {metadata_update, Stream, ResponseCode};
+parse_request(<<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>) ->
+ heartbeat;
+parse_request(<<?REQUEST:1, ?COMMAND_DECLARE_PUBLISHER:15, ?VERSION_1:16,
+ CorrelationId:32, PublisherId:8,
+ ?STRING(WriterRefSize, WriterRef),
+ ?STRING(StreamSize, Stream)>>) ->
+ request(CorrelationId,
+ {declare_publisher, PublisherId, WriterRef, Stream});
+parse_request(<<?REQUEST:1, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(WSize, WriterReference),
+ ?STRING(SSize, Stream)>>) ->
+ request(CorrelationId,
+ {query_publisher_sequence, WriterReference, Stream});
+parse_request(<<?REQUEST:1, ?COMMAND_DELETE_PUBLISHER:15, ?VERSION_1:16,
+ CorrelationId:32, PublisherId:8>>) ->
+ request(CorrelationId, {delete_publisher, PublisherId});
+parse_request(<<?REQUEST:1, ?COMMAND_SUBSCRIBE:15, ?VERSION_1:16,
+ CorrelationId:32, SubscriptionId:8,
+ ?STRING(StreamSize, Stream),
+ OffsetType:16/signed,
+ OffsetAndCredit/binary>>) ->
+ {OffsetSpec, Credit} = case OffsetType of
+ ?OFFSET_TYPE_FIRST ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {first, Crdt};
+ ?OFFSET_TYPE_LAST ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {last, Crdt};
+ ?OFFSET_TYPE_NEXT ->
+ <<Crdt:16>> = OffsetAndCredit,
+ {next, Crdt};
+ ?OFFSET_TYPE_OFFSET ->
+ <<Offset:64/unsigned, Crdt:16>> =
+ OffsetAndCredit,
+ {Offset, Crdt};
+ ?OFFSET_TYPE_TIMESTAMP ->
+ <<Timestamp:64/signed, Crdt:16>> =
+ OffsetAndCredit,
+ {{timestamp, Timestamp}, Crdt}
+ end,
+ request(CorrelationId,
+ {subscribe, SubscriptionId, Stream, OffsetSpec, Credit});
+parse_request(<<?REQUEST:1, ?COMMAND_COMMIT_OFFSET:15, ?VERSION_1:16,
+ _CorrelationId:32,
+ ?STRING(RefSize, OffsetRef),
+ ?STRING(SSize, Stream),
+ Offset:64>>) ->
+ %% NB: this request has no response so ignoring correlation id here
+ {commit_offset, OffsetRef, Stream, Offset};
+parse_request(<<?REQUEST:1, ?COMMAND_QUERY_OFFSET:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(RefSize, OffsetRef),
+ ?STRING(SSize, Stream)>>) ->
+ request(CorrelationId, {query_offset, OffsetRef, Stream});
+parse_request(<<?REQUEST:1, ?COMMAND_UNSUBSCRIBE:15, ?VERSION_1:16,
+ CorrelationId:32, SubscriptionId:8>>) ->
+ request(CorrelationId, {unsubscribe, SubscriptionId});
+parse_request(<<?REQUEST:1, ?COMMAND_CREATE_STREAM:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(StreamSize, Stream),
+ _ArgumentsCount:32,
+ ArgumentsBinary/binary>>) ->
+ Args = parse_map(ArgumentsBinary, #{}),
+ request(CorrelationId, {create_stream, Stream, Args});
+parse_request(<<?REQUEST:1, ?COMMAND_DELETE_STREAM:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(StreamSize, Stream)>>) ->
+ request(CorrelationId, {delete_stream, Stream});
+parse_request(<<?REQUEST:1, ?COMMAND_METADATA:15, ?VERSION_1:16,
+ CorrelationId:32, _StreamCount:32,
+ BinaryStreams/binary>>) ->
+ Streams = list_of_strings(BinaryStreams),
+ request(CorrelationId, {metadata, Streams});
+parse_request(<<?REQUEST:1, ?COMMAND_PEER_PROPERTIES:15, ?VERSION_1:16,
+ CorrelationId:32, _PropertiesCount:32,
+ PropertiesBinary/binary>>) ->
+ Props = parse_map(PropertiesBinary, #{}),
+ request(CorrelationId, {peer_properties, Props});
+parse_request(<<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_1:16,
+ CorrelationId:32>>) ->
+ request(CorrelationId, sasl_handshake);
+parse_request(<<?REQUEST:1, ?COMMAND_SASL_AUTHENTICATE:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(MechanismSize, Mechanism),
+ SaslFragment/binary>>) ->
+ SaslBin =
+ case SaslFragment of
+ <<(-1):32/signed>> ->
+ <<>>;
+ <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> ->
+ SaslBinary
+ end,
+ request(CorrelationId,
+ {sasl_authenticate, Mechanism, SaslBin});
+parse_request(<<?REQUEST:1, ?COMMAND_TUNE:15, ?VERSION_1:16,
+ FrameMax:32, Heartbeat:32>>) ->
+ %% NB: no correlation id but uses the response bit
+ {tune, FrameMax, Heartbeat};
+parse_request(<<?REQUEST:1, ?COMMAND_OPEN:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(VhostSize, VirtualHost)>>) ->
+ request(CorrelationId,
+ {open, VirtualHost});
+parse_request(<<?REQUEST:1, ?COMMAND_CLOSE:15, ?VERSION_1:16,
+ CorrelationId:32,
+ CloseCode:16,
+ ?STRING(ReasonSize, Reason)>>) ->
+ request(CorrelationId,
+ {close, CloseCode, Reason});
+parse_request(<<?REQUEST:1, ?COMMAND_ROUTE:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(RKeySize, RoutingKey),
+ ?STRING(StreamSize, SuperStream)>>) ->
+ request(CorrelationId,
+ {route, RoutingKey, SuperStream});
+parse_request(<<?REQUEST:1, ?COMMAND_PARTITIONS:15, ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(StreamSize, SuperStream)>>) ->
+ request(CorrelationId,
+ {partitions, SuperStream});
+parse_request(Bin) ->
+ % {partitions, SuperStream :: binary()}} |
+ exit({no, Bin}).
+
+parse_response(<<?RESPONSE:1, CommandId:15, ?VERSION_1:16,
+ CorrelationId:32, ResponseCode:16>>) ->
+ {response, CorrelationId, {parse_command_id(CommandId), ResponseCode}};
+parse_response(<<?RESPONSE:1, ?COMMAND_TUNE:15, ?VERSION_1:16,
+ FrameMax:32, Heartbeat:32>>) ->
+ {tune, FrameMax, Heartbeat};
+parse_response(<<?RESPONSE:1, CommandId:15, ?VERSION_1:16,
+ CorrelationId:32, Data/binary>>) ->
+ {response, CorrelationId, parse_response_body(CommandId, Data)}.
+
+parse_response_body(?COMMAND_QUERY_PUBLISHER_SEQUENCE,
+ <<ResponseCode:16, Sequence:64>>) ->
+ {query_publisher_sequence, ResponseCode, Sequence};
+parse_response_body(?COMMAND_QUERY_OFFSET,
+ <<ResponseCode:16, Offset:64>>) ->
+ {query_offset, ResponseCode, Offset};
+parse_response_body(?COMMAND_METADATA, <<NumNodes:32, Data/binary>>) ->
+ {NodesLookup, MetadataBin} = parse_nodes(Data, NumNodes, #{}),
+ Nodes = maps:values(NodesLookup),
+ Metadata = parse_meta(MetadataBin, NodesLookup, #{}),
+ {metadata, Nodes, Metadata};
+parse_response_body(?COMMAND_PEER_PROPERTIES,
+ <<ResponseCode:16, _Count:32, PropertiesBin/binary>>) ->
+ Props = parse_map(PropertiesBin, #{}),
+ {peer_properties, ResponseCode, Props};
+parse_response_body(?COMMAND_SASL_HANDSHAKE,
+ <<ResponseCode:16, _Count:32, MechanismsBin/binary>>) ->
+ Props = list_of_strings(MechanismsBin),
+ {peer_properties, ResponseCode, Props};
+parse_response_body(?COMMAND_SASL_AUTHENTICATE,
+ <<ResponseCode:16, ChallengeBin/binary>>) ->
+ Challenge = case ChallengeBin of
+ <<?STRING(CSize, Chall)>> ->
+ Chall;
+ <<>> ->
+ <<>>
+ end,
+ {sasl_authenticate, ResponseCode, Challenge};
+parse_response_body(?COMMAND_ROUTE,
+ <<ResponseCode:16,
+ ?STRING(StreamSize, Stream)>>) ->
+ {route, ResponseCode, Stream};
+parse_response_body(?COMMAND_PARTITIONS,
+ <<ResponseCode:16,
+ _Count:32,
+ PartitionsBin/binary>>) ->
+ Partitions = list_of_strings(PartitionsBin),
+ {partitions, ResponseCode, Partitions}.
+
+request(Corr, Cmd) ->
+ {request, Corr, Cmd}.
+
+parse_meta(<<>>, _Nodes, Acc) ->
+ Acc;
+parse_meta(<<?STRING(StreamSize, Stream),
+ Code:16,
+ LeaderIndex:16,
+ ReplicaCount:32,
+ ReplicaIndexBin:(ReplicaCount * 16)/binary,
+ Rem/binary>>, Nodes,
+ Acc) ->
+ StreamDetail = case Code of
+ ?RESPONSE_CODE_OK ->
+ Leader = maps:get(LeaderIndex, Nodes),
+ Replicas = [maps:get(I, Nodes) ||
+ I <- list_of_shorts(ReplicaIndexBin)],
+ {Leader, Replicas};
+ ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST ->
+ stream_not_found;
+ ?RESPONSE_CODE_STREAM_NOT_AVAILABLE ->
+ stream_not_available
+ end,
+ parse_meta(Rem, Nodes, Acc#{Stream => StreamDetail}).
+
+parse_nodes(Rem, 0, Acc) ->
+ {Acc, Rem};
+parse_nodes(<<Index:16,
+ ?STRING(HostSize, Host),
+ Port:32, Rem/binary>>, C, Acc) ->
+ parse_nodes(Rem, C - 1, Acc#{Index => {Host, Port}}).
+
+parse_map(<<>>, Acc) ->
+ Acc;
+parse_map(<<?STRING(KeySize, Key),
+ ?STRING(ValSize, Value),
+ Rem/binary>>, Acc) ->
+ parse_map(Rem, Acc#{Key => Value}).
+
+list_of_strings(<<>>) ->
+ [];
+list_of_strings(<<?STRING(Size, String), Rem/binary>>) ->
+ [String | list_of_strings(Rem)].
+
+list_of_longs(<<>>) ->
+ [];
+list_of_longs(<<I:64, Rem/binary>>) ->
+ [I | list_of_longs(Rem)].
+
+list_of_shorts(<<>>) ->
+ [];
+list_of_shorts(<<I:16, Rem/binary>>) ->
+ [I | list_of_shorts(Rem)].
+
+list_of_longcodes(<<>>) ->
+ [];
+list_of_longcodes(<<I:64, C:16, Rem/binary>>) ->
+ [{I, C} | list_of_longcodes(Rem)].
+
+
+parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher;
+parse_command_id(?COMMAND_DELETE_PUBLISHER) -> delete_publisher;
+parse_command_id(?COMMAND_SUBSCRIBE) -> subscribe;
+parse_command_id(?COMMAND_UNSUBSCRIBE) -> unsubscribe;
+parse_command_id(?COMMAND_CREATE_STREAM) -> create_stream;
+parse_command_id(?COMMAND_DELETE_STREAM) -> delete_stream;
+parse_command_id(?COMMAND_OPEN) -> open;
+parse_command_id(?COMMAND_CLOSE) -> close.
+command_id(declare_publisher) -> ?COMMAND_DECLARE_PUBLISHER;
+command_id(delete_publisher) -> ?COMMAND_DELETE_PUBLISHER;
+command_id(subscribe) -> ?COMMAND_SUBSCRIBE;
+command_id(unsubscribe) -> ?COMMAND_UNSUBSCRIBE;
+command_id(create_stream) -> ?COMMAND_CREATE_STREAM;
+command_id(delete_stream) -> ?COMMAND_DELETE_STREAM;
+command_id(open) -> ?COMMAND_OPEN;
+command_id(close) -> ?COMMAND_CLOSE.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 443f8ca80e..c7fbd84504 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -81,7 +81,7 @@ route(RoutingKey, VirtualHost, SuperStream) ->
{route, RoutingKey, VirtualHost, SuperStream}).
-spec partitions(binary(), binary()) ->
- {ok, [binary()] | {error, stream_not_found}}.
+ {ok, [binary()]} | {error, stream_not_found}.
partitions(VirtualHost, SuperStream) ->
gen_server:call(?MODULE, {partitions, VirtualHost, SuperStream}).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 793a37f75c..86b459fe7f 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -41,7 +41,8 @@
stream :: stream(),
counters :: atomics:atomics_ref()}).
-record(stream_connection_state,
- {data :: none | binary(), blocked :: boolean(),
+ {data :: rabbit_stream_core:state(),
+ blocked :: boolean(),
consumers :: #{subscription_id() => #consumer{}}}).
-record(stream_connection,
{name :: binary(),
@@ -198,7 +199,7 @@ init([KeepaliveSup,
State =
#stream_connection_state{consumers = #{},
blocked = false,
- data = none},
+ data = rabbit_stream_core:init(undefined)},
Transport:setopts(RealSocket, [{active, once}]),
rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}),
@@ -789,7 +790,7 @@ listen_loop_post_close(Transport,
=
IsThereAlarm},
State,
- Configuration);
+ <<>>);
{OK, S, Data} ->
Transport:setopts(S, [{active, once}]),
{Connection1, State1} =
@@ -846,61 +847,87 @@ handle_inbound_data_post_close(Transport, Connection, State, Rest) ->
Rest,
fun handle_frame_post_close/5).
-handle_inbound_data(_Transport,
- Connection,
- State,
- <<>>,
- _HandleFrameFun) ->
- {Connection, State};
-handle_inbound_data(Transport,
- #stream_connection{frame_max = FrameMax} = Connection,
- #stream_connection_state{data = none} = State,
- <<Size:32, _Frame:Size/binary, _Rest/bits>>,
- _HandleFrameFun)
- when FrameMax /= 0 andalso Size > FrameMax - 4 ->
- CloseReason = <<"frame too large">>,
- CloseReasonLength = byte_size(CloseReason),
- CloseFrame =
- <<?REQUEST:1,
- ?COMMAND_CLOSE:15,
- ?VERSION_1:16,
- 1:32,
- ?RESPONSE_CODE_FRAME_TOO_LARGE:16,
- CloseReasonLength:16,
- CloseReason:CloseReasonLength/binary>>,
- frame(Transport, Connection, CloseFrame),
- {Connection#stream_connection{connection_step = close_sent}, State};
-handle_inbound_data(Transport,
- Connection,
- #stream_connection_state{data = none} = State,
- <<Size:32, Frame:Size/binary, Rest/bits>>,
- HandleFrameFun) ->
- {Connection1, State1, Rest1} =
- HandleFrameFun(Transport, Connection, State, Frame, Rest),
- handle_inbound_data(Transport,
- Connection1,
- State1,
- Rest1,
- HandleFrameFun);
-handle_inbound_data(_Transport,
- Connection,
- #stream_connection_state{data = none} = State,
- Data,
- _HandleFrameFun) ->
- {Connection, State#stream_connection_state{data = Data}};
handle_inbound_data(Transport,
Connection,
- #stream_connection_state{data = Leftover} = State,
+ #stream_connection_state{data = CoreState0} = State,
Data,
HandleFrameFun) ->
- State1 = State#stream_connection_state{data = none},
- %% FIXME avoid concatenation to avoid a new binary allocation
- %% see osiris_replica:parse_chunk/3
- handle_inbound_data(Transport,
- Connection,
- State1,
- <<Leftover/binary, Data/binary>>,
- HandleFrameFun).
+ case rabbit_stream_core:incoming_data(Data, CoreState0) of
+ {CoreState, []} ->
+ % rabbit_log:info("NO FRAMES ", []),
+ {Connection,
+ State#stream_connection_state{data = CoreState}};
+ {CoreState, Commands} ->
+ % rabbit_log:info("FRAMES ~w", [length(Frames)]),
+ {Connection1, State1, _} =
+ lists:foldl(
+ fun (Command, {C, S, R}) ->
+ HandleFrameFun(Transport, C, S, Command, R)
+ end, {Connection,
+ State#stream_connection_state{data = CoreState},
+ <<>>}, Commands),
+ handle_inbound_data(Transport,
+ Connection1,
+ State1,
+ <<>>,
+ HandleFrameFun)
+ end.
+
+% handle_inbound_data(_Transport,
+% Connection,
+% State,
+% <<>>,
+% _HandleFrameFun) ->
+% {Connection, State};
+% handle_inbound_data(Transport,
+% #stream_connection{frame_max = FrameMax} = Connection,
+% #stream_connection_state{data = none} = State,
+% <<Size:32, _Frame:Size/binary, _Rest/bits>>,
+% _HandleFrameFun)
+% when FrameMax /= 0 andalso Size > FrameMax - 4 ->
+% CloseReason = <<"frame too large">>,
+% CloseReasonLength = byte_size(CloseReason),
+% CloseFrame =
+% <<?REQUEST:1,
+% ?COMMAND_CLOSE:15,
+% ?VERSION_1:16,
+% 1:32,
+% ?RESPONSE_CODE_FRAME_TOO_LARGE:16,
+% CloseReasonLength:16,
+% CloseReason:CloseReasonLength/binary>>,
+% frame(Transport, Connection, CloseFrame),
+% {Connection#stream_connection{connection_step = close_sent}, State};
+% handle_inbound_data(Transport,
+% Connection,
+% #stream_connection_state{data = none} = State,
+% <<Size:32, Frame:Size/binary, Rest/bits>>,
+% HandleFrameFun) ->
+% {Connection1, State1, Rest1} =
+% HandleFrameFun(Transport, Connection, State, Frame, Rest),
+% handle_inbound_data(Transport,
+% Connection1,
+% State1,
+% Rest1,
+% HandleFrameFun);
+% handle_inbound_data(_Transport,
+% Connection,
+% #stream_connection_state{data = none} = State,
+% Data,
+% _HandleFrameFun) ->
+% {Connection, State#stream_connection_state{data = Data}};
+% handle_inbound_data(Transport,
+% Connection,
+% #stream_connection_state{data = Leftover} = State,
+% Data,
+% HandleFrameFun) ->
+% State1 = State#stream_connection_state{data = none},
+% %% FIXME avoid concatenation to avoid a new binary allocation
+% %% see osiris_replica:parse_chunk/3
+% handle_inbound_data(Transport,
+% Connection,
+% State1,
+% <<Leftover/binary, Data/binary>>,
+% HandleFrameFun).
generate_publishing_error_details(Acc, _Code, <<>>) ->
Acc;
@@ -916,16 +943,9 @@ generate_publishing_error_details(Acc, Code,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_PEER_PROPERTIES:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ClientPropertiesCount:32,
- ClientPropertiesFrame/binary>>,
+ {request, CorrelationId,
+ {peer_properties, ClientProperties}},
Rest) ->
- {ClientProperties, _} =
- rabbit_stream_utils:parse_map(ClientPropertiesFrame,
- ClientPropertiesCount),
{ok, Product} = application:get_key(rabbit, description),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -984,10 +1004,7 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_SASL_HANDSHAKE:15,
- ?VERSION_1:16,
- CorrelationId:32>>,
+ {request, CorrelationId, sasl_handshake},
Rest) ->
Mechanisms = rabbit_stream_utils:auth_mechanisms(S),
MechanismsFragment =
@@ -1015,21 +1032,9 @@ handle_frame_pre_auth(Transport,
host = Host} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_SASL_AUTHENTICATE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- MechanismLength:16,
- Mechanism:MechanismLength/binary,
- SaslFragment/binary>>,
+ {request, CorrelationId,
+ {sasl_authenticate, Mechanism, SaslBin}},
Rest) ->
- SaslBin =
- case SaslFragment of
- <<(-1):32/signed>> ->
- <<>>;
- <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> ->
- SaslBinary
- end,
{Connection1, Rest1} =
case rabbit_stream_utils:auth_mechanism_to_module(Mechanism, S) of
@@ -1137,11 +1142,7 @@ handle_frame_pre_auth(_Transport,
name = ConnectionName} =
Connection,
State,
- <<?RESPONSE:1,
- ?COMMAND_TUNE:15,
- ?VERSION_1:16,
- FrameMax:32,
- Heartbeat:32>>,
+ {tune, FrameMax, Heartbeat},
Rest) ->
rabbit_log:debug("Tuning response ~p ~p ", [FrameMax, Heartbeat]),
Parent = self(),
@@ -1172,12 +1173,8 @@ handle_frame_pre_auth(_Transport,
handle_frame_pre_auth(Transport,
#stream_connection{user = User, socket = S} = Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_OPEN:15,
- ?VERSION_1:16,
- CorrelationId:32,
- VirtualHostLength:16,
- VirtualHost:VirtualHostLength/binary>>,
+ {request, CorrelationId,
+ {open, VirtualHost}},
Rest) ->
%% FIXME enforce connection limit (see rabbit_reader:is_over_connection_limit/2)
{Connection1, Frame} =
@@ -1211,13 +1208,13 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
+ heartbeat,
Rest) ->
rabbit_log:info("Received heartbeat frame pre auth~n"),
{Connection, State, Rest};
-handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, closing connection.",
- [Frame, Rest]),
+handle_frame_pre_auth(_Transport, Connection, State, Command, Rest) ->
+ rabbit_log:warning("unknown command ~p ~p, closing connection.",
+ [Command, Rest]),
{Connection#stream_connection{connection_step = failure}, State,
Rest}.
@@ -1256,15 +1253,8 @@ notify_auth_result(Username,
handle_frame_post_auth(Transport,
#stream_connection{resource_alarm = true} = Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_DECLARE_PUBLISHER:15,
- ?VERSION_1:16,
- CorrelationId:32,
- PublisherId:8,
- ReferenceSize:16,
- _Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {declare_publisher, PublisherId, _WriterRef, Stream}},
Rest) ->
rabbit_log:info("Cannot create publisher ~p on stream ~p, connection "
"is blocked because of resource alarm",
@@ -1282,15 +1272,8 @@ handle_frame_post_auth(Transport,
resource_alarm = false} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_DECLARE_PUBLISHER:15,
- ?VERSION_1:16,
- CorrelationId:32,
- PublisherId:8,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {declare_publisher, PublisherId, WriterRef, Stream}},
Rest) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Stream,
Connection0),
@@ -1298,7 +1281,7 @@ handle_frame_post_auth(Transport,
of
ok ->
case {maps:is_key(PublisherId, Publishers0),
- maps:is_key({Stream, Reference}, RefIds0)}
+ maps:is_key({Stream, WriterRef}, RefIds0)}
of
{false, false} ->
case lookup_leader(Stream, Connection0) of
@@ -1314,12 +1297,12 @@ handle_frame_post_auth(Transport,
publisher_to_ids = RefIds0} =
Connection1} ->
{PublisherReference, RefIds1} =
- case Reference of
+ case WriterRef of
<<"">> ->
{undefined, RefIds0};
_ ->
- {Reference,
- RefIds0#{{Stream, Reference} =>
+ {WriterRef,
+ RefIds0#{{Stream, WriterRef} =>
PublisherId}}
end,
Publisher =
@@ -1372,12 +1355,7 @@ handle_frame_post_auth(Transport,
publishers = Publishers} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_PUBLISH:15,
- ?VERSION_1:16,
- PublisherId:8/unsigned,
- MessageCount:32,
- Messages/binary>>,
+ {publish, PublisherId, MessageCount, Messages},
Rest) ->
case Publishers of
#{PublisherId := Publisher} ->
@@ -1442,14 +1420,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {query_publisher_sequence, Reference, Stream}},
Rest) ->
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Sequence} =
@@ -1492,11 +1464,8 @@ handle_frame_post_auth(Transport,
publisher_to_ids = PubToIds} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_DELETE_PUBLISHER:15,
- ?VERSION_1:16,
- CorrelationId:32,
- PublisherId:8>>,
+ {request, CorrelationId,
+ {delete_publisher, PublisherId}},
Rest) ->
case Publishers of
#{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
@@ -1536,15 +1505,9 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<?REQUEST:1,
- ?COMMAND_SUBSCRIBE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- SubscriptionId:8/unsigned,
- StreamSize:16,
- Stream:StreamSize/binary,
- OffsetType:16/signed,
- OffsetAndCredit/binary>>,
+
+ {request, CorrelationId,
+ {subscribe, SubscriptionId, Stream, OffsetSpec, Credit}},
Rest) ->
%% FIXME check the max number of subs is not reached already
case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
@@ -1582,26 +1545,6 @@ handle_frame_post_auth(Transport,
?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS),
{Connection, State, Rest};
false ->
- {OffsetSpec, Credit} =
- case OffsetType of
- ?OFFSET_TYPE_FIRST ->
- <<Crdt:16>> = OffsetAndCredit,
- {first, Crdt};
- ?OFFSET_TYPE_LAST ->
- <<Crdt:16>> = OffsetAndCredit,
- {last, Crdt};
- ?OFFSET_TYPE_NEXT ->
- <<Crdt:16>> = OffsetAndCredit,
- {next, Crdt};
- ?OFFSET_TYPE_OFFSET ->
- <<Offset:64/unsigned, Crdt:16>> =
- OffsetAndCredit,
- {Offset, Crdt};
- ?OFFSET_TYPE_TIMESTAMP ->
- <<Timestamp:64/signed, Crdt:16>> =
- OffsetAndCredit,
- {{timestamp, Timestamp}, Crdt}
- end,
rabbit_log:info("Creating subscription ~p to ~p, with offset specificat"
"ion ~p",
[SubscriptionId, Stream,
@@ -1695,11 +1638,7 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<?REQUEST:1,
- ?COMMAND_CREDIT:15,
- ?VERSION_1:16,
- SubscriptionId:8/unsigned,
- Credit:16/signed>>,
+ {credit, SubscriptionId, Credit},
Rest) ->
case Consumers of
#{SubscriptionId := Consumer} ->
@@ -1752,15 +1691,7 @@ handle_frame_post_auth(_Transport,
user = User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_COMMIT_OFFSET:15,
- ?VERSION_1:16,
- _CorrelationId:32,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary,
- Offset:64>>,
+ {commit_offset, Reference, Stream, Offset},
Rest) ->
case rabbit_stream_utils:check_write_permitted(#resource{name =
Stream,
@@ -1776,12 +1707,12 @@ handle_frame_post_auth(_Transport,
[Stream]),
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
{Connection, State, Rest};
- undefined ->
- rabbit_log:warning("Could not find leader (undefined) to commit offset "
- "on ~p",
- [Stream]),
- %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
- {Connection, State, Rest};
+ % undefined ->
+ % rabbit_log:warning("Could not find leader (undefined) to commit offset "
+ % "on ~p",
+ % [Stream]),
+ % %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
+ % {Connection, State, Rest};
{ClusterLeader, Connection1} ->
osiris:write_tracking(ClusterLeader, Reference, Offset),
{Connection1, State, Rest}
@@ -1797,14 +1728,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_QUERY_OFFSET:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {query_offset, Reference, Stream}},
Rest) ->
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Offset, Connection1} =
@@ -1846,11 +1771,8 @@ handle_frame_post_auth(Transport,
StreamSubscriptions} =
Connection,
#stream_connection_state{} = State,
- <<?REQUEST:1,
- ?COMMAND_UNSUBSCRIBE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- SubscriptionId:8/unsigned>>,
+ {request, CorrelationId,
+ {unsubscribe, SubscriptionId}},
Rest) ->
case subscription_exists(StreamSubscriptions, SubscriptionId) of
false ->
@@ -1876,19 +1798,11 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_CREATE_STREAM:15,
- ?VERSION_1:16,
- CorrelationId:32,
- StreamSize:16,
- Stream:StreamSize/binary,
- ArgumentsCount:32,
- ArgumentsBinary/binary>>,
+ {request, CorrelationId,
+ {create_stream, Stream, Arguments}},
Rest) ->
case rabbit_stream_utils:enforce_correct_stream_name(Stream) of
{ok, StreamName} ->
- {Arguments, _Rest} =
- rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount),
case rabbit_stream_utils:check_configure_permitted(#resource{name =
StreamName,
kind =
@@ -1961,12 +1875,8 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_DELETE_STREAM:15,
- ?VERSION_1:16,
- CorrelationId:32,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {delete_stream, Stream}},
Rest) ->
case rabbit_stream_utils:check_configure_permitted(#resource{name =
Stream,
@@ -2025,14 +1935,9 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_METADATA:15,
- ?VERSION_1:16,
- CorrelationId:32,
- StreamCount:32,
- BinaryStreams/binary>>,
+ {request, CorrelationId, {metadata, Streams}},
Rest) ->
- Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []),
+ StreamCount = length(Streams),
Topology =
lists:foldl(fun(Stream, Acc) ->
@@ -2171,13 +2076,8 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
- <<?COMMAND_ROUTE:16,
- ?VERSION_1:16,
- CorrelationId:32,
- RoutingKeySize:16,
- RoutingKey:RoutingKeySize/binary,
- SuperStreamSize:16,
- SuperStream:SuperStreamSize/binary>>,
+ {request, CorrelationId,
+ {route, RoutingKey, SuperStream}},
Rest) ->
{ResponseCode, StreamBin} =
case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream)
@@ -2206,11 +2106,8 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
- <<?COMMAND_PARTITIONS:16,
- ?VERSION_1:16,
- CorrelationId:32,
- SuperStreamSize:16,
- SuperStream:SuperStreamSize/binary>>,
+ {request, CorrelationId,
+ {partitions, SuperStream}},
Rest) ->
{ResponseCode, PartitionsBin} =
case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of
@@ -2241,13 +2138,8 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(Transport,
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_CLOSE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ClosingCode:16,
- ClosingReasonLength:16,
- ClosingReason:ClosingReasonLength/binary>>,
+ {request, CorrelationId,
+ {close, ClosingCode, ClosingReason}},
_Rest) ->
rabbit_log:info("Received close command ~p ~p",
[ClosingCode, ClosingReason]),
@@ -2263,13 +2155,13 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
+ heartbeat,
Rest) ->
rabbit_log:info("Received heartbeat frame post auth~n"),
{Connection, State, Rest};
-handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, sending close command.",
- [Frame, Rest]),
+handle_frame_post_auth(Transport, Connection, State, Command, Rest) ->
+ rabbit_log:warning("unknown command ~p ~p, sending close command.",
+ [Command, Rest]),
CloseReason = <<"unknown frame">>,
CloseReasonLength = byte_size(CloseReason),
CloseFrame =
@@ -2311,11 +2203,12 @@ notify_connection_closed(#stream_connection{name = Name,
handle_frame_post_close(_Transport,
Connection,
State,
- <<?RESPONSE:1,
- ?COMMAND_CLOSE:15,
- ?VERSION_1:16,
- _CorrelationId:32,
- _ResponseCode:16>>,
+ {response, _CorrelationId, {close, _Code}},
+ % <<?RESPONSE:1,
+ % ?COMMAND_CLOSE:15,
+ % ?VERSION_1:16,
+ % _CorrelationId:32,
+ % _ResponseCode:16>>,
Rest) ->
rabbit_log:info("Received close confirmation~n"),
{Connection#stream_connection{connection_step = closing_done}, State,
@@ -2323,12 +2216,12 @@ handle_frame_post_close(_Transport,
handle_frame_post_close(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
+ heartbeat,
Rest) ->
- rabbit_log:info("Received heartbeat frame post close~n"),
+ rabbit_log:info("Received heartbeat command post close~n"),
{Connection, State, Rest};
-handle_frame_post_close(_Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("ignored frame on close ~p ~p.", [Frame, Rest]),
+handle_frame_post_close(_Transport, Connection, State, Command, Rest) ->
+ rabbit_log:warning("ignored command on close ~p ~p.", [Command, Rest]),
{Connection, State, Rest}.
stream_r(Stream, #stream_connection{virtual_host = VHost}) ->