summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-15 18:08:00 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:55 -0800
commit54ecfeaec05d4a4c85a37310885430b771c8bc57 (patch)
tree894730269f7f5cddf277c88b2c2371d0466cf2ba
parent26e18ce0c032e2801cbbe5d9f444107b8ab4919a (diff)
downloadkafka-python-54ecfeaec05d4a4c85a37310885430b771c8bc57.tar.gz
Add a few basic KafkaConsumer tests
-rw-r--r--test/test_consumer.py6
-rw-r--r--test/test_consumer_integration.py90
2 files changed, 83 insertions, 13 deletions
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 9060919..b33e537 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -3,8 +3,14 @@ from mock import MagicMock
from . import unittest
from kafka.consumer import SimpleConsumer
+from kafka.consumer.new import KafkaConsumer
+from kafka.common import KafkaConfigurationError
class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+
+ def test_broker_list_required(self):
+ with self.assertRaises(KafkaConfigurationError):
+ KafkaConsumer()
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2762008..b4af70b 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,10 +1,14 @@
+import logging
import os
from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, create_message
-from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
+from kafka.common import (
+ ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
+from kafka.consumer.new import KafkaConsumer
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
@@ -47,6 +51,29 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Make sure there are no duplicates
self.assertEquals(len(set(messages)), num_messages)
+ def consumer(self, **kwargs):
+ if os.environ['KAFKA_VERSION'] == "0.8.0":
+ # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+ kwargs['auto_commit'] = False
+ else:
+ kwargs.setdefault('auto_commit', True)
+
+ consumer_class = kwargs.pop('consumer', SimpleConsumer)
+ group = kwargs.pop('group', self.id().encode('utf-8'))
+ topic = kwargs.pop('topic', self.topic)
+
+ if consumer_class == SimpleConsumer:
+ kwargs.setdefault('iter_timeout', 0)
+
+ return consumer_class(self.client, group, topic, **kwargs)
+
+ def kafka_consumer(self, **configs):
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ consumer = KafkaConsumer(self.topic,
+ metadata_broker_list=brokers,
+ **configs)
+ return consumer
+
@kafka_versions("all")
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
@@ -275,18 +302,55 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEquals(len(messages), 2)
- def consumer(self, **kwargs):
- if os.environ['KAFKA_VERSION'] == "0.8.0":
- # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
- kwargs['auto_commit'] = False
- else:
- kwargs.setdefault('auto_commit', True)
+ @kafka_versions("all")
+ def test_kafka_consumer(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
- consumer_class = kwargs.pop('consumer', SimpleConsumer)
- group = kwargs.pop('group', self.id().encode('utf-8'))
- topic = kwargs.pop('topic', self.topic)
+ # Start a consumer
+ consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer_timeout_ms=5000)
+ n = 0
+ messages = {0: set(), 1: set()}
+ logging.debug("kafka consumer offsets: %s" % consumer.offsets())
+ for m in consumer:
+ logging.debug("Consumed message %s" % repr(m))
+ n += 1
+ messages[m.partition].add(m.offset)
+ if n >= 200:
+ break
+
+ self.assertEquals(len(messages[0]), 100)
+ self.assertEquals(len(messages[1]), 100)
- if consumer_class == SimpleConsumer:
- kwargs.setdefault('iter_timeout', 0)
+ @kafka_versions("all")
+ def test_kafka_consumer__blocking(self):
+ consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer_timeout_ms=1000)
- return consumer_class(self.client, group, topic, **kwargs)
+ # Ask for 5 messages, nothing in queue, block 5 seconds
+ with Timer() as t:
+ with self.assertRaises(ConsumerTimeout):
+ msg = consumer.next()
+ self.assertGreaterEqual(t.interval, 1)
+
+ self.send_messages(0, range(0, 10))
+
+ # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+ messages = set()
+ with Timer() as t:
+ for i in range(5):
+ msg = consumer.next()
+ messages.add((msg.partition, msg.offset))
+ self.assertEqual(len(messages), 5)
+ self.assertLess(t.interval, 1)
+
+ # Ask for 10 messages, get 5 back, block 5 seconds
+ messages = set()
+ with Timer() as t:
+ with self.assertRaises(ConsumerTimeout):
+ for i in range(10):
+ msg = consumer.next()
+ messages.add((msg.partition, msg.offset))
+ self.assertEqual(len(messages), 5)
+ self.assertGreaterEqual(t.interval, 1)