summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/record_accumulator.py3
1 files changed, 3 insertions, 0 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 70f45f2..c404e9e 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -457,6 +457,9 @@ class RecordAccumulator(object):
"""
for batch in self._incomplete.all():
batch.produce_future.await()
+ assert batch.produce_future.is_done
+ if batch.produce_future.failed():
+ log.warning(batch.produce_future.exception)
self._flushes_in_progress.decrement()
def abort_incomplete_batches(self):