summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-18 22:46:04 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-18 22:46:04 -0800
commit99bc503b1a549bd0877706ec8f04f7cb35445cda (patch)
tree2d9ce878609426151be67b99c78213dfc556dfcb
parent642b640404ce034161a1c958fd8e44eece2cec07 (diff)
downloadkafka-python-99bc503b1a549bd0877706ec8f04f7cb35445cda.tar.gz
Catch duplicate batch.done() calls -- this can happen if we maybe_expire then process a response errback
-rw-r--r--kafka/producer/record_accumulator.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index c404e9e..24cf8af 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,7 +68,10 @@ class RecordBatch(object):
log.debug("Produced messages to topic-partition %s with base offset"
" %s and error %s.", self.topic_partition, base_offset,
exception) # trace
- if exception is None:
+ if self.produce_future.is_done:
+ log.warning('Batch is already closed -- ignoring batch.done()')
+ return
+ elif exception is None:
self.produce_future.success(base_offset)
else:
self.produce_future.failure(exception)