diff options
-rw-r--r-- | test/test_consumer_integration.py | 16 |
1 files changed, 13 insertions, 3 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 17c5844..4cebed8 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -139,7 +139,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(1, range(100, 200)) # Create 1st consumer and change offsets - consumer = self.consumer() + consumer = self.consumer(group='test_simple_consumer_load_initial_offsets') self.assertEqual(consumer.offsets, {0: 0, 1: 0}) consumer.offsets.update({0:51, 1:101}) # Update counter after manual offsets update @@ -147,7 +147,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.commit() # Create 2nd consumer and check initial offsets - consumer = self.consumer(auto_commit=False) + consumer = self.consumer(group='test_simple_consumer_load_initial_offsets', + auto_commit=False) self.assertEqual(consumer.offsets, {0: 51, 1: 101}) @kafka_versions("all") @@ -315,7 +316,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(1, range(10, 20)) # Create 1st consumer and change offsets - consumer = self.consumer() + consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets') self.assertEqual(consumer.offsets, {0: 0, 1: 0}) consumer.offsets.update({0:5, 1:15}) # Update counter after manual offsets update @@ -324,6 +325,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Create 2nd consumer and check initial offsets consumer = self.consumer(consumer = MultiProcessConsumer, + group='test_multi_process_consumer_load_initial_offsets', auto_commit=False) self.assertEqual(consumer.offsets, {0: 5, 1: 15}) @@ -382,6 +384,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Start a consumer consumer1 = self.consumer( + group='test_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) @@ -392,6 +396,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # The total offset across both partitions should be at 180 consumer2 = self.consumer( + group='test_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) @@ -410,6 +416,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Start a consumer consumer1 = self.consumer( consumer=MultiProcessConsumer, + group='test_multi_process_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) @@ -427,6 +435,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # The total offset across both partitions should be at 180 consumer2 = self.consumer( consumer=MultiProcessConsumer, + group='test_multi_process_offset_behavior__resuming_behavior', + auto_commit=True, auto_commit_every_t = None, auto_commit_every_n = 20, ) |