diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-10 15:26:26 +0100 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-09-10 15:26:26 +0100 |
commit | 3b1714cbe3fd3ec5176116063d300020c00ec903 (patch) | |
tree | f4898d285d859e0af579f530719cee3a65e22692 | |
parent | f10db03b4d0ad6c3678cb3407a8769d45d5ccf40 (diff) | |
download | rabbitmq-server-git-handle-connection-closures-in-stream-reader.tar.gz |
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 7a6c13b844..412be10650 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1007,7 +1007,8 @@ open(cast, SendFileOct) of {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by peer", []), + rabbit_log_connection:info("Stream protocol connection has been closed by peer", + []), throw({stop, normal}); {error, Reason} -> rabbit_log_connection:info("Error while sending chunks: ~p", @@ -1823,38 +1824,46 @@ handle_frame_post_auth(Transport, rabbit_log:debug("Distributing existing messages to subscription ~p", [SubscriptionId]), - case send_chunks(Transport, ConsumerState, SendFileOct) of + case send_chunks(Transport, ConsumerState, + SendFileOct) + of {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by peer", []), + rabbit_log_connection:info("Stream protocol connection has been closed by peer", + []), throw({stop, normal}); {{segment, Segment1}, {credit, Credit1}} -> ConsumerState1 = - ConsumerState#consumer{segment = Segment1, - credit = Credit1}, + ConsumerState#consumer{segment = + Segment1, + credit = + Credit1}, Consumers1 = - Consumers#{SubscriptionId => ConsumerState1}, + Consumers#{SubscriptionId => + ConsumerState1}, StreamSubscriptions1 = - case StreamSubscriptions of - #{Stream := SubscriptionIds} -> - StreamSubscriptions#{Stream => - [SubscriptionId] - ++ SubscriptionIds}; - _ -> - StreamSubscriptions#{Stream => - [SubscriptionId]} - end, + case StreamSubscriptions of + #{Stream := SubscriptionIds} -> + StreamSubscriptions#{Stream => + [SubscriptionId] + ++ SubscriptionIds}; + _ -> + StreamSubscriptions#{Stream => + [SubscriptionId]} + end, #consumer{counters = ConsumerCounters1} = - ConsumerState1, + ConsumerState1, - ConsumerOffset = osiris_log:next_offset(Segment1), + ConsumerOffset = + osiris_log:next_offset(Segment1), ConsumerOffsetLag = - consumer_i(offset_lag, ConsumerState1), + consumer_i(offset_lag, ConsumerState1), rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) " "distributed after subscription", - [SubscriptionId, ConsumerOffset, + [SubscriptionId, + ConsumerOffset, messages_consumed(ConsumerCounters1)]), rabbit_stream_metrics:consumer_created(self(), @@ -1867,10 +1876,10 @@ handle_frame_post_auth(Transport, ConsumerOffsetLag, Properties), {Connection1#stream_connection{stream_subscriptions - = - StreamSubscriptions1}, + = + StreamSubscriptions1}, State#stream_connection_state{consumers = - Consumers1}} + Consumers1}} end end end; @@ -1900,7 +1909,8 @@ handle_frame_post_auth(Transport, SendFileOct) of {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by peer", []), + rabbit_log_connection:info("Stream protocol connection has been closed by peer", + []), throw({stop, normal}); {{segment, Segment1}, {credit, Credit1}} -> Consumer1 = |