diff options
author | Enrico Canzonieri <ecanzonieri@gmail.com> | 2015-01-26 14:40:49 -0800 |
---|---|---|
committer | Enrico Canzonieri <enrico@yelp.com> | 2015-01-26 14:53:23 -0800 |
commit | f517ddf283a86947a15f95e5ec562e81f4c477e5 (patch) | |
tree | eb3b086ce03056fb03e216f184c887cd0f58119f /kafka/consumer/simple.py | |
parent | 6bc2c7aadac37c6e38c8a3c7be66013e9080aed7 (diff) | |
download | kafka-python-f517ddf283a86947a15f95e5ec562e81f4c477e5.tar.gz |
Make SimpleConsumer auto_offset_reset more like KafkaConsumer
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 39103bd..0593b5b 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -8,6 +8,7 @@ import logging import time import six +import sys try: from Queue import Empty, Queue @@ -87,6 +88,9 @@ class SimpleConsumer(Consumer): iter_timeout: default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever. + auto_offset_reset: default largest. Reset partition offsets upon + OffsetOutOfRangeError. Valid values are largest and smallest. + If None do not reset the offsets and raise OffsetOutOfRangeError. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -101,7 +105,7 @@ class SimpleConsumer(Consumer): buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None, - use_latest_offsets=True): + auto_offset_reset='largest'): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -120,7 +124,7 @@ class SimpleConsumer(Consumer): self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout - self.use_latest_offsets = use_latest_offsets + self.auto_offset_reset = auto_offset_reset self.queue = Queue() def __repr__(self): @@ -130,10 +134,21 @@ class SimpleConsumer(Consumer): def reset_partition_offset(self, partition): LATEST = -1 EARLIEST = -2 - if self.use_latest_offsets: + if self.auto_offset_reset == 'largest': reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] - else: + elif self.auto_offset_reset == 'smallest': reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + else: + # Let's raise an reasonable exception type if user calls + # outside of an exception context + if sys.exc_info() == (None, None, None): + raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' + 'valid auto_offset_reset setting ' + '(largest|smallest)') + # Otherwise we should re-raise the upstream exception + # b/c it typically includes additional data about + # the request that triggered it, and we do not want to drop that + raise # send_offset_request (resp, ) = self.client.send_offset_request(reqs) |