summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/simple.py63
-rw-r--r--test/test_consumer_integration.py14
2 files changed, 60 insertions, 17 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index b08255b..733baa8 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -188,33 +188,62 @@ class SimpleConsumer(Consumer):
"""
self.partition_info = True
- def seek(self, offset, whence):
+ def seek(self, offset, whence=None, partition=None):
"""
Alter the current offset in the consumer, similar to fseek
Arguments:
offset: how much to modify the offset
- whence: where to modify it from
+ whence: where to modify it from, default is None
- * 0 is relative to the earliest available offset (head)
- * 1 is relative to the current offset
- * 2 is relative to the latest known offset (tail)
+ * None is an absolute offset
+ * 0 is relative to the earliest available offset (head)
+ * 1 is relative to the current offset
+ * 2 is relative to the latest known offset (tail)
+
+ partition: modify which partition, default is None.
+ If partition is None, would modify all partitions.
"""
- if whence == 1: # relative to current position
- for partition, _offset in self.offsets.items():
- self.offsets[partition] = _offset + offset
+ if whence is None: # set an absolute offset
+ if partition is None:
+ for tmp_partition in self.offsets:
+ self.offsets[tmp_partition] = offset
+ else:
+ self.offsets[partition] = offset
+ elif whence == 1: # relative to current position
+ if partition is None:
+ for tmp_partition, _offset in self.offsets.items():
+ self.offsets[tmp_partition] = _offset + offset
+ else:
+ self.offsets[partition] += offset
elif whence in (0, 2): # relative to beginning or end
- # divide the request offset by number of partitions,
- # distribute the remained evenly
- (delta, rem) = divmod(offset, len(self.offsets))
- deltas = {}
- for partition, r in izip_longest(self.offsets.keys(),
- repeat(1, rem), fillvalue=0):
- deltas[partition] = delta + r
-
reqs = []
- for partition in self.offsets.keys():
+ deltas = {}
+ if partition is None:
+ # divide the request offset by number of partitions,
+ # distribute the remained evenly
+ (delta, rem) = divmod(offset, len(self.offsets))
+ for tmp_partition, r in izip_longest(self.offsets.keys(),
+ repeat(1, rem),
+ fillvalue=0):
+ deltas[tmp_partition] = delta + r
+
+ for tmp_partition in self.offsets.keys():
+ if whence == 0:
+ reqs.append(OffsetRequest(self.topic,
+ tmp_partition,
+ -2,
+ 1))
+ elif whence == 2:
+ reqs.append(OffsetRequest(self.topic,
+ tmp_partition,
+ -1,
+ 1))
+ else:
+ pass
+ else:
+ deltas[partition] = offset
if whence == 0:
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index df2eeea..52b3e85 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -164,6 +164,20 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.seek(-13, 2)
self.assert_message_count([ message for message in consumer ], 13)
+ # Set absolute offset
+ consumer.seek(100)
+ self.assert_message_count([ message for message in consumer ], 0)
+ consumer.seek(100, partition=0)
+ self.assert_message_count([ message for message in consumer ], 0)
+ consumer.seek(101, partition=1)
+ self.assert_message_count([ message for message in consumer ], 0)
+ consumer.seek(90, partition=0)
+ self.assert_message_count([ message for message in consumer ], 10)
+ consumer.seek(20, partition=1)
+ self.assert_message_count([ message for message in consumer ], 80)
+ consumer.seek(0, partition=1)
+ self.assert_message_count([ message for message in consumer ], 100)
+
consumer.stop()
@kafka_versions("all")