summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py45
1 files changed, 41 insertions, 4 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2169145..eab93be 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -14,7 +14,9 @@ from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError
)
-from kafka.structs import ProduceRequestPayload, TopicPartition
+from kafka.structs import (
+ ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
+)
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
@@ -637,9 +639,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_time(self):
- late_time = int(time.time())
- middle_time = late_time - 1
- early_time = late_time - 2
+ late_time = int(time.time()) * 1000
+ middle_time = late_time - 1000
+ early_time = late_time - 2000
tp = TopicPartition(self.topic, 0)
kafka_producer = self.kafka_producer()
@@ -652,6 +654,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.kafka_consumer()
offsets = consumer.offsets_for_times({tp: early_time})
+ self.assertEqual(len(offsets), 1)
self.assertEqual(offsets[tp].offset, early_msg.offset)
self.assertEqual(offsets[tp].timestamp, early_time)
@@ -663,6 +666,40 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(offsets[tp].offset, late_msg.offset)
self.assertEqual(offsets[tp].timestamp, late_time)
+ # Out of bound timestamps check
+
+ offsets = consumer.offsets_for_times({tp: 0})
+ self.assertEqual(offsets[tp].offset, early_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, early_time)
+
+ offsets = consumer.offsets_for_times({tp: 9999999999999})
+ self.assertEqual(offsets[tp], None)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_offsets_search_many_partitions(self):
+ tp0 = TopicPartition(self.topic, 0)
+ tp1 = TopicPartition(self.topic, 1)
+
+ kafka_producer = self.kafka_producer()
+ send_time = int(time.time() * 1000)
+ p0msg = kafka_producer.send(
+ self.topic, partition=0, value=b"XXX",
+ timestamp_ms=send_time).get()
+ p1msg = kafka_producer.send(
+ self.topic, partition=1, value=b"XXX",
+ timestamp_ms=send_time).get()
+
+ consumer = self.kafka_consumer()
+ offsets = consumer.offsets_for_times({
+ tp0: send_time,
+ tp1: send_time
+ })
+
+ self.assertEqual(offsets, {
+ tp0: OffsetAndTimestamp(p0msg.offset, send_time),
+ tp1: OffsetAndTimestamp(p1msg.offset, send_time)
+ })
+
@kafka_versions('<0.10.1')
def test_kafka_consumer_offsets_for_time_old(self):
consumer = self.kafka_consumer()