summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-18 21:54:37 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-18 21:54:37 -0800
commit642b640404ce034161a1c958fd8e44eece2cec07 (patch)
treeacc0e0a43b36961337c195def15535d2ca0ce7e6
parentf2d10f02d3f0bbecff2f9469dc477ccd6046ec59 (diff)
downloadkafka-python-642b640404ce034161a1c958fd8e44eece2cec07.tar.gz
Warn if pending batches failed during flush
-rw-r--r--kafka/producer/record_accumulator.py3
1 files changed, 3 insertions, 0 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 70f45f2..c404e9e 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -457,6 +457,9 @@ class RecordAccumulator(object):
"""
for batch in self._incomplete.all():
batch.produce_future.await()
+ assert batch.produce_future.is_done
+ if batch.produce_future.failed():
+ log.warning(batch.produce_future.exception)
self._flushes_in_progress.decrement()
def abort_incomplete_batches(self):