diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-18 22:46:04 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-18 22:46:04 -0800 |
commit | 99bc503b1a549bd0877706ec8f04f7cb35445cda (patch) | |
tree | 2d9ce878609426151be67b99c78213dfc556dfcb | |
parent | 642b640404ce034161a1c958fd8e44eece2cec07 (diff) | |
download | kafka-python-99bc503b1a549bd0877706ec8f04f7cb35445cda.tar.gz |
Catch duplicate batch.done() calls -- this can happen if we maybe_expire then process a response errback
-rw-r--r-- | kafka/producer/record_accumulator.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index c404e9e..24cf8af 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -68,7 +68,10 @@ class RecordBatch(object): log.debug("Produced messages to topic-partition %s with base offset" " %s and error %s.", self.topic_partition, base_offset, exception) # trace - if exception is None: + if self.produce_future.is_done: + log.warning('Batch is already closed -- ignoring batch.done()') + return + elif exception is None: self.produce_future.success(base_offset) else: self.produce_future.failure(exception) |