summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-09-10 10:15:59 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-09-10 10:15:59 +0100
commitd6301a3e119281b737819f0ed85cdb8c3236e64e (patch)
treeb80e36a2b81c8f815532b4367faeb8d18c7de4b0
parent3513fa0ea89f2e7c6142977d596e1ae81898751f (diff)
downloadrabbitmq-server-git-d6301a3e119281b737819f0ed85cdb8c3236e64e.tar.gz
Handle closed connections in stream reader
and throw and stop gracefully.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl111
1 files changed, 52 insertions, 59 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 8bad2e53af..2a682f0cf8 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1006,6 +1006,9 @@ open(cast,
Consumer,
SendFileOct)
of
+ {error, closed} ->
+ rabbit_log:info("Stream protocol connection has been closed by peer", []),
+ throw({stop, connection_closed});
{error, Reason} ->
rabbit_log_connection:info("Error while sending chunks: ~p",
[Reason]),
@@ -1819,52 +1822,56 @@ handle_frame_post_auth(Transport,
rabbit_log:debug("Distributing existing messages to subscription ~p",
[SubscriptionId]),
- {{segment, Segment1}, {credit, Credit1}} =
- send_chunks(Transport, ConsumerState,
- SendFileOct),
- ConsumerState1 =
- ConsumerState#consumer{segment = Segment1,
- credit = Credit1},
- Consumers1 =
- Consumers#{SubscriptionId => ConsumerState1},
-
- StreamSubscriptions1 =
- case StreamSubscriptions of
- #{Stream := SubscriptionIds} ->
- StreamSubscriptions#{Stream =>
+
+ case send_chunks(Transport, ConsumerState, SendFileOct) of
+ {error, closed} ->
+ rabbit_log:info("Stream protocol connection has been closed by peer", []),
+ throw({stop, connection_closed});
+ {{segment, Segment1}, {credit, Credit1}} ->
+ ConsumerState1 =
+ ConsumerState#consumer{segment = Segment1,
+ credit = Credit1},
+ Consumers1 =
+ Consumers#{SubscriptionId => ConsumerState1},
+
+ StreamSubscriptions1 =
+ case StreamSubscriptions of
+ #{Stream := SubscriptionIds} ->
+ StreamSubscriptions#{Stream =>
[SubscriptionId]
++ SubscriptionIds};
- _ ->
- StreamSubscriptions#{Stream =>
+ _ ->
+ StreamSubscriptions#{Stream =>
[SubscriptionId]}
- end,
-
- #consumer{counters = ConsumerCounters1} =
- ConsumerState1,
-
- ConsumerOffset = osiris_log:next_offset(Segment1),
- ConsumerOffsetLag =
- consumer_i(offset_lag, ConsumerState1),
-
- rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
- "distributed after subscription",
- [SubscriptionId, ConsumerOffset,
- messages_consumed(ConsumerCounters1)]),
-
- rabbit_stream_metrics:consumer_created(self(),
- stream_r(Stream,
- Connection1),
- SubscriptionId,
- Credit1,
- messages_consumed(ConsumerCounters1),
- ConsumerOffset,
- ConsumerOffsetLag,
- Properties),
- {Connection1#stream_connection{stream_subscriptions
- =
- StreamSubscriptions1},
- State#stream_connection_state{consumers =
- Consumers1}}
+ end,
+
+ #consumer{counters = ConsumerCounters1} =
+ ConsumerState1,
+
+ ConsumerOffset = osiris_log:next_offset(Segment1),
+ ConsumerOffsetLag =
+ consumer_i(offset_lag, ConsumerState1),
+
+ rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
+ "distributed after subscription",
+ [SubscriptionId, ConsumerOffset,
+ messages_consumed(ConsumerCounters1)]),
+
+ rabbit_stream_metrics:consumer_created(self(),
+ stream_r(Stream,
+ Connection1),
+ SubscriptionId,
+ Credit1,
+ messages_consumed(ConsumerCounters1),
+ ConsumerOffset,
+ ConsumerOffsetLag,
+ Properties),
+ {Connection1#stream_connection{stream_subscriptions
+ =
+ StreamSubscriptions1},
+ State#stream_connection_state{consumers =
+ Consumers1}}
+ end
end
end;
error ->
@@ -1893,22 +1900,8 @@ handle_frame_post_auth(Transport,
SendFileOct)
of
{error, closed} ->
- rabbit_log:warning("Stream protocol connection for subscription ~p has been closed, removing "
- "subscription",
- [SubscriptionId]),
- {Connection1, State1} =
- remove_subscription(SubscriptionId, Connection, State),
-
- Code = ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST,
- Frame =
- rabbit_stream_core:frame({response, 1,
- {credit, Code,
- SubscriptionId}}),
- send(Transport, S, Frame),
- rabbit_global_counters:increase_protocol_counter(stream,
- ?SUBSCRIPTION_ID_DOES_NOT_EXIST,
- 1),
- {Connection1, State1};
+ rabbit_log:warning("Stream protocol connection has been closed by peer", []),
+ throw({stop, connection_closed});
{{segment, Segment1}, {credit, Credit1}} ->
Consumer1 =
Consumer#consumer{segment = Segment1, credit = Credit1},