summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_stream/src/rabbit_stream_reader.erl')
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl401
1 files changed, 147 insertions, 254 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 793a37f75c..86b459fe7f 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -41,7 +41,8 @@
stream :: stream(),
counters :: atomics:atomics_ref()}).
-record(stream_connection_state,
- {data :: none | binary(), blocked :: boolean(),
+ {data :: rabbit_stream_core:state(),
+ blocked :: boolean(),
consumers :: #{subscription_id() => #consumer{}}}).
-record(stream_connection,
{name :: binary(),
@@ -198,7 +199,7 @@ init([KeepaliveSup,
State =
#stream_connection_state{consumers = #{},
blocked = false,
- data = none},
+ data = rabbit_stream_core:init(undefined)},
Transport:setopts(RealSocket, [{active, once}]),
rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}),
@@ -789,7 +790,7 @@ listen_loop_post_close(Transport,
=
IsThereAlarm},
State,
- Configuration);
+ <<>>);
{OK, S, Data} ->
Transport:setopts(S, [{active, once}]),
{Connection1, State1} =
@@ -846,61 +847,87 @@ handle_inbound_data_post_close(Transport, Connection, State, Rest) ->
Rest,
fun handle_frame_post_close/5).
-handle_inbound_data(_Transport,
- Connection,
- State,
- <<>>,
- _HandleFrameFun) ->
- {Connection, State};
-handle_inbound_data(Transport,
- #stream_connection{frame_max = FrameMax} = Connection,
- #stream_connection_state{data = none} = State,
- <<Size:32, _Frame:Size/binary, _Rest/bits>>,
- _HandleFrameFun)
- when FrameMax /= 0 andalso Size > FrameMax - 4 ->
- CloseReason = <<"frame too large">>,
- CloseReasonLength = byte_size(CloseReason),
- CloseFrame =
- <<?REQUEST:1,
- ?COMMAND_CLOSE:15,
- ?VERSION_1:16,
- 1:32,
- ?RESPONSE_CODE_FRAME_TOO_LARGE:16,
- CloseReasonLength:16,
- CloseReason:CloseReasonLength/binary>>,
- frame(Transport, Connection, CloseFrame),
- {Connection#stream_connection{connection_step = close_sent}, State};
-handle_inbound_data(Transport,
- Connection,
- #stream_connection_state{data = none} = State,
- <<Size:32, Frame:Size/binary, Rest/bits>>,
- HandleFrameFun) ->
- {Connection1, State1, Rest1} =
- HandleFrameFun(Transport, Connection, State, Frame, Rest),
- handle_inbound_data(Transport,
- Connection1,
- State1,
- Rest1,
- HandleFrameFun);
-handle_inbound_data(_Transport,
- Connection,
- #stream_connection_state{data = none} = State,
- Data,
- _HandleFrameFun) ->
- {Connection, State#stream_connection_state{data = Data}};
handle_inbound_data(Transport,
Connection,
- #stream_connection_state{data = Leftover} = State,
+ #stream_connection_state{data = CoreState0} = State,
Data,
HandleFrameFun) ->
- State1 = State#stream_connection_state{data = none},
- %% FIXME avoid concatenation to avoid a new binary allocation
- %% see osiris_replica:parse_chunk/3
- handle_inbound_data(Transport,
- Connection,
- State1,
- <<Leftover/binary, Data/binary>>,
- HandleFrameFun).
+ case rabbit_stream_core:incoming_data(Data, CoreState0) of
+ {CoreState, []} ->
+ % rabbit_log:info("NO FRAMES ", []),
+ {Connection,
+ State#stream_connection_state{data = CoreState}};
+ {CoreState, Commands} ->
+ % rabbit_log:info("FRAMES ~w", [length(Frames)]),
+ {Connection1, State1, _} =
+ lists:foldl(
+ fun (Command, {C, S, R}) ->
+ HandleFrameFun(Transport, C, S, Command, R)
+ end, {Connection,
+ State#stream_connection_state{data = CoreState},
+ <<>>}, Commands),
+ handle_inbound_data(Transport,
+ Connection1,
+ State1,
+ <<>>,
+ HandleFrameFun)
+ end.
+
+% handle_inbound_data(_Transport,
+% Connection,
+% State,
+% <<>>,
+% _HandleFrameFun) ->
+% {Connection, State};
+% handle_inbound_data(Transport,
+% #stream_connection{frame_max = FrameMax} = Connection,
+% #stream_connection_state{data = none} = State,
+% <<Size:32, _Frame:Size/binary, _Rest/bits>>,
+% _HandleFrameFun)
+% when FrameMax /= 0 andalso Size > FrameMax - 4 ->
+% CloseReason = <<"frame too large">>,
+% CloseReasonLength = byte_size(CloseReason),
+% CloseFrame =
+% <<?REQUEST:1,
+% ?COMMAND_CLOSE:15,
+% ?VERSION_1:16,
+% 1:32,
+% ?RESPONSE_CODE_FRAME_TOO_LARGE:16,
+% CloseReasonLength:16,
+% CloseReason:CloseReasonLength/binary>>,
+% frame(Transport, Connection, CloseFrame),
+% {Connection#stream_connection{connection_step = close_sent}, State};
+% handle_inbound_data(Transport,
+% Connection,
+% #stream_connection_state{data = none} = State,
+% <<Size:32, Frame:Size/binary, Rest/bits>>,
+% HandleFrameFun) ->
+% {Connection1, State1, Rest1} =
+% HandleFrameFun(Transport, Connection, State, Frame, Rest),
+% handle_inbound_data(Transport,
+% Connection1,
+% State1,
+% Rest1,
+% HandleFrameFun);
+% handle_inbound_data(_Transport,
+% Connection,
+% #stream_connection_state{data = none} = State,
+% Data,
+% _HandleFrameFun) ->
+% {Connection, State#stream_connection_state{data = Data}};
+% handle_inbound_data(Transport,
+% Connection,
+% #stream_connection_state{data = Leftover} = State,
+% Data,
+% HandleFrameFun) ->
+% State1 = State#stream_connection_state{data = none},
+% %% FIXME avoid concatenation to avoid a new binary allocation
+% %% see osiris_replica:parse_chunk/3
+% handle_inbound_data(Transport,
+% Connection,
+% State1,
+% <<Leftover/binary, Data/binary>>,
+% HandleFrameFun).
generate_publishing_error_details(Acc, _Code, <<>>) ->
Acc;
@@ -916,16 +943,9 @@ generate_publishing_error_details(Acc, Code,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_PEER_PROPERTIES:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ClientPropertiesCount:32,
- ClientPropertiesFrame/binary>>,
+ {request, CorrelationId,
+ {peer_properties, ClientProperties}},
Rest) ->
- {ClientProperties, _} =
- rabbit_stream_utils:parse_map(ClientPropertiesFrame,
- ClientPropertiesCount),
{ok, Product} = application:get_key(rabbit, description),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -984,10 +1004,7 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_SASL_HANDSHAKE:15,
- ?VERSION_1:16,
- CorrelationId:32>>,
+ {request, CorrelationId, sasl_handshake},
Rest) ->
Mechanisms = rabbit_stream_utils:auth_mechanisms(S),
MechanismsFragment =
@@ -1015,21 +1032,9 @@ handle_frame_pre_auth(Transport,
host = Host} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_SASL_AUTHENTICATE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- MechanismLength:16,
- Mechanism:MechanismLength/binary,
- SaslFragment/binary>>,
+ {request, CorrelationId,
+ {sasl_authenticate, Mechanism, SaslBin}},
Rest) ->
- SaslBin =
- case SaslFragment of
- <<(-1):32/signed>> ->
- <<>>;
- <<SaslBinaryLength:32, SaslBinary:SaslBinaryLength/binary>> ->
- SaslBinary
- end,
{Connection1, Rest1} =
case rabbit_stream_utils:auth_mechanism_to_module(Mechanism, S) of
@@ -1137,11 +1142,7 @@ handle_frame_pre_auth(_Transport,
name = ConnectionName} =
Connection,
State,
- <<?RESPONSE:1,
- ?COMMAND_TUNE:15,
- ?VERSION_1:16,
- FrameMax:32,
- Heartbeat:32>>,
+ {tune, FrameMax, Heartbeat},
Rest) ->
rabbit_log:debug("Tuning response ~p ~p ", [FrameMax, Heartbeat]),
Parent = self(),
@@ -1172,12 +1173,8 @@ handle_frame_pre_auth(_Transport,
handle_frame_pre_auth(Transport,
#stream_connection{user = User, socket = S} = Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_OPEN:15,
- ?VERSION_1:16,
- CorrelationId:32,
- VirtualHostLength:16,
- VirtualHost:VirtualHostLength/binary>>,
+ {request, CorrelationId,
+ {open, VirtualHost}},
Rest) ->
%% FIXME enforce connection limit (see rabbit_reader:is_over_connection_limit/2)
{Connection1, Frame} =
@@ -1211,13 +1208,13 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
+ heartbeat,
Rest) ->
rabbit_log:info("Received heartbeat frame pre auth~n"),
{Connection, State, Rest};
-handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, closing connection.",
- [Frame, Rest]),
+handle_frame_pre_auth(_Transport, Connection, State, Command, Rest) ->
+ rabbit_log:warning("unknown command ~p ~p, closing connection.",
+ [Command, Rest]),
{Connection#stream_connection{connection_step = failure}, State,
Rest}.
@@ -1256,15 +1253,8 @@ notify_auth_result(Username,
handle_frame_post_auth(Transport,
#stream_connection{resource_alarm = true} = Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_DECLARE_PUBLISHER:15,
- ?VERSION_1:16,
- CorrelationId:32,
- PublisherId:8,
- ReferenceSize:16,
- _Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {declare_publisher, PublisherId, _WriterRef, Stream}},
Rest) ->
rabbit_log:info("Cannot create publisher ~p on stream ~p, connection "
"is blocked because of resource alarm",
@@ -1282,15 +1272,8 @@ handle_frame_post_auth(Transport,
resource_alarm = false} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_DECLARE_PUBLISHER:15,
- ?VERSION_1:16,
- CorrelationId:32,
- PublisherId:8,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {declare_publisher, PublisherId, WriterRef, Stream}},
Rest) ->
case rabbit_stream_utils:check_write_permitted(stream_r(Stream,
Connection0),
@@ -1298,7 +1281,7 @@ handle_frame_post_auth(Transport,
of
ok ->
case {maps:is_key(PublisherId, Publishers0),
- maps:is_key({Stream, Reference}, RefIds0)}
+ maps:is_key({Stream, WriterRef}, RefIds0)}
of
{false, false} ->
case lookup_leader(Stream, Connection0) of
@@ -1314,12 +1297,12 @@ handle_frame_post_auth(Transport,
publisher_to_ids = RefIds0} =
Connection1} ->
{PublisherReference, RefIds1} =
- case Reference of
+ case WriterRef of
<<"">> ->
{undefined, RefIds0};
_ ->
- {Reference,
- RefIds0#{{Stream, Reference} =>
+ {WriterRef,
+ RefIds0#{{Stream, WriterRef} =>
PublisherId}}
end,
Publisher =
@@ -1372,12 +1355,7 @@ handle_frame_post_auth(Transport,
publishers = Publishers} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_PUBLISH:15,
- ?VERSION_1:16,
- PublisherId:8/unsigned,
- MessageCount:32,
- Messages/binary>>,
+ {publish, PublisherId, MessageCount, Messages},
Rest) ->
case Publishers of
#{PublisherId := Publisher} ->
@@ -1442,14 +1420,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {query_publisher_sequence, Reference, Stream}},
Rest) ->
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Sequence} =
@@ -1492,11 +1464,8 @@ handle_frame_post_auth(Transport,
publisher_to_ids = PubToIds} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_DELETE_PUBLISHER:15,
- ?VERSION_1:16,
- CorrelationId:32,
- PublisherId:8>>,
+ {request, CorrelationId,
+ {delete_publisher, PublisherId}},
Rest) ->
case Publishers of
#{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
@@ -1536,15 +1505,9 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<?REQUEST:1,
- ?COMMAND_SUBSCRIBE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- SubscriptionId:8/unsigned,
- StreamSize:16,
- Stream:StreamSize/binary,
- OffsetType:16/signed,
- OffsetAndCredit/binary>>,
+
+ {request, CorrelationId,
+ {subscribe, SubscriptionId, Stream, OffsetSpec, Credit}},
Rest) ->
%% FIXME check the max number of subs is not reached already
case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
@@ -1582,26 +1545,6 @@ handle_frame_post_auth(Transport,
?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS),
{Connection, State, Rest};
false ->
- {OffsetSpec, Credit} =
- case OffsetType of
- ?OFFSET_TYPE_FIRST ->
- <<Crdt:16>> = OffsetAndCredit,
- {first, Crdt};
- ?OFFSET_TYPE_LAST ->
- <<Crdt:16>> = OffsetAndCredit,
- {last, Crdt};
- ?OFFSET_TYPE_NEXT ->
- <<Crdt:16>> = OffsetAndCredit,
- {next, Crdt};
- ?OFFSET_TYPE_OFFSET ->
- <<Offset:64/unsigned, Crdt:16>> =
- OffsetAndCredit,
- {Offset, Crdt};
- ?OFFSET_TYPE_TIMESTAMP ->
- <<Timestamp:64/signed, Crdt:16>> =
- OffsetAndCredit,
- {{timestamp, Timestamp}, Crdt}
- end,
rabbit_log:info("Creating subscription ~p to ~p, with offset specificat"
"ion ~p",
[SubscriptionId, Stream,
@@ -1695,11 +1638,7 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<?REQUEST:1,
- ?COMMAND_CREDIT:15,
- ?VERSION_1:16,
- SubscriptionId:8/unsigned,
- Credit:16/signed>>,
+ {credit, SubscriptionId, Credit},
Rest) ->
case Consumers of
#{SubscriptionId := Consumer} ->
@@ -1752,15 +1691,7 @@ handle_frame_post_auth(_Transport,
user = User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_COMMIT_OFFSET:15,
- ?VERSION_1:16,
- _CorrelationId:32,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary,
- Offset:64>>,
+ {commit_offset, Reference, Stream, Offset},
Rest) ->
case rabbit_stream_utils:check_write_permitted(#resource{name =
Stream,
@@ -1776,12 +1707,12 @@ handle_frame_post_auth(_Transport,
[Stream]),
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
{Connection, State, Rest};
- undefined ->
- rabbit_log:warning("Could not find leader (undefined) to commit offset "
- "on ~p",
- [Stream]),
- %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
- {Connection, State, Rest};
+ % undefined ->
+ % rabbit_log:warning("Could not find leader (undefined) to commit offset "
+ % "on ~p",
+ % [Stream]),
+ % %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
+ % {Connection, State, Rest};
{ClusterLeader, Connection1} ->
osiris:write_tracking(ClusterLeader, Reference, Offset),
{Connection1, State, Rest}
@@ -1797,14 +1728,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection0,
State,
- <<?REQUEST:1,
- ?COMMAND_QUERY_OFFSET:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {query_offset, Reference, Stream}},
Rest) ->
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Offset, Connection1} =
@@ -1846,11 +1771,8 @@ handle_frame_post_auth(Transport,
StreamSubscriptions} =
Connection,
#stream_connection_state{} = State,
- <<?REQUEST:1,
- ?COMMAND_UNSUBSCRIBE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- SubscriptionId:8/unsigned>>,
+ {request, CorrelationId,
+ {unsubscribe, SubscriptionId}},
Rest) ->
case subscription_exists(StreamSubscriptions, SubscriptionId) of
false ->
@@ -1876,19 +1798,11 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_CREATE_STREAM:15,
- ?VERSION_1:16,
- CorrelationId:32,
- StreamSize:16,
- Stream:StreamSize/binary,
- ArgumentsCount:32,
- ArgumentsBinary/binary>>,
+ {request, CorrelationId,
+ {create_stream, Stream, Arguments}},
Rest) ->
case rabbit_stream_utils:enforce_correct_stream_name(Stream) of
{ok, StreamName} ->
- {Arguments, _Rest} =
- rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount),
case rabbit_stream_utils:check_configure_permitted(#resource{name =
StreamName,
kind =
@@ -1961,12 +1875,8 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_DELETE_STREAM:15,
- ?VERSION_1:16,
- CorrelationId:32,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ {request, CorrelationId,
+ {delete_stream, Stream}},
Rest) ->
case rabbit_stream_utils:check_configure_permitted(#resource{name =
Stream,
@@ -2025,14 +1935,9 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_METADATA:15,
- ?VERSION_1:16,
- CorrelationId:32,
- StreamCount:32,
- BinaryStreams/binary>>,
+ {request, CorrelationId, {metadata, Streams}},
Rest) ->
- Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []),
+ StreamCount = length(Streams),
Topology =
lists:foldl(fun(Stream, Acc) ->
@@ -2171,13 +2076,8 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
- <<?COMMAND_ROUTE:16,
- ?VERSION_1:16,
- CorrelationId:32,
- RoutingKeySize:16,
- RoutingKey:RoutingKeySize/binary,
- SuperStreamSize:16,
- SuperStream:SuperStreamSize/binary>>,
+ {request, CorrelationId,
+ {route, RoutingKey, SuperStream}},
Rest) ->
{ResponseCode, StreamBin} =
case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream)
@@ -2206,11 +2106,8 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
- <<?COMMAND_PARTITIONS:16,
- ?VERSION_1:16,
- CorrelationId:32,
- SuperStreamSize:16,
- SuperStream:SuperStreamSize/binary>>,
+ {request, CorrelationId,
+ {partitions, SuperStream}},
Rest) ->
{ResponseCode, PartitionsBin} =
case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of
@@ -2241,13 +2138,8 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(Transport,
Connection,
State,
- <<?REQUEST:1,
- ?COMMAND_CLOSE:15,
- ?VERSION_1:16,
- CorrelationId:32,
- ClosingCode:16,
- ClosingReasonLength:16,
- ClosingReason:ClosingReasonLength/binary>>,
+ {request, CorrelationId,
+ {close, ClosingCode, ClosingReason}},
_Rest) ->
rabbit_log:info("Received close command ~p ~p",
[ClosingCode, ClosingReason]),
@@ -2263,13 +2155,13 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
+ heartbeat,
Rest) ->
rabbit_log:info("Received heartbeat frame post auth~n"),
{Connection, State, Rest};
-handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, sending close command.",
- [Frame, Rest]),
+handle_frame_post_auth(Transport, Connection, State, Command, Rest) ->
+ rabbit_log:warning("unknown command ~p ~p, sending close command.",
+ [Command, Rest]),
CloseReason = <<"unknown frame">>,
CloseReasonLength = byte_size(CloseReason),
CloseFrame =
@@ -2311,11 +2203,12 @@ notify_connection_closed(#stream_connection{name = Name,
handle_frame_post_close(_Transport,
Connection,
State,
- <<?RESPONSE:1,
- ?COMMAND_CLOSE:15,
- ?VERSION_1:16,
- _CorrelationId:32,
- _ResponseCode:16>>,
+ {response, _CorrelationId, {close, _Code}},
+ % <<?RESPONSE:1,
+ % ?COMMAND_CLOSE:15,
+ % ?VERSION_1:16,
+ % _CorrelationId:32,
+ % _ResponseCode:16>>,
Rest) ->
rabbit_log:info("Received close confirmation~n"),
{Connection#stream_connection{connection_step = closing_done}, State,
@@ -2323,12 +2216,12 @@ handle_frame_post_close(_Transport,
handle_frame_post_close(_Transport,
Connection,
State,
- <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_1:16>>,
+ heartbeat,
Rest) ->
- rabbit_log:info("Received heartbeat frame post close~n"),
+ rabbit_log:info("Received heartbeat command post close~n"),
{Connection, State, Rest};
-handle_frame_post_close(_Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("ignored frame on close ~p ~p.", [Frame, Rest]),
+handle_frame_post_close(_Transport, Connection, State, Command, Rest) ->
+ rabbit_log:warning("ignored command on close ~p ~p.", [Command, Rest]),
{Connection, State, Rest}.
stream_r(Stream, #stream_connection{virtual_host = VHost}) ->