summaryrefslogtreecommitdiff
path: root/kafka/producer/sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r--kafka/producer/sender.py9
1 files changed, 5 insertions, 4 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index bf7c163..9c36c9b 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -163,7 +163,7 @@ class Sender(threading.Thread):
def _failed_produce(self, batches, node_id, error):
log.debug("Error sending produce request to node %d: %s", node_id, error) # trace
for batch in batches:
- self._complete_batch(batch, error, -1)
+ self._complete_batch(batch, error, -1, None)
def _handle_produce_response(self, batches, response):
"""Handle a produce response."""
@@ -183,15 +183,16 @@ class Sender(threading.Thread):
else:
# this is the acks = 0 case, just complete all requests
for batch in batches:
- self._complete_batch(batch, None, -1)
+ self._complete_batch(batch, None, -1, None)
- def _complete_batch(self, batch, error, base_offset):
+ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
"""Complete or retry the given batch of records.
Arguments:
batch (RecordBatch): The record batch
error (Exception): The error (or None if none)
base_offset (int): The base offset assigned to the records if successful
+ timestamp_ms (int, optional): The timestamp returned by the broker for this batch
"""
# Standardize no-error to None
if error is Errors.NoError:
@@ -210,7 +211,7 @@ class Sender(threading.Thread):
error = error(batch.topic_partition.topic)
# tell the user the result of their request
- batch.done(base_offset, error)
+ batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
if getattr(error, 'invalid_metadata', False):