author     Omar <omar.ghishan@rd.io>    2014-01-13 13:52:02 -0800
committer  Omar <omar.ghishan@rd.io>    2014-01-13 13:52:02 -0800
commit     87c7f9dedfc008e3fff7a010cc4e708eeec5bebe (patch)
tree       c55c3c5fea1fab6eef77f5213909ed2c2f8acc92 /test/test_integration.py
parent     354fcdbdd9b34b3454b964e6dc0d4a746744bbcd (diff)
parent     a0c7141e2cc7399a9472a8169ea5f730f0407386 (diff)
download   kafka-python-87c7f9dedfc008e3fff7a010cc4e708eeec5bebe.tar.gz
Merge pull request #88 from rdiomar/rdiomar_changes
Various changes/fixes, including:

* Allow customizing socket timeouts
* Read the correct number of bytes from kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests
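The "guarantee reading the expected number of bytes" item refers to looping on recv() until the full response is buffered, since a single recv() may legally return fewer bytes than requested. A minimal sketch of that pattern, assuming a plain blocking socket (recv_exactly is an illustrative name, not the actual helper in kafka-python):

import socket

def recv_exactly(sock, num_bytes):
    """Read exactly num_bytes from sock, or raise if the peer closes early."""
    chunks = []
    bytes_left = num_bytes
    while bytes_left > 0:
        chunk = sock.recv(bytes_left)  # may return fewer bytes than requested
        if not chunk:
            raise socket.error("connection closed with %d bytes left" % bytes_left)
        chunks.append(chunk)
        bytes_left -= len(chunk)
    return b''.join(chunks)

The customizable socket timeout mentioned above would bound each recv() call: calling sock.settimeout(timeout) before reading makes a stalled broker raise socket.timeout instead of blocking forever.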
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--   test/test_integration.py   68
1 file changed, 48 insertions, 20 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae2..eaf432d 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -8,6 +8,7 @@ import random
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
+from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from .fixtures import ZookeeperFixture, KafkaFixture
@@ -554,7 +555,7 @@ class TestConsumer(unittest.TestCase):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+ cls.client = KafkaClient(cls.server2.host, cls.server2.port)
@classmethod
def tearDownClass(cls): # noqa
@@ -583,7 +584,9 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ "test_simple_consumer", auto_commit=False,
+ iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -609,7 +612,9 @@ class TestConsumer(unittest.TestCase):
consumer.stop()
def test_simple_consumer_blocking(self):
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ "test_simple_consumer_blocking",
+ auto_commit=False, iter_timeout=0)
# Blocking API
start = datetime.now()
@@ -657,7 +662,8 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_pending",
+ auto_commit=False, iter_timeout=0)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -755,7 +761,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- # Produce 10 messages that are too large (bigger than default fetch size)
+ # Produce 10 messages that are large (bigger than default fetch size)
messages2 = [create_message(random_string(5000)) for i in range(10)]
produce2 = ProduceRequest("test_large_messages", 0, messages2)
@@ -764,33 +770,55 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 10)
# Consumer should still get all of them
- consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", "test_large_messages",
+ auto_commit=False, iter_timeout=0)
all_messages = messages1 + messages2
for i, message in enumerate(consumer):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+ # Produce 1 message that is too large (bigger than max fetch size)
+ big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
+ big_message = create_message(random_string(big_message_size))
+ produce3 = ProduceRequest("test_large_messages", 0, [big_message])
+ for resp in self.client.send_produce_request([produce3]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 20)
+
+ self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1)
+
+ # Create a consumer with no fetch size limit
+ big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages",
+ max_buffer_size=None, partitions=[0],
+ auto_commit=False, iter_timeout=0)
+
+ # Seek to the last message
+ big_consumer.seek(-1, 2)
+
+ # Consume giant message successfully
+ message = big_consumer.get_message(block=False, timeout=10)
+ self.assertIsNotNone(message)
+ self.assertEquals(message.message.value, big_message.value)
+
class TestFailover(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
+ def setUp(self):
- zk_chroot = random_string(10)
+ zk_chroot = random_string(10)
replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
- cls.zk = ZookeeperFixture.instance()
- kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
- cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
- cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
-
- @classmethod
- def tearDownClass(cls):
- cls.client.close()
- for broker in cls.brokers:
+ self.zk = ZookeeperFixture.instance()
+ kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+ self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+ self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+ def tearDown(self):
+ self.client.close()
+ for broker in self.brokers:
broker.close()
- cls.zk.close()
+ self.zk.close()
def test_switch_leader(self):
@@ -869,7 +897,7 @@ class TestFailover(unittest.TestCase):
def _count_messages(self, group, topic):
client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
- consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+ consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)
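Taken together, the tests exercise the new consumer options end to end. A sketch of how a caller might combine them, based only on the calls made in the tests above (host, port, and topic are placeholders):

from kafka import KafkaClient, SimpleConsumer
from kafka.common import ConsumerFetchSizeTooSmall

client = KafkaClient("localhost", 9092)

# auto_commit=False keeps these reads from committing group offsets;
# iter_timeout=0 lets iteration stop once no more messages arrive.
consumer = SimpleConsumer(client, "group1", "my_topic",
                          auto_commit=False, iter_timeout=0)

try:
    message = consumer.get_message(block=False, timeout=0.1)
except ConsumerFetchSizeTooSmall:
    # A message exceeded the fetch buffer cap; as in the test above, retry
    # with max_buffer_size=None to remove the upper limit entirely.
    consumer = SimpleConsumer(client, "group1", "my_topic",
                              max_buffer_size=None,
                              auto_commit=False, iter_timeout=0)
    message = consumer.get_message(block=False, timeout=0.1)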