summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_consumer_integration.py35
1 files changed, 35 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index e5e2148..70d5109 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -127,6 +127,23 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()
+ @kafka_versions('all')
+ def test_simple_consumer_load_initial_offsets(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Create 1st consumer and change offsets
+ consumer = self.consumer()
+ self.assertEqual(consumer.offsets, {0: 0, 1: 0})
+ consumer.offsets.update({0:51, 1:101})
+ # Update counter after manual offsets update
+ consumer.count_since_commit += 1
+ consumer.commit()
+
+ # Create 2nd consumer and check initial offsets
+ consumer = self.consumer(auto_commit=False)
+ self.assertEqual(consumer.offsets, {0: 51, 1: 101})
+
@kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
@@ -253,6 +270,24 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
@kafka_versions("all")
+ def test_multi_process_consumer_load_initial_offsets(self):
+ self.send_messages(0, range(0, 10))
+ self.send_messages(1, range(10, 20))
+
+ # Create 1st consumer and change offsets
+ consumer = self.consumer()
+ self.assertEqual(consumer.offsets, {0: 0, 1: 0})
+ consumer.offsets.update({0:5, 1:15})
+ # Update counter after manual offsets update
+ consumer.count_since_commit += 1
+ consumer.commit()
+
+ # Create 2nd consumer and check initial offsets
+ consumer = self.consumer(consumer = MultiProcessConsumer,
+ auto_commit=False)
+ self.assertEqual(consumer.offsets, {0: 5, 1: 15})
+
+ @kafka_versions("all")
def test_large_messages(self):
# Produce 10 "normal" size messages
small_messages = self.send_messages(0, [ str(x) for x in range(10) ])