diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 45 |
1 files changed, 41 insertions, 4 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 2169145..eab93be 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -14,7 +14,9 @@ from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError ) -from kafka.structs import ProduceRequestPayload, TopicPartition +from kafka.structs import ( + ProduceRequestPayload, TopicPartition, OffsetAndTimestamp +) from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( @@ -637,9 +639,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_for_time(self): - late_time = int(time.time()) - middle_time = late_time - 1 - early_time = late_time - 2 + late_time = int(time.time()) * 1000 + middle_time = late_time - 1000 + early_time = late_time - 2000 tp = TopicPartition(self.topic, 0) kafka_producer = self.kafka_producer() @@ -652,6 +654,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = self.kafka_consumer() offsets = consumer.offsets_for_times({tp: early_time}) + self.assertEqual(len(offsets), 1) self.assertEqual(offsets[tp].offset, early_msg.offset) self.assertEqual(offsets[tp].timestamp, early_time) @@ -663,6 +666,40 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(offsets[tp].offset, late_msg.offset) self.assertEqual(offsets[tp].timestamp, late_time) + # Out of bound timestamps check + + offsets = consumer.offsets_for_times({tp: 0}) + self.assertEqual(offsets[tp].offset, early_msg.offset) + self.assertEqual(offsets[tp].timestamp, early_time) + + offsets = consumer.offsets_for_times({tp: 9999999999999}) + self.assertEqual(offsets[tp], None) + + @kafka_versions('>=0.10.1') + def test_kafka_consumer_offsets_search_many_partitions(self): + tp0 = TopicPartition(self.topic, 0) + tp1 = TopicPartition(self.topic, 1) + + kafka_producer = self.kafka_producer() + send_time = int(time.time() * 1000) + p0msg = kafka_producer.send( + self.topic, partition=0, value=b"XXX", + timestamp_ms=send_time).get() + p1msg = kafka_producer.send( + self.topic, partition=1, value=b"XXX", + timestamp_ms=send_time).get() + + consumer = self.kafka_consumer() + offsets = consumer.offsets_for_times({ + tp0: send_time, + tp1: send_time + }) + + self.assertEqual(offsets, { + tp0: OffsetAndTimestamp(p0msg.offset, send_time), + tp1: OffsetAndTimestamp(p1msg.offset, send_time) + }) + @kafka_versions('<0.10.1') def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() |