summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
authorEnrico Canzonieri <ecanzonieri@gmail.com>2015-01-26 14:40:49 -0800
committerEnrico Canzonieri <enrico@yelp.com>2015-01-26 14:53:23 -0800
commitf517ddf283a86947a15f95e5ec562e81f4c477e5 (patch)
treeeb3b086ce03056fb03e216f184c887cd0f58119f /kafka/consumer/simple.py
parent6bc2c7aadac37c6e38c8a3c7be66013e9080aed7 (diff)
downloadkafka-python-f517ddf283a86947a15f95e5ec562e81f4c477e5.tar.gz
Make SimpleConsumer auto_offset_reset more like KafkaConsumer
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py23
1 files changed, 19 insertions, 4 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 39103bd..0593b5b 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -8,6 +8,7 @@ import logging
import time
import six
+import sys
try:
from Queue import Empty, Queue
@@ -87,6 +88,9 @@ class SimpleConsumer(Consumer):
iter_timeout: default None. How much time (in seconds) to wait for a
message in the iterator before exiting. None means no
timeout, so it will wait forever.
+ auto_offset_reset: default largest. Reset partition offsets upon
+ OffsetOutOfRangeError. Valid values are largest and smallest.
+ If None do not reset the offsets and raise OffsetOutOfRangeError.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -101,7 +105,7 @@ class SimpleConsumer(Consumer):
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None,
- use_latest_offsets=True):
+ auto_offset_reset='largest'):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -120,7 +124,7 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
- self.use_latest_offsets = use_latest_offsets
+ self.auto_offset_reset = auto_offset_reset
self.queue = Queue()
def __repr__(self):
@@ -130,10 +134,21 @@ class SimpleConsumer(Consumer):
def reset_partition_offset(self, partition):
LATEST = -1
EARLIEST = -2
- if self.use_latest_offsets:
+ if self.auto_offset_reset == 'largest':
reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
- else:
+ elif self.auto_offset_reset == 'smallest':
reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+ else:
+ # Let's raise an reasonable exception type if user calls
+ # outside of an exception context
+ if sys.exc_info() == (None, None, None):
+ raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
+ 'valid auto_offset_reset setting '
+ '(largest|smallest)')
+ # Otherwise we should re-raise the upstream exception
+ # b/c it typically includes additional data about
+ # the request that triggered it, and we do not want to drop that
+ raise
# send_offset_request
(resp, ) = self.client.send_offset_request(reqs)