summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-03-02 17:23:03 -0800
committerDana Powers <dana.powers@gmail.com>2015-03-02 17:23:03 -0800
commit2ca10e26b6fe00943db6984d30d5d144da76629e (patch)
tree66dbce0375275da26b4fc011474a9c88666a5cc7 /test/test_consumer_integration.py
parent9ad0be662d388b47aadf04d712f5744add6456e3 (diff)
parent37d0b7f9dbac6c5165e6ea171a97be19c53c27f5 (diff)
downloadkafka-python-2ca10e26b6fe00943db6984d30d5d144da76629e.tar.gz
Merge pull request #296 from ecanzonieri/validate_consumer_offset
Validate consumer offset in SimpleConsumer
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py44
1 files changed, 43 insertions, 1 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4723220..9c89190 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -5,7 +5,7 @@ from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
from kafka.common import (
- ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+ ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -85,6 +85,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions('all')
+ def test_simple_consumer_smallest_offset_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ consumer = self.consumer(auto_offset_reset='smallest')
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ # Since auto_offset_reset is set to smallest we should read all 200
+ # messages from beginning.
+ self.assert_message_count([message for message in consumer], 200)
+
+ @kafka_versions('all')
+ def test_simple_consumer_largest_offset_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Default largest
+ consumer = self.consumer()
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ # Since auto_offset_reset is set to largest we should not read any
+ # messages.
+ self.assert_message_count([message for message in consumer], 0)
+ # Send 200 new messages to the queue
+ self.send_messages(0, range(200, 300))
+ self.send_messages(1, range(300, 400))
+ # Since the offset is set to largest we should read all the new messages.
+ self.assert_message_count([message for message in consumer], 200)
+
+ @kafka_versions('all')
+ def test_simple_consumer_no_reset(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Default largest
+ consumer = self.consumer(auto_offset_reset=None)
+ # Move fetch offset ahead of 300 message (out of range)
+ consumer.seek(300, 2)
+ with self.assertRaises(OffsetOutOfRangeError):
+ consumer.get_message()
+
@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))