summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-17 13:49:23 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:56 -0800
commitb80e83b7335a92fcbfcf25e38d51f24fc00c20ea (patch)
tree18a57ce552645e7d72c2a43050e73d2c7f2679c5 /test/test_consumer_integration.py
parent206560a74b56e7a2dcc7f358f24b5769a22769b5 (diff)
downloadkafka-python-b80e83b7335a92fcbfcf25e38d51f24fc00c20ea.tar.gz
Fix task_done checks when no previous commit exists; add test
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c97081b..1d28f8e 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -353,3 +353,46 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, 1)
+
+ @kafka_versions("0.8.1", "0.8.1.1")
+ def test_kafka_consumer__offset_commit_resume(self):
+ GROUP_ID = random_string(10)
+
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ auto_offset_reset='smallest',
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ for _ in xrange(195):
+ m = consumer1.next()
+ output_msgs1.append(m)
+ consumer1.task_done(m)
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ consumer_timeout_ms = 100,
+ auto_offset_reset='smallest',
+ )
+
+ # 181-200
+ output_msgs2 = []
+ with self.assertRaises(ConsumerTimeout):
+ while True:
+ m = consumer2.next()
+ output_msgs2.append(m)
+ self.assert_message_count(output_msgs2, 20)
+ self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)