summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-12 23:50:52 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-13 09:35:49 -0700
commitf6af0fac6b116a21ad3514991cade86f0e6c8086 (patch)
treec80653d23fbf178cb47237b132d54deba71534bd
parenteb4b2b33a5b05cf8405c3a88d435fc3e9747c767 (diff)
downloadkafka-python-f6af0fac6b116a21ad3514991cade86f0e6c8086.tar.gz
Add optional timeout parameter to KafkaProducer.flush()
-rw-r--r--kafka/producer/kafka.py4
-rw-r--r--kafka/producer/record_accumulator.py18
2 files changed, 13 insertions, 9 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 37cd9b6..0286f8b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -391,7 +391,7 @@ class KafkaProducer(object):
FutureProduceResult(TopicPartition(topic, partition)),
-1).failure(e)
- def flush(self):
+ def flush(self, timeout=None):
"""
Invoking this method makes all buffered records immediately available
to send (even if linger_ms is greater than 0) and blocks on the
@@ -408,7 +408,7 @@ class KafkaProducer(object):
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
self._sender.wakeup()
- self._accumulator.await_flush_completion()
+ self._accumulator.await_flush_completion(timeout=timeout)
def _ensure_valid_record_size(self, size):
"""Validate that the record size isn't too large."""
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 24cf8af..958d207 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -454,16 +454,20 @@ class RecordAccumulator(object):
"""
self._flushes_in_progress.increment()
- def await_flush_completion(self):
+ def await_flush_completion(self, timeout=None):
"""
Mark all partitions as ready to send and block until the send is complete
"""
- for batch in self._incomplete.all():
- batch.produce_future.await()
- assert batch.produce_future.is_done
- if batch.produce_future.failed():
- log.warning(batch.produce_future.exception)
- self._flushes_in_progress.decrement()
+ try:
+ for batch in self._incomplete.all():
+ log.debug('Waiting on produce to %s',
+ batch.produce_future.topic_partition)
+ assert batch.produce_future.await(timeout=timeout), 'Timeout waiting for future'
+ assert batch.produce_future.is_done, 'Future not done?'
+ if batch.produce_future.failed():
+ log.warning(batch.produce_future.exception)
+ finally:
+ self._flushes_in_progress.decrement()
def abort_incomplete_batches(self):
"""