diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-10 09:50:06 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-10 10:34:44 -0700 |
commit | ed42d7899117e4bba8ef47afe825c13185cbdfc7 (patch) | |
tree | 505d066dd0e03fa22d5944cf3617b4bac2998ee0 /kafka/consumer/simple.py | |
parent | 680a8dc3376badccccf0aab27a2307adc0b4cb0d (diff) | |
download | kafka-python-ed42d7899117e4bba8ef47afe825c13185cbdfc7.tar.gz |
Change SimpleConsumer.reset_partition_offset to return offset / None on failure (dont raise exception)
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 24 |
1 files changed, 19 insertions, 5 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index e4233ff..c75e78b 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, OffsetRequest, + FetchRequest, KafkaError, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -144,6 +144,13 @@ class SimpleConsumer(Consumer): (self.group, self.topic, str(self.offsets.keys())) def reset_partition_offset(self, partition): + """Update offsets using auto_offset_reset policy (smallest|largest) + + Arguments: + partition (int): the partition for which offsets should be updated + + Returns: Updated offset on success, None on failure + """ LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': @@ -163,10 +170,17 @@ class SimpleConsumer(Consumer): raise # send_offset_request - (resp, ) = self.client.send_offset_request(reqs) - check_error(resp) - self.offsets[partition] = resp.offsets[0] - self.fetch_offsets[partition] = resp.offsets[0] + log.info('Resetting topic-partition offset to %s for %s:%d', + self.auto_offset_reset, self.topic, partition) + try: + (resp, ) = self.client.send_offset_request(reqs) + except KafkaError as e: + log.error('%s sending offset request for %s:%d', + e.__class__.__name__, self.topic, partition) + else: + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + return resp.offsets[0] def provide_partition_info(self): """ |