diff options
| author | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:54:37 -0800 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:54:37 -0800 |
| commit | 642b640404ce034161a1c958fd8e44eece2cec07 (patch) | |
| tree | acc0e0a43b36961337c195def15535d2ca0ce7e6 | |
| parent | f2d10f02d3f0bbecff2f9469dc477ccd6046ec59 (diff) | |
| download | kafka-python-642b640404ce034161a1c958fd8e44eece2cec07.tar.gz | |
Warn if pending batches failed during flush
| -rw-r--r-- | kafka/producer/record_accumulator.py | 3 |
1 files changed, 3 insertions, 0 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 70f45f2..c404e9e 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -457,6 +457,9 @@ class RecordAccumulator(object): """ for batch in self._incomplete.all(): batch.produce_future.await() + assert batch.produce_future.is_done + if batch.produce_future.failed(): + log.warning(batch.produce_future.exception) self._flushes_in_progress.decrement() def abort_incomplete_batches(self): |
