summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-09-10 15:26:26 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-09-10 15:26:26 +0100
commit3b1714cbe3fd3ec5176116063d300020c00ec903 (patch)
treef4898d285d859e0af579f530719cee3a65e22692
parentf10db03b4d0ad6c3678cb3407a8769d45d5ccf40 (diff)
downloadrabbitmq-server-git-handle-connection-closures-in-stream-reader.tar.gz
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl56
1 files changed, 33 insertions, 23 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 7a6c13b844..412be10650 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1007,7 +1007,8 @@ open(cast,
SendFileOct)
of
{error, closed} ->
- rabbit_log_connection:info("Stream protocol connection has been closed by peer", []),
+ rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ []),
throw({stop, normal});
{error, Reason} ->
rabbit_log_connection:info("Error while sending chunks: ~p",
@@ -1823,38 +1824,46 @@ handle_frame_post_auth(Transport,
rabbit_log:debug("Distributing existing messages to subscription ~p",
[SubscriptionId]),
- case send_chunks(Transport, ConsumerState, SendFileOct) of
+ case send_chunks(Transport, ConsumerState,
+ SendFileOct)
+ of
{error, closed} ->
- rabbit_log_connection:info("Stream protocol connection has been closed by peer", []),
+ rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ []),
throw({stop, normal});
{{segment, Segment1}, {credit, Credit1}} ->
ConsumerState1 =
- ConsumerState#consumer{segment = Segment1,
- credit = Credit1},
+ ConsumerState#consumer{segment =
+ Segment1,
+ credit =
+ Credit1},
Consumers1 =
- Consumers#{SubscriptionId => ConsumerState1},
+ Consumers#{SubscriptionId =>
+ ConsumerState1},
StreamSubscriptions1 =
- case StreamSubscriptions of
- #{Stream := SubscriptionIds} ->
- StreamSubscriptions#{Stream =>
- [SubscriptionId]
- ++ SubscriptionIds};
- _ ->
- StreamSubscriptions#{Stream =>
- [SubscriptionId]}
- end,
+ case StreamSubscriptions of
+ #{Stream := SubscriptionIds} ->
+ StreamSubscriptions#{Stream =>
+ [SubscriptionId]
+ ++ SubscriptionIds};
+ _ ->
+ StreamSubscriptions#{Stream =>
+ [SubscriptionId]}
+ end,
#consumer{counters = ConsumerCounters1} =
- ConsumerState1,
+ ConsumerState1,
- ConsumerOffset = osiris_log:next_offset(Segment1),
+ ConsumerOffset =
+ osiris_log:next_offset(Segment1),
ConsumerOffsetLag =
- consumer_i(offset_lag, ConsumerState1),
+ consumer_i(offset_lag, ConsumerState1),
rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
"distributed after subscription",
- [SubscriptionId, ConsumerOffset,
+ [SubscriptionId,
+ ConsumerOffset,
messages_consumed(ConsumerCounters1)]),
rabbit_stream_metrics:consumer_created(self(),
@@ -1867,10 +1876,10 @@ handle_frame_post_auth(Transport,
ConsumerOffsetLag,
Properties),
{Connection1#stream_connection{stream_subscriptions
- =
- StreamSubscriptions1},
+ =
+ StreamSubscriptions1},
State#stream_connection_state{consumers =
- Consumers1}}
+ Consumers1}}
end
end
end;
@@ -1900,7 +1909,8 @@ handle_frame_post_auth(Transport,
SendFileOct)
of
{error, closed} ->
- rabbit_log_connection:info("Stream protocol connection has been closed by peer", []),
+ rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ []),
throw({stop, normal});
{{segment, Segment1}, {credit, Credit1}} ->
Consumer1 =