summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_consumer_integration.py16
1 files changed, 13 insertions, 3 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17c5844..4cebed8 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -139,7 +139,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(1, range(100, 200))
# Create 1st consumer and change offsets
- consumer = self.consumer()
+ consumer = self.consumer(group='test_simple_consumer_load_initial_offsets')
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
consumer.offsets.update({0:51, 1:101})
# Update counter after manual offsets update
@@ -147,7 +147,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.commit()
# Create 2nd consumer and check initial offsets
- consumer = self.consumer(auto_commit=False)
+ consumer = self.consumer(group='test_simple_consumer_load_initial_offsets',
+ auto_commit=False)
self.assertEqual(consumer.offsets, {0: 51, 1: 101})
@kafka_versions("all")
@@ -315,7 +316,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(1, range(10, 20))
# Create 1st consumer and change offsets
- consumer = self.consumer()
+ consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets')
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
consumer.offsets.update({0:5, 1:15})
# Update counter after manual offsets update
@@ -324,6 +325,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Create 2nd consumer and check initial offsets
consumer = self.consumer(consumer = MultiProcessConsumer,
+ group='test_multi_process_consumer_load_initial_offsets',
auto_commit=False)
self.assertEqual(consumer.offsets, {0: 5, 1: 15})
@@ -382,6 +384,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Start a consumer
consumer1 = self.consumer(
+ group='test_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
@@ -392,6 +396,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# The total offset across both partitions should be at 180
consumer2 = self.consumer(
+ group='test_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
@@ -410,6 +416,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Start a consumer
consumer1 = self.consumer(
consumer=MultiProcessConsumer,
+ group='test_multi_process_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)
@@ -427,6 +435,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# The total offset across both partitions should be at 180
consumer2 = self.consumer(
consumer=MultiProcessConsumer,
+ group='test_multi_process_offset_behavior__resuming_behavior',
+ auto_commit=True,
auto_commit_every_t = None,
auto_commit_every_n = 20,
)