summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c97081b..1d28f8e 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -353,3 +353,46 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, 1)
+
+ @kafka_versions("0.8.1", "0.8.1.1")
+ def test_kafka_consumer__offset_commit_resume(self):
+ GROUP_ID = random_string(10)
+
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ auto_offset_reset='smallest',
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ for _ in xrange(195):
+ m = consumer1.next()
+ output_msgs1.append(m)
+ consumer1.task_done(m)
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ consumer_timeout_ms = 100,
+ auto_offset_reset='smallest',
+ )
+
+ # 181-200
+ output_msgs2 = []
+ with self.assertRaises(ConsumerTimeout):
+ while True:
+ m = consumer2.next()
+ output_msgs2.append(m)
+ self.assert_message_count(output_msgs2, 20)
+ self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)