diff options
author | Ali-Akber Saifee <ali@indydevs.org> | 2015-03-18 10:27:04 +0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-30 15:02:44 -0700 |
commit | 4bc30a2ec8665b1faef0e668c12138c3cc52e38c (patch) | |
tree | 8133f2493591a355d77b8cf3621b63a7982b26ca | |
parent | d05fccb9ef6a5a52de55b7ed7929bdbe3caed8a4 (diff) | |
download | kafka-python-4bc30a2ec8665b1faef0e668c12138c3cc52e38c.tar.gz |
Add test case for MP Consumer auto commit
Tweak MP Consumer test to use iterator
-rw-r--r-- | test/test_consumer_integration.py | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 17a8ac9..e5e2148 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -327,6 +327,41 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer1.stop() consumer2.stop() + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_multi_process_offset_behavior__resuming_behavior(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Start a consumer + consumer1 = self.consumer( + consumer=MultiProcessConsumer, + auto_commit_every_t = None, + auto_commit_every_n = 20, + ) + + # Grab the first 195 messages + output_msgs1 = [] + idx = 0 + for message in consumer1: + output_msgs1.append(message.message.value) + idx += 1 + if idx >= 195: + break + self.assert_message_count(output_msgs1, 195) + + # The total offset across both partitions should be at 180 + consumer2 = self.consumer( + consumer=MultiProcessConsumer, + auto_commit_every_t = None, + auto_commit_every_n = 20, + ) + + # 181-200 + self.assert_message_count([ message for message in consumer2 ], 20) + + consumer1.stop() + consumer2.stop() + # TODO: Make this a unit test -- should not require integration @kafka_versions("all") def test_fetch_buffer_size(self): |