diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:54:37 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:54:37 -0800 |
commit | 642b640404ce034161a1c958fd8e44eece2cec07 (patch) | |
tree | acc0e0a43b36961337c195def15535d2ca0ce7e6 | |
parent | f2d10f02d3f0bbecff2f9469dc477ccd6046ec59 (diff) | |
download | kafka-python-642b640404ce034161a1c958fd8e44eece2cec07.tar.gz |
Warn if pending batches failed during flush
-rw-r--r-- | kafka/producer/record_accumulator.py | 3 |
1 files changed, 3 insertions, 0 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 70f45f2..c404e9e 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -457,6 +457,9 @@ class RecordAccumulator(object): """ for batch in self._incomplete.all(): batch.produce_future.await() + assert batch.produce_future.is_done + if batch.produce_future.failed(): + log.warning(batch.produce_future.exception) self._flushes_in_progress.decrement() def abort_incomplete_batches(self): |