diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-15 18:08:00 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:55 -0800 |
commit | 54ecfeaec05d4a4c85a37310885430b771c8bc57 (patch) | |
tree | 894730269f7f5cddf277c88b2c2371d0466cf2ba | |
parent | 26e18ce0c032e2801cbbe5d9f444107b8ab4919a (diff) | |
download | kafka-python-54ecfeaec05d4a4c85a37310885430b771c8bc57.tar.gz |
Add a few basic KafkaConsumer tests
-rw-r--r-- | test/test_consumer.py | 6 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 90 |
2 files changed, 83 insertions, 13 deletions
diff --git a/test/test_consumer.py b/test/test_consumer.py index 9060919..b33e537 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -3,8 +3,14 @@ from mock import MagicMock from . import unittest from kafka.consumer import SimpleConsumer +from kafka.consumer.new import KafkaConsumer +from kafka.common import KafkaConfigurationError class TestKafkaConsumer(unittest.TestCase): def test_non_integer_partitions(self): with self.assertRaises(AssertionError): SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ]) + + def test_broker_list_required(self): + with self.assertRaises(KafkaConfigurationError): + KafkaConsumer() diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 2762008..b4af70b 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,10 +1,14 @@ +import logging import os from six.moves import xrange from kafka import SimpleConsumer, MultiProcessConsumer, create_message -from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall +from kafka.common import ( + ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout +) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES +from kafka.consumer.new import KafkaConsumer from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import ( @@ -47,6 +51,29 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Make sure there are no duplicates self.assertEquals(len(set(messages)), num_messages) + def consumer(self, **kwargs): + if os.environ['KAFKA_VERSION'] == "0.8.0": + # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off + kwargs['auto_commit'] = False + else: + kwargs.setdefault('auto_commit', True) + + consumer_class = kwargs.pop('consumer', SimpleConsumer) + group = kwargs.pop('group', self.id().encode('utf-8')) + topic = kwargs.pop('topic', self.topic) + + if consumer_class == SimpleConsumer: + kwargs.setdefault('iter_timeout', 0) + + return consumer_class(self.client, group, topic, **kwargs) + + def kafka_consumer(self, **configs): + brokers = '%s:%d' % (self.server.host, self.server.port) + consumer = KafkaConsumer(self.topic, + metadata_broker_list=brokers, + **configs) + return consumer + @kafka_versions("all") def test_simple_consumer(self): self.send_messages(0, range(0, 100)) @@ -275,18 +302,55 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ message for message in consumer ] self.assertEquals(len(messages), 2) - def consumer(self, **kwargs): - if os.environ['KAFKA_VERSION'] == "0.8.0": - # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off - kwargs['auto_commit'] = False - else: - kwargs.setdefault('auto_commit', True) + @kafka_versions("all") + def test_kafka_consumer(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) - consumer_class = kwargs.pop('consumer', SimpleConsumer) - group = kwargs.pop('group', self.id().encode('utf-8')) - topic = kwargs.pop('topic', self.topic) + # Start a consumer + consumer = self.kafka_consumer(auto_offset_reset='smallest', + consumer_timeout_ms=5000) + n = 0 + messages = {0: set(), 1: set()} + logging.debug("kafka consumer offsets: %s" % consumer.offsets()) + for m in consumer: + logging.debug("Consumed message %s" % repr(m)) + n += 1 + messages[m.partition].add(m.offset) + if n >= 200: + break + + self.assertEquals(len(messages[0]), 100) + self.assertEquals(len(messages[1]), 100) - if consumer_class == SimpleConsumer: - kwargs.setdefault('iter_timeout', 0) + @kafka_versions("all") + def test_kafka_consumer__blocking(self): + consumer = self.kafka_consumer(auto_offset_reset='smallest', + consumer_timeout_ms=1000) - return consumer_class(self.client, group, topic, **kwargs) + # Ask for 5 messages, nothing in queue, block 5 seconds + with Timer() as t: + with self.assertRaises(ConsumerTimeout): + msg = consumer.next() + self.assertGreaterEqual(t.interval, 1) + + self.send_messages(0, range(0, 10)) + + # Ask for 5 messages, 10 in queue. Get 5 back, no blocking + messages = set() + with Timer() as t: + for i in range(5): + msg = consumer.next() + messages.add((msg.partition, msg.offset)) + self.assertEqual(len(messages), 5) + self.assertLess(t.interval, 1) + + # Ask for 10 messages, get 5 back, block 5 seconds + messages = set() + with Timer() as t: + with self.assertRaises(ConsumerTimeout): + for i in range(10): + msg = consumer.next() + messages.add((msg.partition, msg.offset)) + self.assertEqual(len(messages), 5) + self.assertGreaterEqual(t.interval, 1) |