summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py45
1 files changed, 24 insertions, 21 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 40eec14..fe4e454 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -21,9 +21,30 @@ from kafka.structs import (
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
- KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+ KafkaIntegrationTestCase, kafka_versions, random_string, Timer,
+ send_messages
)
+def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
+ """Test KafkaConsumer
+ """
+ kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+
+ send_messages(simple_client, topic, 0, range(0, 100))
+ send_messages(simple_client, topic, 1, range(100, 200))
+
+ cnt = 0
+ messages = {0: set(), 1: set()}
+ for message in kafka_consumer:
+ logging.debug("Consumed message %s", repr(message))
+ cnt += 1
+ messages[message.partition].add(message.offset)
+ if cnt >= 200:
+ break
+
+ assert len(messages[0]) == 100
+ assert len(messages[1]) == 100
+
class TestConsumerIntegration(KafkaIntegrationTestCase):
maxDiff = None
@@ -35,9 +56,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
cls.zk = ZookeeperFixture.instance()
chroot = random_string(10)
- cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port,
+ cls.server1 = KafkaFixture.instance(0, cls.zk,
zk_chroot=chroot)
- cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port,
+ cls.server2 = KafkaFixture.instance(1, cls.zk,
zk_chroot=chroot)
cls.server = cls.server1 # Bootstrapping server
@@ -501,24 +522,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
- def test_kafka_consumer(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer = self.kafka_consumer(auto_offset_reset='earliest')
- n = 0
- messages = {0: set(), 1: set()}
- for m in consumer:
- logging.debug("Consumed message %s" % repr(m))
- n += 1
- messages[m.partition].add(m.offset)
- if n >= 200:
- break
-
- self.assertEqual(len(messages[0]), 100)
- self.assertEqual(len(messages[1]), 100)
-
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',