diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-06-20 09:39:17 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-06-20 09:39:17 -0700 |
commit | adbd4ac052e4a5b40cfc2a3589b7adbcb656afe5 (patch) | |
tree | ee5dabf6b7383d1ebf94c682f2fa235bb3145a1e | |
parent | 4b064340e8796cb85307e014853ddb0435809f2f (diff) | |
parent | 2028a232098abeb89a8125e26abc4f4a379ef1b9 (diff) | |
download | kafka-python-adbd4ac052e4a5b40cfc2a3589b7adbcb656afe5.tar.gz |
Merge pull request #412 from haosdent/seek_absolute_offset
fix #410 SimpleConsumer cannot seek to an absolute offset.
-rw-r--r-- | kafka/consumer/simple.py | 63 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 14 |
2 files changed, 60 insertions, 17 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index b08255b..733baa8 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -188,33 +188,62 @@ class SimpleConsumer(Consumer): """ self.partition_info = True - def seek(self, offset, whence): + def seek(self, offset, whence=None, partition=None): """ Alter the current offset in the consumer, similar to fseek Arguments: offset: how much to modify the offset - whence: where to modify it from + whence: where to modify it from, default is None - * 0 is relative to the earliest available offset (head) - * 1 is relative to the current offset - * 2 is relative to the latest known offset (tail) + * None is an absolute offset + * 0 is relative to the earliest available offset (head) + * 1 is relative to the current offset + * 2 is relative to the latest known offset (tail) + + partition: modify which partition, default is None. + If partition is None, would modify all partitions. """ - if whence == 1: # relative to current position - for partition, _offset in self.offsets.items(): - self.offsets[partition] = _offset + offset + if whence is None: # set an absolute offset + if partition is None: + for tmp_partition in self.offsets: + self.offsets[tmp_partition] = offset + else: + self.offsets[partition] = offset + elif whence == 1: # relative to current position + if partition is None: + for tmp_partition, _offset in self.offsets.items(): + self.offsets[tmp_partition] = _offset + offset + else: + self.offsets[partition] += offset elif whence in (0, 2): # relative to beginning or end - # divide the request offset by number of partitions, - # distribute the remained evenly - (delta, rem) = divmod(offset, len(self.offsets)) - deltas = {} - for partition, r in izip_longest(self.offsets.keys(), - repeat(1, rem), fillvalue=0): - deltas[partition] = delta + r - reqs = [] - for partition in self.offsets.keys(): + deltas = {} + if partition is None: + # divide the request offset by number of partitions, + # distribute the remained evenly + (delta, rem) = divmod(offset, len(self.offsets)) + for tmp_partition, r in izip_longest(self.offsets.keys(), + repeat(1, rem), + fillvalue=0): + deltas[tmp_partition] = delta + r + + for tmp_partition in self.offsets.keys(): + if whence == 0: + reqs.append(OffsetRequest(self.topic, + tmp_partition, + -2, + 1)) + elif whence == 2: + reqs.append(OffsetRequest(self.topic, + tmp_partition, + -1, + 1)) + else: + pass + else: + deltas[partition] = offset if whence == 0: reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index df2eeea..52b3e85 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -164,6 +164,20 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.seek(-13, 2) self.assert_message_count([ message for message in consumer ], 13) + # Set absolute offset + consumer.seek(100) + self.assert_message_count([ message for message in consumer ], 0) + consumer.seek(100, partition=0) + self.assert_message_count([ message for message in consumer ], 0) + consumer.seek(101, partition=1) + self.assert_message_count([ message for message in consumer ], 0) + consumer.seek(90, partition=0) + self.assert_message_count([ message for message in consumer ], 10) + consumer.seek(20, partition=1) + self.assert_message_count([ message for message in consumer ], 80) + consumer.seek(0, partition=1) + self.assert_message_count([ message for message in consumer ], 100) + consumer.stop() @kafka_versions("all") |