diff options
-rw-r--r-- | kafka/producer/kafka.py | 4 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 7 |
2 files changed, 9 insertions, 2 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 3abadcc..57155e5 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -554,6 +554,10 @@ class KafkaProducer(object): Arguments: timeout (float, optional): timeout in seconds to wait for completion. + + Raises: + KafkaTimeoutError: failure to flush buffered records within the + provided timeout """ log.debug("Flushing accumulated records in producer.") # trace self._accumulator.begin_flush() diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index fd081aa..3e97fd7 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -526,8 +526,11 @@ class RecordAccumulator(object): for batch in self._incomplete.all(): log.debug('Waiting on produce to %s', batch.produce_future.topic_partition) - assert batch.produce_future.wait(timeout=timeout), 'Timeout waiting for future' - assert batch.produce_future.is_done, 'Future not done?' + if not batch.produce_future.wait(timeout=timeout): + raise Errors.KafkaTimeoutError('Timeout waiting for future') + if not batch.produce_future.is_done: + raise Errors.UnknownError('Future not done') + if batch.produce_future.failed(): log.warning(batch.produce_future.exception) finally: |