summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAli-Akber Saifee <ali@indydevs.org>2015-03-18 10:27:04 +0800
committerDana Powers <dana.powers@rd.io>2015-03-30 15:02:44 -0700
commit4bc30a2ec8665b1faef0e668c12138c3cc52e38c (patch)
tree8133f2493591a355d77b8cf3621b63a7982b26ca
parentd05fccb9ef6a5a52de55b7ed7929bdbe3caed8a4 (diff)
downloadkafka-python-4bc30a2ec8665b1faef0e668c12138c3cc52e38c.tar.gz
Add test case for MP Consumer auto commit
Tweak MP Consumer test to use iterator
-rw-r--r--test/test_consumer_integration.py35
1 files changed, 35 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17a8ac9..e5e2148 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -327,6 +327,41 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer1.stop()
consumer2.stop()
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_multi_process_offset_behavior__resuming_behavior(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.consumer(
+ consumer=MultiProcessConsumer,
+ auto_commit_every_t = None,
+ auto_commit_every_n = 20,
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ idx = 0
+ for message in consumer1:
+ output_msgs1.append(message.message.value)
+ idx += 1
+ if idx >= 195:
+ break
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.consumer(
+ consumer=MultiProcessConsumer,
+ auto_commit_every_t = None,
+ auto_commit_every_n = 20,
+ )
+
+ # 181-200
+ self.assert_message_count([ message for message in consumer2 ], 20)
+
+ consumer1.stop()
+ consumer2.stop()
+
# TODO: Make this a unit test -- should not require integration
@kafka_versions("all")
def test_fetch_buffer_size(self):