summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-10 09:50:06 -0700
committerDana Powers <dana.powers@rd.io>2015-06-10 10:34:44 -0700
commited42d7899117e4bba8ef47afe825c13185cbdfc7 (patch)
tree505d066dd0e03fa22d5944cf3617b4bac2998ee0 /kafka/consumer/simple.py
parent680a8dc3376badccccf0aab27a2307adc0b4cb0d (diff)
downloadkafka-python-ed42d7899117e4bba8ef47afe825c13185cbdfc7.tar.gz
Change SimpleConsumer.reset_partition_offset to return offset / None on failure (dont raise exception)
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py24
1 files changed, 19 insertions, 5 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index e4233ff..c75e78b 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -27,7 +27,7 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
- FetchRequest, OffsetRequest,
+ FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -144,6 +144,13 @@ class SimpleConsumer(Consumer):
(self.group, self.topic, str(self.offsets.keys()))
def reset_partition_offset(self, partition):
+ """Update offsets using auto_offset_reset policy (smallest|largest)
+
+ Arguments:
+ partition (int): the partition for which offsets should be updated
+
+ Returns: Updated offset on success, None on failure
+ """
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
@@ -163,10 +170,17 @@ class SimpleConsumer(Consumer):
raise
# send_offset_request
- (resp, ) = self.client.send_offset_request(reqs)
- check_error(resp)
- self.offsets[partition] = resp.offsets[0]
- self.fetch_offsets[partition] = resp.offsets[0]
+ log.info('Resetting topic-partition offset to %s for %s:%d',
+ self.auto_offset_reset, self.topic, partition)
+ try:
+ (resp, ) = self.client.send_offset_request(reqs)
+ except KafkaError as e:
+ log.error('%s sending offset request for %s:%d',
+ e.__class__.__name__, self.topic, partition)
+ else:
+ self.offsets[partition] = resp.offsets[0]
+ self.fetch_offsets[partition] = resp.offsets[0]
+ return resp.offsets[0]
def provide_partition_info(self):
"""