diff options
-rw-r--r-- | test/test_consumer_integration.py | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index e5e2148..70d5109 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -127,6 +127,23 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() + @kafka_versions('all') + def test_simple_consumer_load_initial_offsets(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Create 1st consumer and change offsets + consumer = self.consumer() + self.assertEqual(consumer.offsets, {0: 0, 1: 0}) + consumer.offsets.update({0:51, 1:101}) + # Update counter after manual offsets update + consumer.count_since_commit += 1 + consumer.commit() + + # Create 2nd consumer and check initial offsets + consumer = self.consumer(auto_commit=False) + self.assertEqual(consumer.offsets, {0: 51, 1: 101}) + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) @@ -253,6 +270,24 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() @kafka_versions("all") + def test_multi_process_consumer_load_initial_offsets(self): + self.send_messages(0, range(0, 10)) + self.send_messages(1, range(10, 20)) + + # Create 1st consumer and change offsets + consumer = self.consumer() + self.assertEqual(consumer.offsets, {0: 0, 1: 0}) + consumer.offsets.update({0:5, 1:15}) + # Update counter after manual offsets update + consumer.count_since_commit += 1 + consumer.commit() + + # Create 2nd consumer and check initial offsets + consumer = self.consumer(consumer = MultiProcessConsumer, + auto_commit=False) + self.assertEqual(consumer.offsets, {0: 5, 1: 15}) + + @kafka_versions("all") def test_large_messages(self): # Produce 10 "normal" size messages small_messages = self.send_messages(0, [ str(x) for x in range(10) ]) |