diff options
Diffstat (limited to 'deps')
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_core.erl | 347 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 401 |
3 files changed, 487 insertions, 263 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_core.erl b/deps/rabbitmq_stream/src/rabbit_stream_core.erl index 665ee3c4b9..400d74131e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_core.erl @@ -4,16 +4,20 @@ -export([ init/1, incoming_data/2, - frame/1 + frame/1, + parse_command/1 ]). %% holds static or rarely changing fields -record(cfg, {}). -record(?MODULE, {cfg :: #cfg{}, + frames = [] :: [iodata()], %% partial data - partial_frame :: undefined | - {ExpectedLength :: non_neg_integer(), iolist()} + data :: undefined | + %% this is only if the binary is smaller than 4 bytes + binary() | + {RemainingBytes :: non_neg_integer(), iodata()} }). -opaque state() :: #?MODULE{}. @@ -67,6 +71,7 @@ {credit, subscription_id(), Credit :: non_neg_integer()} | {metadata_update, stream_name(), response_code()} | heartbeat | + {tune, FrameMax :: non_neg_integer(), HeartBeat :: non_neg_integer()} | {request, correlation_id(), {declare_publisher, publisher_id(), writer_ref(), stream_name()} | {query_publisher_sequence, writer_ref(), stream_name()} | @@ -83,7 +88,6 @@ sasl_handshake | %% TODO: look into {sasl_authenticate, Mechanism :: binary(), SaslFragment :: binary()} | - {tune, FrameMax :: non_neg_integer(), HeartBeat :: non_neg_integer()} | {open, VirtualHost :: binary()} | {close, Code :: non_neg_integer(), Reason :: binary()} | {route, RoutingKey :: binary(), SuperStream :: binary()} | @@ -111,7 +115,7 @@ {peer_properties, response_code(), #{binary() => binary()}} | {sasl_handshake, response_code(), Mechanisms :: [binary()]} | %% either response code or sasl fragment - {sasl_authenticate, response_code() | binary()} | + {sasl_authenticate, response_code(), Challenge :: binary()} | {tune, FrameMax :: non_neg_integer(), HeartBeat :: non_neg_integer()} | %% TODO should route return a list of routed streams? {route, response_code(), stream_name()} | @@ -120,12 +124,50 @@ -spec init(term()) -> state(). init(_) -> - #?MODULE{}. + #?MODULE{cfg = #cfg{}}. +%% returns frames -spec incoming_data(binary(), state()) -> {state(), [command()]}. -incoming_data(_, State) -> - {State, []}. +%% TODO: check max frame size +incoming_data(<<>>, #?MODULE{frames = Frames} = State) -> + {State#?MODULE{frames = []}, parse_frames(Frames)}; +incoming_data(<<Size:32/unsigned, Frame:Size/binary, Rem/binary>>, + #?MODULE{frames = Frames, + data = undefined} = State) -> + incoming_data(Rem, State#?MODULE{frames = [Frame | Frames], + data = undefined}); +incoming_data(<<Size:32/unsigned, Rem/binary>>, + #?MODULE{frames = Frames, + data = undefined} = State) -> + %% not enough data to complete frame, stash and await more data + {State#?MODULE{frames = [], + data = {Size, Rem}}, + parse_frames(Frames)}; +incoming_data(Data, + #?MODULE{frames = Frames, + data = {Size, Partial}} = State) -> + case Data of + <<Data:Size/binary, Rem/binary>> -> + incoming_data(Rem, State#?MODULE{frames = [append_data(Partial, [Data]) + | Frames], + data = undefined}); + Rem -> + {State#?MODULE{frames = [], + data = {Size - byte_size(Rem), + append_data(Partial, Rem)}}, + parse_frames(Frames)} + end; +incoming_data(Data, #?MODULE{data = Partial} = State) + when is_binary(Partial) -> + incoming_data(<<Partial/binary, Data/binary>>, + State#?MODULE{data = undefined}). + +parse_frames(Frames) -> + lists:foldl( + fun (Frame, Acc) -> + [parse_command(Frame) | Acc] + end, [], Frames). -spec frame(command()) -> iodata(). frame({publish_confirm, PublisherId, PublishingIds}) -> @@ -144,11 +186,300 @@ frame({publish_confirm, PublisherId, PublishingIds}) -> frame(_Command) -> []. +append_data(Prev, Data) when is_binary(Prev) -> + [Prev, Data]; +append_data(Prev, Data) when is_list(Prev) -> + Prev ++ [Data]. + wrap_in_frame(IOData) -> Size = iolist_size(IOData), [<<Size:32>> | IOData]. +parse_command(<<?REQUEST:1, _:15, _/binary>> = Bin) -> + parse_request(Bin); +parse_command(<<?RESPONSE:1, _:15, _/binary>> = Bin) -> + parse_response(Bin); +parse_command(Data) when is_list(Data) -> + %% TODO: most commands are rare or small and likely to be a single + %% binary, however publish and delivery should be parsed from the + %% iodata rather than turned into a binary + parse_command(iolist_to_binary(Data)). + +-define(STRING(Size, Str), Size:16, Str:Size/binary). + +-spec parse_request(binary()) -> command(). +parse_request(<<?REQUEST:1, ?COMMAND_PUBLISH:15, ?VERSION_1:16, + PublisherId:8/unsigned, MessageCount:32, Messages/binary>>) -> + {publish, PublisherId, MessageCount, Messages}; +parse_request(<<?REQUEST:1, ?COMMAND_PUBLISH_CONFIRM:15, ?VERSION_1:16, + PublisherId:8, _Count:32, PublishingIds/binary>>) -> + {publish_confirm, PublisherId, list_of_longs(PublishingIds)}; +parse_request(<<?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_1:16, + SubscriptionId:8, Chunk/binary>>) -> + {deliver, SubscriptionId, Chunk}; +parse_request(<<?REQUEST:1, ?COMMAND_CREDIT:15, ?VERSION_1:16, + SubscriptionId:8, Credit:16/signed>>) -> + {credit, SubscriptionId, Credit}; +parse_request(<<?REQUEST:1, ?COMMAND_PUBLISH_ERROR:15, ?VERSION_1:16, + PublisherId:8, _Count:32, DetailsBin/binary>>) -> + %% TODO: change protocol to match + [{_, ErrCode} | _] = Details = list_of_longcodes(DetailsBin), + {PublishingIds, _} = lists:unzip(Details), + {publish_error, PublisherId, ErrCode, PublishingIds}; +parse_request(<<?REQUEST:1, ?COMMAND_METADATA_UPDATE:15, ?VERSION_1:16, + ResponseCode:16, StreamSize:16, Stream:StreamSize/binary>>) -> + {metadata_update, Stream, ResponseCode}; +parse_request(<<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>) -> + heartbeat; +parse_request(<<?REQUEST:1, ?COMMAND_DECLARE_PUBLISHER:15, ?VERSION_1:16, + CorrelationId:32, PublisherId:8, + ?STRING(WriterRefSize, WriterRef), + ?STRING(StreamSize, Stream)>>) -> + request(CorrelationId, + {declare_publisher, PublisherId, WriterRef, Stream}); +parse_request(<<?REQUEST:1, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(WSize, WriterReference), + ?STRING(SSize, Stream)>>) -> + request(CorrelationId, + {query_publisher_sequence, WriterReference, Stream}); +parse_request(<<?REQUEST:1, ?COMMAND_DELETE_PUBLISHER:15, ?VERSION_1:16, + CorrelationId:32, PublisherId:8>>) -> + request(CorrelationId, {delete_publisher, PublisherId}); +parse_request(<<?REQUEST:1, ?COMMAND_SUBSCRIBE:15, ?VERSION_1:16, + CorrelationId:32, SubscriptionId:8, + ?STRING(StreamSize, Stream), + OffsetType:16/signed, + OffsetAndCredit/binary>>) -> + {OffsetSpec, Credit} = case OffsetType of + ?OFFSET_TYPE_FIRST -> + <<Crdt:16>> = OffsetAndCredit, + {first, Crdt}; + ?OFFSET_TYPE_LAST -> + <<Crdt:16>> = OffsetAndCredit, + {last, Crdt}; + ?OFFSET_TYPE_NEXT -> + <<Crdt:16>> = OffsetAndCredit, + {next, Crdt}; + ?OFFSET_TYPE_OFFSET -> + <<Offset:64/unsigned, Crdt:16>> = + OffsetAndCredit, + {Offset, Crdt}; + ?OFFSET_TYPE_TIMESTAMP -> + <<Timestamp:64/signed, Crdt:16>> = + OffsetAndCredit, + {{timestamp, Timestamp}, Crdt} + end, + request(CorrelationId, + {subscribe, SubscriptionId, Stream, OffsetSpec, Credit}); +parse_request(<<?REQUEST:1, ?COMMAND_COMMIT_OFFSET:15, ?VERSION_1:16, + _CorrelationId:32, + ?STRING(RefSize, OffsetRef), + ?STRING(SSize, Stream), + Offset:64>>) -> + %% NB: this request has no response so ignoring correlation id here + {commit_offset, OffsetRef, Stream, Offset}; +parse_request(<<?REQUEST:1, ?COMMAND_QUERY_OFFSET:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(RefSize, OffsetRef), + ?STRING(SSize, Stream)>>) -> + request(CorrelationId, {query_offset, OffsetRef, Stream}); +parse_request(<<?REQUEST:1, ?COMMAND_UNSUBSCRIBE:15, ?VERSION_1:16, + CorrelationId:32, SubscriptionId:8>>) -> + request(CorrelationId, {unsubscribe, SubscriptionId}); +parse_request(<<?REQUEST:1, ?COMMAND_CREATE_STREAM:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(StreamSize, Stream), + _ArgumentsCount:32, + ArgumentsBinary/binary>>) -> + Args = parse_map(ArgumentsBinary, #{}), + request(CorrelationId, {create_stream, Stream, Args}); +parse_request(<<?REQUEST:1, ?COMMAND_DELETE_STREAM:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(StreamSize, Stream)>>) -> + request(CorrelationId, {delete_stream, Stream}); +parse_request(<<?REQUEST:1, ?COMMAND_METADATA:15, ?VERSION_1:16, + CorrelationId:32, _StreamCount:32, + BinaryStreams/binary>>) -> + Streams = list_of_strings(BinaryStreams), + request(CorrelationId, {metadata, Streams}); +parse_request(<<?REQUEST:1, ?COMMAND_PEER_PROPERTIES:15, ?VERSION_1:16, + CorrelationId:32, _PropertiesCount:32, + PropertiesBinary/binary>>) -> + Props = parse_map(PropertiesBinary, #{}), + request(CorrelationId, {peer_properties, Props}); +parse_request(<<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_1:16, + CorrelationId:32>>) -> + request(CorrelationId, sasl_handshake); +parse_request(<<?REQUEST:1, ?COMMAND_SASL_AUTHENTICATE:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(MechanismSize, Mechanism), + SaslFragment/binary>>) -> + SaslBin = + case SaslFragment of + <<(-1):32/signed>> -> + <<>>; + <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> -> + SaslBinary + end, + request(CorrelationId, + {sasl_authenticate, Mechanism, SaslBin}); +parse_request(<<?REQUEST:1, ?COMMAND_TUNE:15, ?VERSION_1:16, + FrameMax:32, Heartbeat:32>>) -> + %% NB: no correlatio id but uses the response bit + {tune, FrameMax, Heartbeat}; +parse_request(<<?REQUEST:1, ?COMMAND_OPEN:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(VhostSize, VirtualHost)>>) -> + request(CorrelationId, + {open, VirtualHost}); +parse_request(<<?REQUEST:1, ?COMMAND_CLOSE:15, ?VERSION_1:16, + CorrelationId:32, + CloseCode:16, + ?STRING(ReasonSize, Reason)>>) -> + request(CorrelationId, + {close, CloseCode, Reason}); +parse_request(<<?REQUEST:1, ?COMMAND_ROUTE:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(RKeySize, RoutingKey), + ?STRING(StreamSize, SuperStream)>>) -> + request(CorrelationId, + {route, RoutingKey, SuperStream}); +parse_request(<<?REQUEST:1, ?COMMAND_PARTITIONS:15, ?VERSION_1:16, + CorrelationId:32, + ?STRING(StreamSize, SuperStream)>>) -> + request(CorrelationId, + {partitions, SuperStream}); +parse_request(Bin) -> + % {partitions, SuperStream :: binary()}} | + exit({no, Bin}). + +parse_response(<<?RESPONSE:1, CommandId:15, ?VERSION_1:16, + CorrelationId:32, ResponseCode:16>>) -> + {response, CorrelationId, {parse_command_id(CommandId), ResponseCode}}; +parse_response(<<?RESPONSE:1, ?COMMAND_TUNE:15, ?VERSION_1:16, + FrameMax:32, Heartbeat:32>>) -> + {tune, FrameMax, Heartbeat}; +parse_response(<<?RESPONSE:1, CommandId:15, ?VERSION_1:16, + CorrelationId:32, Data/binary>>) -> + {response, CorrelationId, parse_response_body(CommandId, Data)}. + +parse_response_body(?COMMAND_QUERY_PUBLISHER_SEQUENCE, + <<ResponseCode:16, Sequence:64>>) -> + {query_publisher_sequence, ResponseCode, Sequence}; +parse_response_body(?COMMAND_QUERY_OFFSET, + <<ResponseCode:16, Offset:64>>) -> + {query_offset, ResponseCode, Offset}; +parse_response_body(?COMMAND_METADATA, <<NumNodes:32, Data/binary>>) -> + {NodesLookup, MetadataBin} = parse_nodes(Data, NumNodes, #{}), + Nodes = maps:values(NodesLookup), + Metadata = parse_meta(MetadataBin, NodesLookup, #{}), + {metadata, Nodes, Metadata}; +parse_response_body(?COMMAND_PEER_PROPERTIES, + <<ResponseCode:16, _Count:32, PropertiesBin/binary>>) -> + Props = parse_map(PropertiesBin, #{}), + {peer_properties, ResponseCode, Props}; +parse_response_body(?COMMAND_SASL_HANDSHAKE, + <<ResponseCode:16, _Count:32, MechanismsBin/binary>>) -> + Props = list_of_strings(MechanismsBin), + {peer_properties, ResponseCode, Props}; +parse_response_body(?COMMAND_SASL_AUTHENTICATE, + <<ResponseCode:16, ChallengeBin/binary>>) -> + Challenge = case ChallengeBin of + <<?STRING(CSize, Chall)>> -> + Chall; + <<>> -> + <<>> + end, + {sasl_authenticate, ResponseCode, Challenge}; +parse_response_body(?COMMAND_ROUTE, + <<ResponseCode:16, + ?STRING(StreamSize, Stream)>>) -> + {route, ResponseCode, Stream}; +parse_response_body(?COMMAND_PARTITIONS, + <<ResponseCode:16, + _Count:32, + PartitionsBin/binary>>) -> + Partitions = list_of_strings(PartitionsBin), + {partitions, ResponseCode, Partitions}. + +request(Corr, Cmd) -> + {request, Corr, Cmd}. + +parse_meta(<<>>, _Nodes, Acc) -> + Acc; +parse_meta(<<?STRING(StreamSize, Stream), + Code:16, + LeaderIndex:16, + ReplicaCount:32, + ReplicaIndexBin:(ReplicaCount * 16)/binary, + Rem/binary>>, Nodes, + Acc) -> + StreamDetail = case Code of + ?RESPONSE_CODE_OK -> + Leader = maps:get(LeaderIndex, Nodes), + Replicas = [maps:get(I, Nodes) || + I <- list_of_shorts(ReplicaIndexBin)], + {Leader, Replicas}; + ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST -> + stream_not_found; + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE -> + stream_not_available + end, + parse_meta(Rem, Nodes, Acc#{Stream => StreamDetail}). + +parse_nodes(Rem, 0, Acc) -> + {Acc, Rem}; +parse_nodes(<<Index:16, + ?STRING(HostSize, Host), + Port:32, Rem/binary>>, C, Acc) -> + parse_nodes(Rem, C - 1, Acc#{Index => {Host, Port}}). + +parse_map(<<>>, Acc) -> + Acc; +parse_map(<<?STRING(KeySize, Key), + ?STRING(ValSize, Value), + Rem/binary>>, Acc) -> + parse_map(Rem, Acc#{Key => Value}). + +list_of_strings(<<>>) -> + []; +list_of_strings(<<?STRING(Size, String), Rem/binary>>) -> + [String | list_of_strings(Rem)]. + +list_of_longs(<<>>) -> + []; +list_of_longs(<<I:64, Rem/binary>>) -> + [I | list_of_longs(Rem)]. + +list_of_shorts(<<>>) -> + []; +list_of_shorts(<<I:16, Rem/binary>>) -> + [I | list_of_shorts(Rem)]. + +list_of_longcodes(<<>>) -> + []; +list_of_longcodes(<<I:64, C:16, Rem/binary>>) -> + [{I, C} | list_of_longcodes(Rem)]. + + +parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher; +parse_command_id(?COMMAND_DELETE_PUBLISHER) -> delete_publisher; +parse_command_id(?COMMAND_SUBSCRIBE) -> subscribe; +parse_command_id(?COMMAND_UNSUBSCRIBE) -> unsubscribe; +parse_command_id(?COMMAND_CREATE_STREAM) -> create_stream; +parse_command_id(?COMMAND_DELETE_STREAM) -> delete_stream; +parse_command_id(?COMMAND_OPEN) -> open; +parse_command_id(?COMMAND_CLOSE) -> close. +command_id(declare_publisher) -> ?COMMAND_DECLARE_PUBLISHER; +command_id(delete_publisher) -> ?COMMAND_DELETE_PUBLISHER; +command_id(subscribe) -> ?COMMAND_SUBSCRIBE; +command_id(unsubscribe) -> ?COMMAND_UNSUBSCRIBE; +command_id(create_stream) -> ?COMMAND_CREATE_STREAM; +command_id(delete_stream) -> ?COMMAND_DELETE_STREAM; +command_id(open) -> ?COMMAND_OPEN; +command_id(close) -> ?COMMAND_CLOSE. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 443f8ca80e..c7fbd84504 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -81,7 +81,7 @@ route(RoutingKey, VirtualHost, SuperStream) -> {route, RoutingKey, VirtualHost, SuperStream}). -spec partitions(binary(), binary()) -> - {ok, [binary()] | {error, stream_not_found}}. + {ok, [binary()]} | {error, stream_not_found}. partitions(VirtualHost, SuperStream) -> gen_server:call(?MODULE, {partitions, VirtualHost, SuperStream}). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 793a37f75c..86b459fe7f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -41,7 +41,8 @@ stream :: stream(), counters :: atomics:atomics_ref()}). -record(stream_connection_state, - {data :: none | binary(), blocked :: boolean(), + {data :: rabbit_stream_core:state(), + blocked :: boolean(), consumers :: #{subscription_id() => #consumer{}}}). -record(stream_connection, {name :: binary(), @@ -198,7 +199,7 @@ init([KeepaliveSup, State = #stream_connection_state{consumers = #{}, blocked = false, - data = none}, + data = rabbit_stream_core:init(undefined)}, Transport:setopts(RealSocket, [{active, once}]), rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), @@ -789,7 +790,7 @@ listen_loop_post_close(Transport, = IsThereAlarm}, State, - Configuration); + <<>>); {OK, S, Data} -> Transport:setopts(S, [{active, once}]), {Connection1, State1} = @@ -846,61 +847,87 @@ handle_inbound_data_post_close(Transport, Connection, State, Rest) -> Rest, fun handle_frame_post_close/5). -handle_inbound_data(_Transport, - Connection, - State, - <<>>, - _HandleFrameFun) -> - {Connection, State}; -handle_inbound_data(Transport, - #stream_connection{frame_max = FrameMax} = Connection, - #stream_connection_state{data = none} = State, - <<Size:32, _Frame:Size/binary, _Rest/bits>>, - _HandleFrameFun) - when FrameMax /= 0 andalso Size > FrameMax - 4 -> - CloseReason = <<"frame too large">>, - CloseReasonLength = byte_size(CloseReason), - CloseFrame = - <<?REQUEST:1, - ?COMMAND_CLOSE:15, - ?VERSION_1:16, - 1:32, - ?RESPONSE_CODE_FRAME_TOO_LARGE:16, - CloseReasonLength:16, - CloseReason:CloseReasonLength/binary>>, - frame(Transport, Connection, CloseFrame), - {Connection#stream_connection{connection_step = close_sent}, State}; -handle_inbound_data(Transport, - Connection, - #stream_connection_state{data = none} = State, - <<Size:32, Frame:Size/binary, Rest/bits>>, - HandleFrameFun) -> - {Connection1, State1, Rest1} = - HandleFrameFun(Transport, Connection, State, Frame, Rest), - handle_inbound_data(Transport, - Connection1, - State1, - Rest1, - HandleFrameFun); -handle_inbound_data(_Transport, - Connection, - #stream_connection_state{data = none} = State, - Data, - _HandleFrameFun) -> - {Connection, State#stream_connection_state{data = Data}}; handle_inbound_data(Transport, Connection, - #stream_connection_state{data = Leftover} = State, + #stream_connection_state{data = CoreState0} = State, Data, HandleFrameFun) -> - State1 = State#stream_connection_state{data = none}, - %% FIXME avoid concatenation to avoid a new binary allocation - %% see osiris_replica:parse_chunk/3 - handle_inbound_data(Transport, - Connection, - State1, - <<Leftover/binary, Data/binary>>, - HandleFrameFun). + case rabbit_stream_core:incoming_data(Data, CoreState0) of + {CoreState, []} -> + % rabbit_log:info("NO FRAMES ", []), + {Connection, + State#stream_connection_state{data = CoreState}}; + {CoreState, Commands} -> + % rabbit_log:info("FRAMES ~w", [length(Frames)]), + {Connection1, State1, _} = + lists:foldl( + fun (Command, {C, S, R}) -> + HandleFrameFun(Transport, C, S, Command, R) + end, {Connection, + State#stream_connection_state{data = CoreState}, + <<>>}, Commands), + handle_inbound_data(Transport, + Connection1, + State1, + <<>>, + HandleFrameFun) + end. + +% handle_inbound_data(_Transport, +% Connection, +% State, +% <<>>, +% _HandleFrameFun) -> +% {Connection, State}; +% handle_inbound_data(Transport, +% #stream_connection{frame_max = FrameMax} = Connection, +% #stream_connection_state{data = none} = State, +% <<Size:32, _Frame:Size/binary, _Rest/bits>>, +% _HandleFrameFun) +% when FrameMax /= 0 andalso Size > FrameMax - 4 -> +% CloseReason = <<"frame too large">>, +% CloseReasonLength = byte_size(CloseReason), +% CloseFrame = +% <<?REQUEST:1, +% ?COMMAND_CLOSE:15, +% ?VERSION_1:16, +% 1:32, +% ?RESPONSE_CODE_FRAME_TOO_LARGE:16, +% CloseReasonLength:16, +% CloseReason:CloseReasonLength/binary>>, +% frame(Transport, Connection, CloseFrame), +% {Connection#stream_connection{connection_step = close_sent}, State}; +% handle_inbound_data(Transport, +% Connection, +% #stream_connection_state{data = none} = State, +% <<Size:32, Frame:Size/binary, Rest/bits>>, +% HandleFrameFun) -> +% {Connection1, State1, Rest1} = +% HandleFrameFun(Transport, Connection, State, Frame, Rest), +% handle_inbound_data(Transport, +% Connection1, +% State1, +% Rest1, +% HandleFrameFun); +% handle_inbound_data(_Transport, +% Connection, +% #stream_connection_state{data = none} = State, +% Data, +% _HandleFrameFun) -> +% {Connection, State#stream_connection_state{data = Data}}; +% handle_inbound_data(Transport, +% Connection, +% #stream_connection_state{data = Leftover} = State, +% Data, +% HandleFrameFun) -> +% State1 = State#stream_connection_state{data = none}, +% %% FIXME avoid concatenation to avoid a new binary allocation +% %% see osiris_replica:parse_chunk/3 +% handle_inbound_data(Transport, +% Connection, +% State1, +% <<Leftover/binary, Data/binary>>, +% HandleFrameFun). generate_publishing_error_details(Acc, _Code, <<>>) -> Acc; @@ -916,16 +943,9 @@ generate_publishing_error_details(Acc, Code, handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State, - <<?REQUEST:1, - ?COMMAND_PEER_PROPERTIES:15, - ?VERSION_1:16, - CorrelationId:32, - ClientPropertiesCount:32, - ClientPropertiesFrame/binary>>, + {request, CorrelationId, + {peer_properties, ClientProperties}}, Rest) -> - {ClientProperties, _} = - rabbit_stream_utils:parse_map(ClientPropertiesFrame, - ClientPropertiesCount), {ok, Product} = application:get_key(rabbit, description), {ok, Version} = application:get_key(rabbit, vsn), @@ -984,10 +1004,7 @@ handle_frame_pre_auth(Transport, handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State, - <<?REQUEST:1, - ?COMMAND_SASL_HANDSHAKE:15, - ?VERSION_1:16, - CorrelationId:32>>, + {request, CorrelationId, sasl_handshake}, Rest) -> Mechanisms = rabbit_stream_utils:auth_mechanisms(S), MechanismsFragment = @@ -1015,21 +1032,9 @@ handle_frame_pre_auth(Transport, host = Host} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_SASL_AUTHENTICATE:15, - ?VERSION_1:16, - CorrelationId:32, - MechanismLength:16, - Mechanism:MechanismLength/binary, - SaslFragment/binary>>, + {request, CorrelationId, + {sasl_authenticate, Mechanism, SaslBin}}, Rest) -> - SaslBin = - case SaslFragment of - <<(-1):32/signed>> -> - <<>>; - <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> -> - SaslBinary - end, {Connection1, Rest1} = case rabbit_stream_utils:auth_mechanism_to_module(Mechanism, S) of @@ -1137,11 +1142,7 @@ handle_frame_pre_auth(_Transport, name = ConnectionName} = Connection, State, - <<?RESPONSE:1, - ?COMMAND_TUNE:15, - ?VERSION_1:16, - FrameMax:32, - Heartbeat:32>>, + {tune, FrameMax, Heartbeat}, Rest) -> rabbit_log:debug("Tuning response ~p ~p ", [FrameMax, Heartbeat]), Parent = self(), @@ -1172,12 +1173,8 @@ handle_frame_pre_auth(_Transport, handle_frame_pre_auth(Transport, #stream_connection{user = User, socket = S} = Connection, State, - <<?REQUEST:1, - ?COMMAND_OPEN:15, - ?VERSION_1:16, - CorrelationId:32, - VirtualHostLength:16, - VirtualHost:VirtualHostLength/binary>>, + {request, CorrelationId, + {open, VirtualHost}}, Rest) -> %% FIXME enforce connection limit (see rabbit_reader:is_over_connection_limit/2) {Connection1, Frame} = @@ -1211,13 +1208,13 @@ handle_frame_pre_auth(Transport, handle_frame_pre_auth(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, + heartbeat, Rest) -> rabbit_log:info("Received heartbeat frame pre auth~n"), {Connection, State, Rest}; -handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, closing connection.", - [Frame, Rest]), +handle_frame_pre_auth(_Transport, Connection, State, Command, Rest) -> + rabbit_log:warning("unknown command ~p ~p, closing connection.", + [Command, Rest]), {Connection#stream_connection{connection_step = failure}, State, Rest}. @@ -1256,15 +1253,8 @@ notify_auth_result(Username, handle_frame_post_auth(Transport, #stream_connection{resource_alarm = true} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_DECLARE_PUBLISHER:15, - ?VERSION_1:16, - CorrelationId:32, - PublisherId:8, - ReferenceSize:16, - _Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {declare_publisher, PublisherId, _WriterRef, Stream}}, Rest) -> rabbit_log:info("Cannot create publisher ~p on stream ~p, connection " "is blocked because of resource alarm", @@ -1282,15 +1272,8 @@ handle_frame_post_auth(Transport, resource_alarm = false} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_DECLARE_PUBLISHER:15, - ?VERSION_1:16, - CorrelationId:32, - PublisherId:8, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {declare_publisher, PublisherId, WriterRef, Stream}}, Rest) -> case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection0), @@ -1298,7 +1281,7 @@ handle_frame_post_auth(Transport, of ok -> case {maps:is_key(PublisherId, Publishers0), - maps:is_key({Stream, Reference}, RefIds0)} + maps:is_key({Stream, WriterRef}, RefIds0)} of {false, false} -> case lookup_leader(Stream, Connection0) of @@ -1314,12 +1297,12 @@ handle_frame_post_auth(Transport, publisher_to_ids = RefIds0} = Connection1} -> {PublisherReference, RefIds1} = - case Reference of + case WriterRef of <<"">> -> {undefined, RefIds0}; _ -> - {Reference, - RefIds0#{{Stream, Reference} => + {WriterRef, + RefIds0#{{Stream, WriterRef} => PublisherId}} end, Publisher = @@ -1372,12 +1355,7 @@ handle_frame_post_auth(Transport, publishers = Publishers} = Connection, State, - <<?REQUEST:1, - ?COMMAND_PUBLISH:15, - ?VERSION_1:16, - PublisherId:8/unsigned, - MessageCount:32, - Messages/binary>>, + {publish, PublisherId, MessageCount, Messages}, Rest) -> case Publishers of #{PublisherId := Publisher} -> @@ -1442,14 +1420,8 @@ handle_frame_post_auth(Transport, user = User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, - ?VERSION_1:16, - CorrelationId:32, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {query_publisher_sequence, Reference, Stream}}, Rest) -> FrameSize = ?RESPONSE_FRAME_SIZE + 8, {ResponseCode, Sequence} = @@ -1492,11 +1464,8 @@ handle_frame_post_auth(Transport, publisher_to_ids = PubToIds} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_DELETE_PUBLISHER:15, - ?VERSION_1:16, - CorrelationId:32, - PublisherId:8>>, + {request, CorrelationId, + {delete_publisher, PublisherId}}, Rest) -> case Publishers of #{PublisherId := #publisher{stream = Stream, reference = Ref}} -> @@ -1536,15 +1505,9 @@ handle_frame_post_auth(Transport, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<?REQUEST:1, - ?COMMAND_SUBSCRIBE:15, - ?VERSION_1:16, - CorrelationId:32, - SubscriptionId:8/unsigned, - StreamSize:16, - Stream:StreamSize/binary, - OffsetType:16/signed, - OffsetAndCredit/binary>>, + + {request, CorrelationId, + {subscribe, SubscriptionId, Stream, OffsetSpec, Credit}}, Rest) -> %% FIXME check the max number of subs is not reached already case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, @@ -1582,26 +1545,6 @@ handle_frame_post_auth(Transport, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS), {Connection, State, Rest}; false -> - {OffsetSpec, Credit} = - case OffsetType of - ?OFFSET_TYPE_FIRST -> - <<Crdt:16>> = OffsetAndCredit, - {first, Crdt}; - ?OFFSET_TYPE_LAST -> - <<Crdt:16>> = OffsetAndCredit, - {last, Crdt}; - ?OFFSET_TYPE_NEXT -> - <<Crdt:16>> = OffsetAndCredit, - {next, Crdt}; - ?OFFSET_TYPE_OFFSET -> - <<Offset:64/unsigned, Crdt:16>> = - OffsetAndCredit, - {Offset, Crdt}; - ?OFFSET_TYPE_TIMESTAMP -> - <<Timestamp:64/signed, Crdt:16>> = - OffsetAndCredit, - {{timestamp, Timestamp}, Crdt} - end, rabbit_log:info("Creating subscription ~p to ~p, with offset specificat" "ion ~p", [SubscriptionId, Stream, @@ -1695,11 +1638,7 @@ handle_frame_post_auth(Transport, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<?REQUEST:1, - ?COMMAND_CREDIT:15, - ?VERSION_1:16, - SubscriptionId:8/unsigned, - Credit:16/signed>>, + {credit, SubscriptionId, Credit}, Rest) -> case Consumers of #{SubscriptionId := Consumer} -> @@ -1752,15 +1691,7 @@ handle_frame_post_auth(_Transport, user = User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_COMMIT_OFFSET:15, - ?VERSION_1:16, - _CorrelationId:32, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary, - Offset:64>>, + {commit_offset, Reference, Stream, Offset}, Rest) -> case rabbit_stream_utils:check_write_permitted(#resource{name = Stream, @@ -1776,12 +1707,12 @@ handle_frame_post_auth(_Transport, [Stream]), %% FIXME commit offset is fire-and-forget, so no response even if error, change this? {Connection, State, Rest}; - undefined -> - rabbit_log:warning("Could not find leader (undefined) to commit offset " - "on ~p", - [Stream]), - %% FIXME commit offset is fire-and-forget, so no response even if error, change this? - {Connection, State, Rest}; + % undefined -> + % rabbit_log:warning("Could not find leader (undefined) to commit offset " + % "on ~p", + % [Stream]), + % %% FIXME commit offset is fire-and-forget, so no response even if error, change this? + % {Connection, State, Rest}; {ClusterLeader, Connection1} -> osiris:write_tracking(ClusterLeader, Reference, Offset), {Connection1, State, Rest} @@ -1797,14 +1728,8 @@ handle_frame_post_auth(Transport, user = User} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_QUERY_OFFSET:15, - ?VERSION_1:16, - CorrelationId:32, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {query_offset, Reference, Stream}}, Rest) -> FrameSize = ?RESPONSE_FRAME_SIZE + 8, {ResponseCode, Offset, Connection1} = @@ -1846,11 +1771,8 @@ handle_frame_post_auth(Transport, StreamSubscriptions} = Connection, #stream_connection_state{} = State, - <<?REQUEST:1, - ?COMMAND_UNSUBSCRIBE:15, - ?VERSION_1:16, - CorrelationId:32, - SubscriptionId:8/unsigned>>, + {request, CorrelationId, + {unsubscribe, SubscriptionId}}, Rest) -> case subscription_exists(StreamSubscriptions, SubscriptionId) of false -> @@ -1876,19 +1798,11 @@ handle_frame_post_auth(Transport, User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_CREATE_STREAM:15, - ?VERSION_1:16, - CorrelationId:32, - StreamSize:16, - Stream:StreamSize/binary, - ArgumentsCount:32, - ArgumentsBinary/binary>>, + {request, CorrelationId, + {create_stream, Stream, Arguments}}, Rest) -> case rabbit_stream_utils:enforce_correct_stream_name(Stream) of {ok, StreamName} -> - {Arguments, _Rest} = - rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount), case rabbit_stream_utils:check_configure_permitted(#resource{name = StreamName, kind = @@ -1961,12 +1875,8 @@ handle_frame_post_auth(Transport, User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_DELETE_STREAM:15, - ?VERSION_1:16, - CorrelationId:32, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {delete_stream, Stream}}, Rest) -> case rabbit_stream_utils:check_configure_permitted(#resource{name = Stream, @@ -2025,14 +1935,9 @@ handle_frame_post_auth(Transport, virtual_host = VirtualHost} = Connection, State, - <<?REQUEST:1, - ?COMMAND_METADATA:15, - ?VERSION_1:16, - CorrelationId:32, - StreamCount:32, - BinaryStreams/binary>>, + {request, CorrelationId, {metadata, Streams}}, Rest) -> - Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []), + StreamCount = length(Streams), Topology = lists:foldl(fun(Stream, Acc) -> @@ -2171,13 +2076,8 @@ handle_frame_post_auth(Transport, virtual_host = VirtualHost} = Connection, State, - <<?COMMAND_ROUTE:16, - ?VERSION_1:16, - CorrelationId:32, - RoutingKeySize:16, - RoutingKey:RoutingKeySize/binary, - SuperStreamSize:16, - SuperStream:SuperStreamSize/binary>>, + {request, CorrelationId, + {route, RoutingKey, SuperStream}}, Rest) -> {ResponseCode, StreamBin} = case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream) @@ -2206,11 +2106,8 @@ handle_frame_post_auth(Transport, virtual_host = VirtualHost} = Connection, State, - <<?COMMAND_PARTITIONS:16, - ?VERSION_1:16, - CorrelationId:32, - SuperStreamSize:16, - SuperStream:SuperStreamSize/binary>>, + {request, CorrelationId, + {partitions, SuperStream}}, Rest) -> {ResponseCode, PartitionsBin} = case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of @@ -2241,13 +2138,8 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, Connection, State, - <<?REQUEST:1, - ?COMMAND_CLOSE:15, - ?VERSION_1:16, - CorrelationId:32, - ClosingCode:16, - ClosingReasonLength:16, - ClosingReason:ClosingReasonLength/binary>>, + {request, CorrelationId, + {close, ClosingCode, ClosingReason}}, _Rest) -> rabbit_log:info("Received close command ~p ~p", [ClosingCode, ClosingReason]), @@ -2263,13 +2155,13 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, + heartbeat, Rest) -> rabbit_log:info("Received heartbeat frame post auth~n"), {Connection, State, Rest}; -handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, sending close command.", - [Frame, Rest]), +handle_frame_post_auth(Transport, Connection, State, Command, Rest) -> + rabbit_log:warning("unknown command ~p ~p, sending close command.", + [Command, Rest]), CloseReason = <<"unknown frame">>, CloseReasonLength = byte_size(CloseReason), CloseFrame = @@ -2311,11 +2203,12 @@ notify_connection_closed(#stream_connection{name = Name, handle_frame_post_close(_Transport, Connection, State, - <<?RESPONSE:1, - ?COMMAND_CLOSE:15, - ?VERSION_1:16, - _CorrelationId:32, - _ResponseCode:16>>, + {response, _CorrelationId, {close, _Code}}, + % <<?RESPONSE:1, + % ?COMMAND_CLOSE:15, + % ?VERSION_1:16, + % _CorrelationId:32, + % _ResponseCode:16>>, Rest) -> rabbit_log:info("Received close confirmation~n"), {Connection#stream_connection{connection_step = closing_done}, State, @@ -2323,12 +2216,12 @@ handle_frame_post_close(_Transport, handle_frame_post_close(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, + heartbeat, Rest) -> - rabbit_log:info("Received heartbeat frame post close~n"), + rabbit_log:info("Received heartbeat command post close~n"), {Connection, State, Rest}; -handle_frame_post_close(_Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("ignored frame on close ~p ~p.", [Frame, Rest]), +handle_frame_post_close(_Transport, Connection, State, Command, Rest) -> + rabbit_log:warning("ignored command on close ~p ~p.", [Command, Rest]), {Connection, State, Rest}. stream_r(Stream, #stream_connection{virtual_host = VHost}) -> |