diff options
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r-- | kafka/producer/sender.py | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index bf7c163..9c36c9b 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -163,7 +163,7 @@ class Sender(threading.Thread): def _failed_produce(self, batches, node_id, error): log.debug("Error sending produce request to node %d: %s", node_id, error) # trace for batch in batches: - self._complete_batch(batch, error, -1) + self._complete_batch(batch, error, -1, None) def _handle_produce_response(self, batches, response): """Handle a produce response.""" @@ -183,15 +183,16 @@ class Sender(threading.Thread): else: # this is the acks = 0 case, just complete all requests for batch in batches: - self._complete_batch(batch, None, -1) + self._complete_batch(batch, None, -1, None) - def _complete_batch(self, batch, error, base_offset): + def _complete_batch(self, batch, error, base_offset, timestamp_ms=None): """Complete or retry the given batch of records. Arguments: batch (RecordBatch): The record batch error (Exception): The error (or None if none) base_offset (int): The base offset assigned to the records if successful + timestamp_ms (int, optional): The timestamp returned by the broker for this batch """ # Standardize no-error to None if error is Errors.NoError: @@ -210,7 +211,7 @@ class Sender(threading.Thread): error = error(batch.topic_partition.topic) # tell the user the result of their request - batch.done(base_offset, error) + batch.done(base_offset, timestamp_ms, error) self._accumulator.deallocate(batch) if getattr(error, 'invalid_metadata', False): |