diff options
-rw-r--r-- | kafka/consumer/simple.py | 24 |
1 files changed, 19 insertions, 5 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index e4233ff..c75e78b 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, OffsetRequest, + FetchRequest, KafkaError, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -144,6 +144,13 @@ class SimpleConsumer(Consumer): (self.group, self.topic, str(self.offsets.keys())) def reset_partition_offset(self, partition): + """Update offsets using auto_offset_reset policy (smallest|largest) + + Arguments: + partition (int): the partition for which offsets should be updated + + Returns: Updated offset on success, None on failure + """ LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': @@ -163,10 +170,17 @@ class SimpleConsumer(Consumer): raise # send_offset_request - (resp, ) = self.client.send_offset_request(reqs) - check_error(resp) - self.offsets[partition] = resp.offsets[0] - self.fetch_offsets[partition] = resp.offsets[0] + log.info('Resetting topic-partition offset to %s for %s:%d', + self.auto_offset_reset, self.topic, partition) + try: + (resp, ) = self.client.send_offset_request(reqs) + except KafkaError as e: + log.error('%s sending offset request for %s:%d', + e.__class__.__name__, self.topic, partition) + else: + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + return resp.offsets[0] def provide_partition_info(self): """ |