diff options
author | Enrico Canzonieri <ecanzonieri@gmail.com> | 2015-01-06 17:33:21 +0100 |
---|---|---|
committer | Enrico Canzonieri <enrico@yelp.com> | 2015-01-14 15:13:33 -0800 |
commit | 878e4cefb696547d76f2eb0ad5b7afd2b1cfe044 (patch) | |
tree | 7ad6a258d4dcf0d59bcc532cb09420416087bd99 | |
parent | 02c2b469003e2ddcb051dbb4d95977137050c19f (diff) | |
download | kafka-python-878e4cefb696547d76f2eb0ad5b7afd2b1cfe044.tar.gz |
Implement offsets reset when OffsetOutOfRangeError
This slightly changes the SimpleConsumer interface adding the default
option use_latest_offsets. The fetch behaviour is also changed since it
does not raise OffsetOutOfRangeError anymore. Resetting the offsets
automatically is especially useful in MultiprocessConsumer, where an
explicit seek call is not possible.
-rw-r--r-- | kafka/consumer/simple.py | 38 |
1 files changed, 35 insertions, 3 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index df975f4..5cd15b5 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -16,7 +16,9 @@ except ImportError: # python 2 from kafka.common import ( FetchRequest, OffsetRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, check_error ) from .base import ( Consumer, @@ -98,7 +100,8 @@ class SimpleConsumer(Consumer): fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None): + iter_timeout=None, + use_latest_offsets=True): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -117,12 +120,26 @@ class SimpleConsumer(Consumer): self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout + self.use_latest_offsets = use_latest_offsets self.queue = Queue() def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ (self.group, self.topic, str(self.offsets.keys())) + def reset_partition_offset(self, partition): + LATEST = -1 + EARLIEST = -2 + if self.use_latest_offsets: + req = OffsetRequest(self.topic, partition, LATEST, 1) + else: + req = OffsetRequest(self.topic, partition, EARLIEST, 1) + + resp = self.client.send_offset_request(req) + check_error(resp) + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -286,10 +303,25 @@ class SimpleConsumer(Consumer): responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) + min_bytes=self.fetch_min_bytes, + fail_on_error=False + ) retry_partitions = {} for resp in responses: + + try: + check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + self.client.reset_topic_metadata(resp.topic) + raise + except OffsetOutOfRangeError: + log.warning("OffsetOutOfRangeError for %s - %d. " + "Resetting partition offset...", + resp.topic, resp.partition) + self.reset_partition_offset(resp.partition) + continue + partition = resp.partition buffer_size = partitions[partition] try: |