summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py35
1 files changed, 35 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 17a8ac9..e5e2148 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -327,6 +327,41 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer1.stop()
consumer2.stop()
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_multi_process_offset_behavior__resuming_behavior(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.consumer(
+ consumer=MultiProcessConsumer,
+ auto_commit_every_t = None,
+ auto_commit_every_n = 20,
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ idx = 0
+ for message in consumer1:
+ output_msgs1.append(message.message.value)
+ idx += 1
+ if idx >= 195:
+ break
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.consumer(
+ consumer=MultiProcessConsumer,
+ auto_commit_every_t = None,
+ auto_commit_every_n = 20,
+ )
+
+ # 181-200
+ self.assert_message_count([ message for message in consumer2 ], 20)
+
+ consumer1.stop()
+ consumer2.stop()
+
# TODO: Make this a unit test -- should not require integration
@kafka_versions("all")
def test_fetch_buffer_size(self):