summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py24
1 files changed, 19 insertions, 5 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index e4233ff..c75e78b 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -27,7 +27,7 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
- FetchRequest, OffsetRequest,
+ FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -144,6 +144,13 @@ class SimpleConsumer(Consumer):
(self.group, self.topic, str(self.offsets.keys()))
def reset_partition_offset(self, partition):
+ """Update offsets using auto_offset_reset policy (smallest|largest)
+
+ Arguments:
+ partition (int): the partition for which offsets should be updated
+
+ Returns: Updated offset on success, None on failure
+ """
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
@@ -163,10 +170,17 @@ class SimpleConsumer(Consumer):
raise
# send_offset_request
- (resp, ) = self.client.send_offset_request(reqs)
- check_error(resp)
- self.offsets[partition] = resp.offsets[0]
- self.fetch_offsets[partition] = resp.offsets[0]
+ log.info('Resetting topic-partition offset to %s for %s:%d',
+ self.auto_offset_reset, self.topic, partition)
+ try:
+ (resp, ) = self.client.send_offset_request(reqs)
+ except KafkaError as e:
+ log.error('%s sending offset request for %s:%d',
+ e.__class__.__name__, self.topic, partition)
+ else:
+ self.offsets[partition] = resp.offsets[0]
+ self.fetch_offsets[partition] = resp.offsets[0]
+ return resp.offsets[0]
def provide_partition_info(self):
"""