diff options
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index a10dae2..eaf432d 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,6 +8,7 @@ import random from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy +from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture @@ -554,7 +555,7 @@ class TestConsumer(unittest.TestCase): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192) + cls.client = KafkaClient(cls.server2.host, cls.server2.port) @classmethod def tearDownClass(cls): # noqa @@ -583,7 +584,9 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer", auto_commit=False, + iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) @@ -609,7 +612,9 @@ class TestConsumer(unittest.TestCase): consumer.stop() def test_simple_consumer_blocking(self): - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer_blocking", + auto_commit=False, iter_timeout=0) # Blocking API start = datetime.now() @@ -657,7 +662,8 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", + auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -755,7 +761,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - # Produce 10 messages that are too large (bigger than default fetch size) + # Produce 10 messages that are large (bigger than default fetch size) messages2 = [create_message(random_string(5000)) for i in range(10)] produce2 = ProduceRequest("test_large_messages", 0, messages2) @@ -764,33 +770,55 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + auto_commit=False, iter_timeout=0) all_messages = messages1 + messages2 for i, message in enumerate(consumer): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) + # Produce 1 message that is too large (bigger than max fetch size) + big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 + big_message = create_message(random_string(big_message_size)) + produce3 = ProduceRequest("test_large_messages", 0, [big_message]) + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 20) + + self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) + + # Create a consumer with no fetch size limit + big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + max_buffer_size=None, partitions=[0], + auto_commit=False, iter_timeout=0) + + # Seek to the last message + big_consumer.seek(-1, 2) + + # Consume giant message successfully + message = big_consumer.get_message(block=False, timeout=10) + self.assertIsNotNone(message) + self.assertEquals(message.message.value, big_message.value) + class TestFailover(unittest.TestCase): - @classmethod - def setUpClass(cls): + def setUp(self): - zk_chroot = random_string(10) + zk_chroot = random_string(10) replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] - cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port) - - @classmethod - def tearDownClass(cls): - cls.client.close() - for broker in cls.brokers: + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + + def tearDown(self): + self.client.close() + for broker in self.brokers: broker.close() - cls.zk.close() + self.zk.close() def test_switch_leader(self): @@ -869,7 +897,7 @@ class TestFailover(unittest.TestCase): def _count_messages(self, group, topic): client = KafkaClient(self.brokers[0].host, self.brokers[0].port) - consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) |