diff options
Diffstat (limited to 'deps/rabbitmq_stream/src/rabbit_stream_reader.erl')
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 401 |
1 files changed, 147 insertions, 254 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 793a37f75c..86b459fe7f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -41,7 +41,8 @@ stream :: stream(), counters :: atomics:atomics_ref()}). -record(stream_connection_state, - {data :: none | binary(), blocked :: boolean(), + {data :: rabbit_stream_core:state(), + blocked :: boolean(), consumers :: #{subscription_id() => #consumer{}}}). -record(stream_connection, {name :: binary(), @@ -198,7 +199,7 @@ init([KeepaliveSup, State = #stream_connection_state{consumers = #{}, blocked = false, - data = none}, + data = rabbit_stream_core:init(undefined)}, Transport:setopts(RealSocket, [{active, once}]), rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}), @@ -789,7 +790,7 @@ listen_loop_post_close(Transport, = IsThereAlarm}, State, - Configuration); + <<>>); {OK, S, Data} -> Transport:setopts(S, [{active, once}]), {Connection1, State1} = @@ -846,61 +847,87 @@ handle_inbound_data_post_close(Transport, Connection, State, Rest) -> Rest, fun handle_frame_post_close/5). -handle_inbound_data(_Transport, - Connection, - State, - <<>>, - _HandleFrameFun) -> - {Connection, State}; -handle_inbound_data(Transport, - #stream_connection{frame_max = FrameMax} = Connection, - #stream_connection_state{data = none} = State, - <<Size:32, _Frame:Size/binary, _Rest/bits>>, - _HandleFrameFun) - when FrameMax /= 0 andalso Size > FrameMax - 4 -> - CloseReason = <<"frame too large">>, - CloseReasonLength = byte_size(CloseReason), - CloseFrame = - <<?REQUEST:1, - ?COMMAND_CLOSE:15, - ?VERSION_1:16, - 1:32, - ?RESPONSE_CODE_FRAME_TOO_LARGE:16, - CloseReasonLength:16, - CloseReason:CloseReasonLength/binary>>, - frame(Transport, Connection, CloseFrame), - {Connection#stream_connection{connection_step = close_sent}, State}; -handle_inbound_data(Transport, - Connection, - #stream_connection_state{data = none} = State, - <<Size:32, Frame:Size/binary, Rest/bits>>, - HandleFrameFun) -> - {Connection1, State1, Rest1} = - HandleFrameFun(Transport, Connection, State, Frame, Rest), - handle_inbound_data(Transport, - Connection1, - State1, - Rest1, - HandleFrameFun); -handle_inbound_data(_Transport, - Connection, - #stream_connection_state{data = none} = State, - Data, - _HandleFrameFun) -> - {Connection, State#stream_connection_state{data = Data}}; handle_inbound_data(Transport, Connection, - #stream_connection_state{data = Leftover} = State, + #stream_connection_state{data = CoreState0} = State, Data, HandleFrameFun) -> - State1 = State#stream_connection_state{data = none}, - %% FIXME avoid concatenation to avoid a new binary allocation - %% see osiris_replica:parse_chunk/3 - handle_inbound_data(Transport, - Connection, - State1, - <<Leftover/binary, Data/binary>>, - HandleFrameFun). + case rabbit_stream_core:incoming_data(Data, CoreState0) of + {CoreState, []} -> + % rabbit_log:info("NO FRAMES ", []), + {Connection, + State#stream_connection_state{data = CoreState}}; + {CoreState, Commands} -> + % rabbit_log:info("FRAMES ~w", [length(Frames)]), + {Connection1, State1, _} = + lists:foldl( + fun (Command, {C, S, R}) -> + HandleFrameFun(Transport, C, S, Command, R) + end, {Connection, + State#stream_connection_state{data = CoreState}, + <<>>}, Commands), + handle_inbound_data(Transport, + Connection1, + State1, + <<>>, + HandleFrameFun) + end. + +% handle_inbound_data(_Transport, +% Connection, +% State, +% <<>>, +% _HandleFrameFun) -> +% {Connection, State}; +% handle_inbound_data(Transport, +% #stream_connection{frame_max = FrameMax} = Connection, +% #stream_connection_state{data = none} = State, +% <<Size:32, _Frame:Size/binary, _Rest/bits>>, +% _HandleFrameFun) +% when FrameMax /= 0 andalso Size > FrameMax - 4 -> +% CloseReason = <<"frame too large">>, +% CloseReasonLength = byte_size(CloseReason), +% CloseFrame = +% <<?REQUEST:1, +% ?COMMAND_CLOSE:15, +% ?VERSION_1:16, +% 1:32, +% ?RESPONSE_CODE_FRAME_TOO_LARGE:16, +% CloseReasonLength:16, +% CloseReason:CloseReasonLength/binary>>, +% frame(Transport, Connection, CloseFrame), +% {Connection#stream_connection{connection_step = close_sent}, State}; +% handle_inbound_data(Transport, +% Connection, +% #stream_connection_state{data = none} = State, +% <<Size:32, Frame:Size/binary, Rest/bits>>, +% HandleFrameFun) -> +% {Connection1, State1, Rest1} = +% HandleFrameFun(Transport, Connection, State, Frame, Rest), +% handle_inbound_data(Transport, +% Connection1, +% State1, +% Rest1, +% HandleFrameFun); +% handle_inbound_data(_Transport, +% Connection, +% #stream_connection_state{data = none} = State, +% Data, +% _HandleFrameFun) -> +% {Connection, State#stream_connection_state{data = Data}}; +% handle_inbound_data(Transport, +% Connection, +% #stream_connection_state{data = Leftover} = State, +% Data, +% HandleFrameFun) -> +% State1 = State#stream_connection_state{data = none}, +% %% FIXME avoid concatenation to avoid a new binary allocation +% %% see osiris_replica:parse_chunk/3 +% handle_inbound_data(Transport, +% Connection, +% State1, +% <<Leftover/binary, Data/binary>>, +% HandleFrameFun). generate_publishing_error_details(Acc, _Code, <<>>) -> Acc; @@ -916,16 +943,9 @@ generate_publishing_error_details(Acc, Code, handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State, - <<?REQUEST:1, - ?COMMAND_PEER_PROPERTIES:15, - ?VERSION_1:16, - CorrelationId:32, - ClientPropertiesCount:32, - ClientPropertiesFrame/binary>>, + {request, CorrelationId, + {peer_properties, ClientProperties}}, Rest) -> - {ClientProperties, _} = - rabbit_stream_utils:parse_map(ClientPropertiesFrame, - ClientPropertiesCount), {ok, Product} = application:get_key(rabbit, description), {ok, Version} = application:get_key(rabbit, vsn), @@ -984,10 +1004,7 @@ handle_frame_pre_auth(Transport, handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State, - <<?REQUEST:1, - ?COMMAND_SASL_HANDSHAKE:15, - ?VERSION_1:16, - CorrelationId:32>>, + {request, CorrelationId, sasl_handshake}, Rest) -> Mechanisms = rabbit_stream_utils:auth_mechanisms(S), MechanismsFragment = @@ -1015,21 +1032,9 @@ handle_frame_pre_auth(Transport, host = Host} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_SASL_AUTHENTICATE:15, - ?VERSION_1:16, - CorrelationId:32, - MechanismLength:16, - Mechanism:MechanismLength/binary, - SaslFragment/binary>>, + {request, CorrelationId, + {sasl_authenticate, Mechanism, SaslBin}}, Rest) -> - SaslBin = - case SaslFragment of - <<(-1):32/signed>> -> - <<>>; - <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> -> - SaslBinary - end, {Connection1, Rest1} = case rabbit_stream_utils:auth_mechanism_to_module(Mechanism, S) of @@ -1137,11 +1142,7 @@ handle_frame_pre_auth(_Transport, name = ConnectionName} = Connection, State, - <<?RESPONSE:1, - ?COMMAND_TUNE:15, - ?VERSION_1:16, - FrameMax:32, - Heartbeat:32>>, + {tune, FrameMax, Heartbeat}, Rest) -> rabbit_log:debug("Tuning response ~p ~p ", [FrameMax, Heartbeat]), Parent = self(), @@ -1172,12 +1173,8 @@ handle_frame_pre_auth(_Transport, handle_frame_pre_auth(Transport, #stream_connection{user = User, socket = S} = Connection, State, - <<?REQUEST:1, - ?COMMAND_OPEN:15, - ?VERSION_1:16, - CorrelationId:32, - VirtualHostLength:16, - VirtualHost:VirtualHostLength/binary>>, + {request, CorrelationId, + {open, VirtualHost}}, Rest) -> %% FIXME enforce connection limit (see rabbit_reader:is_over_connection_limit/2) {Connection1, Frame} = @@ -1211,13 +1208,13 @@ handle_frame_pre_auth(Transport, handle_frame_pre_auth(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, + heartbeat, Rest) -> rabbit_log:info("Received heartbeat frame pre auth~n"), {Connection, State, Rest}; -handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, closing connection.", - [Frame, Rest]), +handle_frame_pre_auth(_Transport, Connection, State, Command, Rest) -> + rabbit_log:warning("unknown command ~p ~p, closing connection.", + [Command, Rest]), {Connection#stream_connection{connection_step = failure}, State, Rest}. @@ -1256,15 +1253,8 @@ notify_auth_result(Username, handle_frame_post_auth(Transport, #stream_connection{resource_alarm = true} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_DECLARE_PUBLISHER:15, - ?VERSION_1:16, - CorrelationId:32, - PublisherId:8, - ReferenceSize:16, - _Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {declare_publisher, PublisherId, _WriterRef, Stream}}, Rest) -> rabbit_log:info("Cannot create publisher ~p on stream ~p, connection " "is blocked because of resource alarm", @@ -1282,15 +1272,8 @@ handle_frame_post_auth(Transport, resource_alarm = false} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_DECLARE_PUBLISHER:15, - ?VERSION_1:16, - CorrelationId:32, - PublisherId:8, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {declare_publisher, PublisherId, WriterRef, Stream}}, Rest) -> case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection0), @@ -1298,7 +1281,7 @@ handle_frame_post_auth(Transport, of ok -> case {maps:is_key(PublisherId, Publishers0), - maps:is_key({Stream, Reference}, RefIds0)} + maps:is_key({Stream, WriterRef}, RefIds0)} of {false, false} -> case lookup_leader(Stream, Connection0) of @@ -1314,12 +1297,12 @@ handle_frame_post_auth(Transport, publisher_to_ids = RefIds0} = Connection1} -> {PublisherReference, RefIds1} = - case Reference of + case WriterRef of <<"">> -> {undefined, RefIds0}; _ -> - {Reference, - RefIds0#{{Stream, Reference} => + {WriterRef, + RefIds0#{{Stream, WriterRef} => PublisherId}} end, Publisher = @@ -1372,12 +1355,7 @@ handle_frame_post_auth(Transport, publishers = Publishers} = Connection, State, - <<?REQUEST:1, - ?COMMAND_PUBLISH:15, - ?VERSION_1:16, - PublisherId:8/unsigned, - MessageCount:32, - Messages/binary>>, + {publish, PublisherId, MessageCount, Messages}, Rest) -> case Publishers of #{PublisherId := Publisher} -> @@ -1442,14 +1420,8 @@ handle_frame_post_auth(Transport, user = User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, - ?VERSION_1:16, - CorrelationId:32, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {query_publisher_sequence, Reference, Stream}}, Rest) -> FrameSize = ?RESPONSE_FRAME_SIZE + 8, {ResponseCode, Sequence} = @@ -1492,11 +1464,8 @@ handle_frame_post_auth(Transport, publisher_to_ids = PubToIds} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_DELETE_PUBLISHER:15, - ?VERSION_1:16, - CorrelationId:32, - PublisherId:8>>, + {request, CorrelationId, + {delete_publisher, PublisherId}}, Rest) -> case Publishers of #{PublisherId := #publisher{stream = Stream, reference = Ref}} -> @@ -1536,15 +1505,9 @@ handle_frame_post_auth(Transport, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<?REQUEST:1, - ?COMMAND_SUBSCRIBE:15, - ?VERSION_1:16, - CorrelationId:32, - SubscriptionId:8/unsigned, - StreamSize:16, - Stream:StreamSize/binary, - OffsetType:16/signed, - OffsetAndCredit/binary>>, + + {request, CorrelationId, + {subscribe, SubscriptionId, Stream, OffsetSpec, Credit}}, Rest) -> %% FIXME check the max number of subs is not reached already case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, @@ -1582,26 +1545,6 @@ handle_frame_post_auth(Transport, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS), {Connection, State, Rest}; false -> - {OffsetSpec, Credit} = - case OffsetType of - ?OFFSET_TYPE_FIRST -> - <<Crdt:16>> = OffsetAndCredit, - {first, Crdt}; - ?OFFSET_TYPE_LAST -> - <<Crdt:16>> = OffsetAndCredit, - {last, Crdt}; - ?OFFSET_TYPE_NEXT -> - <<Crdt:16>> = OffsetAndCredit, - {next, Crdt}; - ?OFFSET_TYPE_OFFSET -> - <<Offset:64/unsigned, Crdt:16>> = - OffsetAndCredit, - {Offset, Crdt}; - ?OFFSET_TYPE_TIMESTAMP -> - <<Timestamp:64/signed, Crdt:16>> = - OffsetAndCredit, - {{timestamp, Timestamp}, Crdt} - end, rabbit_log:info("Creating subscription ~p to ~p, with offset specificat" "ion ~p", [SubscriptionId, Stream, @@ -1695,11 +1638,7 @@ handle_frame_post_auth(Transport, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<?REQUEST:1, - ?COMMAND_CREDIT:15, - ?VERSION_1:16, - SubscriptionId:8/unsigned, - Credit:16/signed>>, + {credit, SubscriptionId, Credit}, Rest) -> case Consumers of #{SubscriptionId := Consumer} -> @@ -1752,15 +1691,7 @@ handle_frame_post_auth(_Transport, user = User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_COMMIT_OFFSET:15, - ?VERSION_1:16, - _CorrelationId:32, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary, - Offset:64>>, + {commit_offset, Reference, Stream, Offset}, Rest) -> case rabbit_stream_utils:check_write_permitted(#resource{name = Stream, @@ -1776,12 +1707,12 @@ handle_frame_post_auth(_Transport, [Stream]), %% FIXME commit offset is fire-and-forget, so no response even if error, change this? {Connection, State, Rest}; - undefined -> - rabbit_log:warning("Could not find leader (undefined) to commit offset " - "on ~p", - [Stream]), - %% FIXME commit offset is fire-and-forget, so no response even if error, change this? - {Connection, State, Rest}; + % undefined -> + % rabbit_log:warning("Could not find leader (undefined) to commit offset " + % "on ~p", + % [Stream]), + % %% FIXME commit offset is fire-and-forget, so no response even if error, change this? + % {Connection, State, Rest}; {ClusterLeader, Connection1} -> osiris:write_tracking(ClusterLeader, Reference, Offset), {Connection1, State, Rest} @@ -1797,14 +1728,8 @@ handle_frame_post_auth(Transport, user = User} = Connection0, State, - <<?REQUEST:1, - ?COMMAND_QUERY_OFFSET:15, - ?VERSION_1:16, - CorrelationId:32, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {query_offset, Reference, Stream}}, Rest) -> FrameSize = ?RESPONSE_FRAME_SIZE + 8, {ResponseCode, Offset, Connection1} = @@ -1846,11 +1771,8 @@ handle_frame_post_auth(Transport, StreamSubscriptions} = Connection, #stream_connection_state{} = State, - <<?REQUEST:1, - ?COMMAND_UNSUBSCRIBE:15, - ?VERSION_1:16, - CorrelationId:32, - SubscriptionId:8/unsigned>>, + {request, CorrelationId, + {unsubscribe, SubscriptionId}}, Rest) -> case subscription_exists(StreamSubscriptions, SubscriptionId) of false -> @@ -1876,19 +1798,11 @@ handle_frame_post_auth(Transport, User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_CREATE_STREAM:15, - ?VERSION_1:16, - CorrelationId:32, - StreamSize:16, - Stream:StreamSize/binary, - ArgumentsCount:32, - ArgumentsBinary/binary>>, + {request, CorrelationId, + {create_stream, Stream, Arguments}}, Rest) -> case rabbit_stream_utils:enforce_correct_stream_name(Stream) of {ok, StreamName} -> - {Arguments, _Rest} = - rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount), case rabbit_stream_utils:check_configure_permitted(#resource{name = StreamName, kind = @@ -1961,12 +1875,8 @@ handle_frame_post_auth(Transport, User} = Connection, State, - <<?REQUEST:1, - ?COMMAND_DELETE_STREAM:15, - ?VERSION_1:16, - CorrelationId:32, - StreamSize:16, - Stream:StreamSize/binary>>, + {request, CorrelationId, + {delete_stream, Stream}}, Rest) -> case rabbit_stream_utils:check_configure_permitted(#resource{name = Stream, @@ -2025,14 +1935,9 @@ handle_frame_post_auth(Transport, virtual_host = VirtualHost} = Connection, State, - <<?REQUEST:1, - ?COMMAND_METADATA:15, - ?VERSION_1:16, - CorrelationId:32, - StreamCount:32, - BinaryStreams/binary>>, + {request, CorrelationId, {metadata, Streams}}, Rest) -> - Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []), + StreamCount = length(Streams), Topology = lists:foldl(fun(Stream, Acc) -> @@ -2171,13 +2076,8 @@ handle_frame_post_auth(Transport, virtual_host = VirtualHost} = Connection, State, - <<?COMMAND_ROUTE:16, - ?VERSION_1:16, - CorrelationId:32, - RoutingKeySize:16, - RoutingKey:RoutingKeySize/binary, - SuperStreamSize:16, - SuperStream:SuperStreamSize/binary>>, + {request, CorrelationId, + {route, RoutingKey, SuperStream}}, Rest) -> {ResponseCode, StreamBin} = case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream) @@ -2206,11 +2106,8 @@ handle_frame_post_auth(Transport, virtual_host = VirtualHost} = Connection, State, - <<?COMMAND_PARTITIONS:16, - ?VERSION_1:16, - CorrelationId:32, - SuperStreamSize:16, - SuperStream:SuperStreamSize/binary>>, + {request, CorrelationId, + {partitions, SuperStream}}, Rest) -> {ResponseCode, PartitionsBin} = case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of @@ -2241,13 +2138,8 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, Connection, State, - <<?REQUEST:1, - ?COMMAND_CLOSE:15, - ?VERSION_1:16, - CorrelationId:32, - ClosingCode:16, - ClosingReasonLength:16, - ClosingReason:ClosingReasonLength/binary>>, + {request, CorrelationId, + {close, ClosingCode, ClosingReason}}, _Rest) -> rabbit_log:info("Received close command ~p ~p", [ClosingCode, ClosingReason]), @@ -2263,13 +2155,13 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, + heartbeat, Rest) -> rabbit_log:info("Received heartbeat frame post auth~n"), {Connection, State, Rest}; -handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, sending close command.", - [Frame, Rest]), +handle_frame_post_auth(Transport, Connection, State, Command, Rest) -> + rabbit_log:warning("unknown command ~p ~p, sending close command.", + [Command, Rest]), CloseReason = <<"unknown frame">>, CloseReasonLength = byte_size(CloseReason), CloseFrame = @@ -2311,11 +2203,12 @@ notify_connection_closed(#stream_connection{name = Name, handle_frame_post_close(_Transport, Connection, State, - <<?RESPONSE:1, - ?COMMAND_CLOSE:15, - ?VERSION_1:16, - _CorrelationId:32, - _ResponseCode:16>>, + {response, _CorrelationId, {close, _Code}}, + % <<?RESPONSE:1, + % ?COMMAND_CLOSE:15, + % ?VERSION_1:16, + % _CorrelationId:32, + % _ResponseCode:16>>, Rest) -> rabbit_log:info("Received close confirmation~n"), {Connection#stream_connection{connection_step = closing_done}, State, @@ -2323,12 +2216,12 @@ handle_frame_post_close(_Transport, handle_frame_post_close(_Transport, Connection, State, - <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>, + heartbeat, Rest) -> - rabbit_log:info("Received heartbeat frame post close~n"), + rabbit_log:info("Received heartbeat command post close~n"), {Connection, State, Rest}; -handle_frame_post_close(_Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("ignored frame on close ~p ~p.", [Frame, Rest]), +handle_frame_post_close(_Transport, Connection, State, Command, Rest) -> + rabbit_log:warning("ignored command on close ~p ~p.", [Command, Rest]), {Connection, State, Rest}. stream_r(Stream, #stream_connection{virtual_host = VHost}) -> |