diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 75 |
1 files changed, 74 insertions, 1 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 17a8ac9..403ce0f 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -53,6 +53,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def consumer(self, **kwargs): if os.environ['KAFKA_VERSION'] == "0.8.0": # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off + kwargs['group'] = None kwargs['auto_commit'] = False else: kwargs.setdefault('auto_commit', True) @@ -127,6 +128,23 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_simple_consumer_load_initial_offsets(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Create 1st consumer and change offsets + consumer = self.consumer() + self.assertEqual(consumer.offsets, {0: 0, 1: 0}) + consumer.offsets.update({0:51, 1:101}) + # Update counter after manual offsets update + consumer.count_since_commit += 1 + consumer.commit() + + # Create 2nd consumer and check initial offsets + consumer = self.consumer(auto_commit=False) + self.assertEqual(consumer.offsets, {0: 51, 1: 101}) + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) @@ -243,7 +261,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) - consumer = MultiProcessConsumer(self.client, "group1", self.topic, + # set group to None and auto_commit to False to avoid interactions w/ + # offset commit/fetch apis + consumer = MultiProcessConsumer(self.client, None, self.topic, auto_commit=False, iter_timeout=0) self.assertEqual(consumer.pending(), 20) @@ -252,6 +272,24 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_multi_process_consumer_load_initial_offsets(self): + self.send_messages(0, range(0, 10)) + self.send_messages(1, range(10, 20)) + + # Create 1st consumer and change offsets + consumer = self.consumer() + self.assertEqual(consumer.offsets, {0: 0, 1: 0}) + consumer.offsets.update({0:5, 1:15}) + # Update counter after manual offsets update + consumer.count_since_commit += 1 + consumer.commit() + + # Create 2nd consumer and check initial offsets + consumer = self.consumer(consumer = MultiProcessConsumer, + auto_commit=False) + self.assertEqual(consumer.offsets, {0: 5, 1: 15}) + @kafka_versions("all") def test_large_messages(self): # Produce 10 "normal" size messages @@ -327,6 +365,41 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer1.stop() consumer2.stop() + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_multi_process_offset_behavior__resuming_behavior(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Start a consumer + consumer1 = self.consumer( + consumer=MultiProcessConsumer, + auto_commit_every_t = None, + auto_commit_every_n = 20, + ) + + # Grab the first 195 messages + output_msgs1 = [] + idx = 0 + for message in consumer1: + output_msgs1.append(message.message.value) + idx += 1 + if idx >= 195: + break + self.assert_message_count(output_msgs1, 195) + + # The total offset across both partitions should be at 180 + consumer2 = self.consumer( + consumer=MultiProcessConsumer, + auto_commit_every_t = None, + auto_commit_every_n = 20, + ) + + # 181-200 + self.assert_message_count([ message for message in consumer2 ], 20) + + consumer1.stop() + consumer2.stop() + # TODO: Make this a unit test -- should not require integration @kafka_versions("all") def test_fetch_buffer_size(self): |