diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-23 12:15:30 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-23 12:15:30 -0700 |
commit | 764f2053ad4dd73dc391416ddd4cfa345271efcb (patch) | |
tree | be5efdc22cbfbfc80a7d8c0f973496f6294843cd | |
parent | 8a1f2e6c3a73131d3a32ee4c0012628a6913d1cd (diff) | |
download | kafka-python-764f2053ad4dd73dc391416ddd4cfa345271efcb.tar.gz |
Update consumer_integration to flip the autocommit switch when testing kafka 0.8.1
-rw-r--r-- | test/test_consumer_integration.py | 66 |
1 files changed, 36 insertions, 30 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index b1d1a59..e01ce41 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -50,9 +50,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(1, range(100, 200)) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", - self.topic, auto_commit=False, - iter_timeout=0) + consumer = self.consumer() self.assert_message_count([ message for message in consumer ], 200) @@ -63,9 +61,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) - consumer = SimpleConsumer(self.client, "group1", - self.topic, auto_commit=False, - iter_timeout=0) + consumer = self.consumer() # Rewind 10 messages from the end consumer.seek(-10, 2) @@ -79,9 +75,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_simple_consumer_blocking(self): - consumer = SimpleConsumer(self.client, "group1", - self.topic, - auto_commit=False, iter_timeout=0) + consumer = self.consumer() # Ask for 5 messages, nothing in queue, block 5 seconds with Timer() as t: @@ -111,8 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) - consumer = SimpleConsumer(self.client, "group1", self.topic, - auto_commit=False, iter_timeout=0) + consumer = self.consumer() self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) @@ -126,7 +119,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) - consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) + consumer = self.consumer(consumer = MultiProcessConsumer) self.assert_message_count([ message for message in consumer ], 200) @@ -134,7 +127,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_multi_process_consumer_blocking(self): - consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) + consumer = self.consumer(consumer = MultiProcessConsumer) # Ask for 5 messages, No messages in queue, block 5 seconds with Timer() as t: @@ -182,8 +175,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ]) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", self.topic, - auto_commit=False, iter_timeout=0) + consumer = self.consumer() expected_messages = set(small_messages + large_messages) actual_messages = set([ x.message.value for x in consumer ]) @@ -198,8 +190,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): ]) # Create a consumer with the default buffer size - consumer = SimpleConsumer(self.client, "group1", self.topic, - auto_commit=False, iter_timeout=0) + consumer = self.consumer() # This consumer failes to get the message with self.assertRaises(ConsumerFetchSizeTooSmall): @@ -208,9 +199,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() # Create a consumer with no fetch size limit - big_consumer = SimpleConsumer(self.client, "group1", self.topic, - max_buffer_size=None, partitions=[0], - auto_commit=False, iter_timeout=0) + big_consumer = self.consumer( + max_buffer_size = None, + partitions = [0], + ) # Seek to the last message big_consumer.seek(-1, 2) @@ -228,25 +220,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): msgs2 = self.send_messages(1, range(100, 200)) # Start a consumer - consumer1 = SimpleConsumer(self.client, "group1", - self.topic, auto_commit=True, - auto_commit_every_t=600, - auto_commit_every_n=20, - iter_timeout=0) + consumer1 = self.consumer( + auto_commit_every_t = 600, + auto_commit_every_n = 20, + ) # Grab the first 195 messages output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ] self.assert_message_count(output_msgs1, 195) # The offset should be at 180 - consumer2 = SimpleConsumer(self.client, "group1", - self.topic, auto_commit=True, - auto_commit_every_t=600, - auto_commit_every_n=20, - iter_timeout=0) + consumer2 = self.consumer( + auto_commit_every_t = 600, + auto_commit_every_n = 20, + ) # 180-200 self.assert_message_count([ message for message in consumer2 ], 20) consumer1.stop() consumer2.stop() + + def consumer(self, **kwargs): + if os.environ['KAFKA_VERSION'] == "0.8.0": + # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off + kwargs['auto_commit'] = False + else: + kwargs.setdefault('auto_commit', True) + + consumer_class = kwargs.pop('consumer', SimpleConsumer) + group = kwargs.pop('group', self.id()) + topic = kwargs.pop('topic', self.topic) + + if consumer_class == SimpleConsumer: + kwargs.setdefault('iter_timeout', 0) + + return consumer_class(self.client, group, topic, **kwargs) |