diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-04-12 22:44:00 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-04-12 22:44:00 -0700 |
commit | a8c93398af2ef3e1c112c34f3bf7e5d2e15e3ee1 (patch) | |
tree | ec02ab0f5728256ac98837f162e3cd2a6fd440fc | |
parent | 120410bcb41568dd3a754931aa8f3c55c8641aa9 (diff) | |
download | kafka-python-batch_expiration_message.tar.gz |
Improve error message when expiring batches in KafkaProducerbatch_expiration_message
-rw-r--r-- | kafka/producer/record_accumulator.py | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 965ddbe..fd081aa 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -101,15 +101,19 @@ class RecordBatch(object): since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0) timeout = request_timeout_ms / 1000.0 - if ((not self.in_retry() and is_full and timeout < since_append) or - (not self.in_retry() and timeout < since_ready) or - (self.in_retry() and timeout < since_backoff)): - + error = None + if not self.in_retry() and is_full and timeout < since_append: + error = "%d ms has passed since last append" % since_append + elif not self.in_retry() and timeout < since_ready: + error = "%d ms has passed since batch creation plus linger time" % since_ready + elif self.in_retry() and timeout < since_backoff: + error = "%d ms has passed since last attempt plus backoff time" % since_backoff + + if error: self.records.close() self.done(-1, None, Errors.KafkaTimeoutError( - "Batch containing %s record(s) expired due to timeout while" - " requesting metadata from brokers for %s", self.record_count, - self.topic_partition)) + "Batch for %s containing %s record(s) expired: %s" % ( + self.topic_partition, self.record_count, error))) return True return False |