diff options
author | mrtheb <mrlabbe@gmail.com> | 2014-01-31 22:43:59 -0500 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2014-01-31 22:43:59 -0500 |
commit | 84de472a4d5b583ff3ed6cc6d92250a7c9291ceb (patch) | |
tree | e3d03da4eeecf8eab2dc63cf113a4daf82addf72 /test/test_integration.py | |
parent | 0bdff4e833f73518a7219fca04dfbc3ed201b06e (diff) | |
parent | 4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff) | |
download | kafka-python-84de472a4d5b583ff3ed6cc6d92250a7c9291ceb.tar.gz |
Merge branch 'master' into multihosts
Conflicts:
kafka/client.py
kafka/conn.py
setup.py
test/test_integration.py
test/test_unit.py
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 318 |
1 files changed, 181 insertions, 137 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index 1f37ebf..000f44a 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,10 +8,36 @@ import random from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy +from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture -class TestKafkaClient(unittest.TestCase): +def random_string(l): + s = "".join(random.choice(string.letters) for i in xrange(l)) + return s + + +def ensure_topic_creation(client, topic_name): + times = 0 + while True: + times += 1 + client.load_metadata_for_topics(topic_name) + if client.has_metadata_for_topic(topic_name): + break + print "Waiting for %s topic to be created" % topic_name + time.sleep(1) + + if times > 30: + raise Exception("Unable to create topic %s" % topic_name) + + +class KafkaTestCase(unittest.TestCase): + def setUp(self): + self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10)) + ensure_topic_creation(self.client, self.topic) + + +class TestKafkaClient(KafkaTestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -29,7 +55,8 @@ class TestKafkaClient(unittest.TestCase): ##################### def test_produce_many_simple(self): - produce = ProduceRequest("test_produce_many_simple", 0, messages=[ + + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -37,25 +64,25 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 100) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 100) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 200) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 300) def test_produce_10k_simple(self): - produce = ProduceRequest("test_produce_10k_simple", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(10000) ]) @@ -63,7 +90,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 10000) def test_produce_many_gzip(self): @@ -72,13 +99,13 @@ class TestKafkaClient(unittest.TestCase): message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) def test_produce_many_snappy(self): @@ -87,13 +114,13 @@ class TestKafkaClient(unittest.TestCase): message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 200) def test_produce_mixed(self): @@ -103,17 +130,17 @@ class TestKafkaClient(unittest.TestCase): message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)]) message3 = create_snappy_message(["Snappy %d" % i for i in range(100)]) - produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3]) + produce = ProduceRequest(self.topic, 0, messages=[message1, message2, message3]) for resp in self.client.send_produce_request([produce]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 201) def test_produce_100k_gzipped(self): - req1 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + req1 = ProduceRequest(self.topic, 0, messages=[ create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) ]) @@ -121,10 +148,10 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 50000) - req2 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + req2 = ProduceRequest(self.topic, 0, messages=[ create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)]) ]) @@ -132,7 +159,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 50000) - (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) self.assertEquals(offset.offsets[0], 100000) ##################### @@ -140,18 +167,18 @@ class TestKafkaClient(unittest.TestCase): ##################### def test_consume_none(self): - fetch = FetchRequest("test_consume_none", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) - self.assertEquals(fetch_resp.topic, "test_consume_none") + self.assertEquals(fetch_resp.topic, self.topic) self.assertEquals(fetch_resp.partition, 0) messages = list(fetch_resp.messages) self.assertEquals(len(messages), 0) def test_produce_consume(self): - produce = ProduceRequest("test_produce_consume", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Just a test message"), create_message("Message with a key", "foo"), ]) @@ -160,7 +187,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - fetch = FetchRequest("test_produce_consume", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) @@ -175,7 +202,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(messages[1].message.key, "foo") def test_produce_consume_many(self): - produce = ProduceRequest("test_produce_consume_many", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -184,7 +211,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.offset, 0) # 1024 is not enough for 100 messages... - fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) (fetch_resp1,) = self.client.send_fetch_request([fetch1]) @@ -194,7 +221,7 @@ class TestKafkaClient(unittest.TestCase): self.assertTrue(len(messages) < 100) # 10240 should be enough - fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240) + fetch2 = FetchRequest(self.topic, 0, 0, 10240) (fetch_resp2,) = self.client.send_fetch_request([fetch2]) self.assertEquals(fetch_resp2.error, 0) @@ -207,10 +234,10 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(message.message.key, None) def test_produce_consume_two_partitions(self): - produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Partition 0 %d" % i) for i in range(10) ]) - produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Partition 1 %d" % i) for i in range(10) ]) @@ -218,8 +245,8 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024) - fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) self.assertEquals(fetch_resp1.highwaterMark, 10) @@ -244,11 +271,11 @@ class TestKafkaClient(unittest.TestCase): @unittest.skip('commmit offset not supported in this version') def test_commit_fetch_offsets(self): - req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") + req = OffsetCommitRequest(self.topic, 0, 42, "metadata") (resp,) = self.client.send_offset_commit_request("group", [req]) self.assertEquals(resp.error, 0) - req = OffsetFetchRequest("test_commit_fetch_offsets", 0) + req = OffsetFetchRequest(self.topic, 0) (resp,) = self.client.send_offset_fetch_request("group", [req]) self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 42) @@ -257,8 +284,8 @@ class TestKafkaClient(unittest.TestCase): # Producer Tests def test_simple_producer(self): - producer = SimpleProducer(self.client, "test_simple_producer") - resp = producer.send_messages("one", "two") + producer = SimpleProducer(self.client) + resp = producer.send_messages(self.topic, "one", "two") # Will go to partition 0 self.assertEquals(len(resp), 1) @@ -266,13 +293,13 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp[0].offset, 0) # offset of first msg # Will go to partition 1 - resp = producer.send_messages("three") + resp = producer.send_messages(self.topic, "three") self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 0) # offset of first msg - fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) @@ -288,7 +315,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(messages[0].message.value, "three") # Will go to partition 0 - resp = producer.send_messages("four", "five") + resp = producer.send_messages(self.topic, "four", "five") self.assertEquals(len(resp), 1) self.assertEquals(resp[0].error, 0) self.assertEquals(resp[0].offset, 2) # offset of first msg @@ -296,15 +323,15 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_round_robin_partitioner(self): - producer = KeyedProducer(self.client, "test_round_robin_partitioner", + producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - producer.send("key1", "one") - producer.send("key2", "two") - producer.send("key3", "three") - producer.send("key4", "four") + producer.send(self.topic, "key1", "one") + producer.send(self.topic, "key2", "two") + producer.send(self.topic, "key3", "three") + producer.send(self.topic, "key4", "four") - fetch1 = FetchRequest("test_round_robin_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_round_robin_partitioner", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -330,15 +357,15 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, "test_hash_partitioner", + producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - producer.send(1, "one") - producer.send(2, "two") - producer.send(3, "three") - producer.send(4, "four") + producer.send(self.topic, 1, "one") + producer.send(self.topic, 2, "two") + producer.send(self.topic, 3, "three") + producer.send(self.topic, 4, "four") - fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -364,12 +391,12 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_acks_none(self): - producer = SimpleProducer(self.client, "test_acks_none", + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) - fetch = FetchRequest("test_acks_none", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -383,12 +410,12 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_acks_local_write(self): - producer = SimpleProducer(self.client, "test_acks_local_write", + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) - fetch = FetchRequest("test_acks_local_write", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -403,12 +430,12 @@ class TestKafkaClient(unittest.TestCase): def test_acks_cluster_commit(self): producer = SimpleProducer( - self.client, "test_acks_cluster_commit", + self.client, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) - resp = producer.send_messages("one") + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 1) - fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -422,16 +449,14 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_async_simple_producer(self): - producer = SimpleProducer(self.client, "test_async_simple_producer", - async=True) - - resp = producer.send_messages("one") + producer = SimpleProducer(self.client, async=True) + resp = producer.send_messages(self.topic, "one") self.assertEquals(len(resp), 0) # Give it some time time.sleep(2) - fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -445,16 +470,15 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_async_keyed_producer(self): - producer = KeyedProducer(self.client, "test_async_keyed_producer", - async=True) + producer = KeyedProducer(self.client, async=True) - resp = producer.send("key1", "one") + resp = producer.send(self.topic, "key1", "one") self.assertEquals(len(resp), 0) # Give it some time time.sleep(2) - fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024) + fetch = FetchRequest(self.topic, 0, 0, 1024) fetch_resp = self.client.send_fetch_request([fetch]) self.assertEquals(fetch_resp[0].error, 0) @@ -468,14 +492,14 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_batched_simple_producer(self): - producer = SimpleProducer(self.client, "test_batched_simple_producer", + producer = SimpleProducer(self.client, batch_send=True, batch_send_every_n=10, batch_send_every_t=20) # Send 5 messages and do a fetch msgs = ["message-%d" % i for i in range(0, 5)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) # Batch mode is async. No ack self.assertEquals(len(resp), 0) @@ -483,8 +507,8 @@ class TestKafkaClient(unittest.TestCase): # Give it some time time.sleep(2) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -498,13 +522,13 @@ class TestKafkaClient(unittest.TestCase): # Send 5 more messages, wait for 2 seconds and do a fetch msgs = ["message-%d" % i for i in range(5, 10)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) # Give it some time time.sleep(2) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024) + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -518,12 +542,12 @@ class TestKafkaClient(unittest.TestCase): # Send 7 messages and wait for 20 seconds msgs = ["message-%d" % i for i in range(10, 15)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) msgs = ["message-%d" % i for i in range(15, 17)] - resp = producer.send_messages(*msgs) + resp = producer.send_messages(self.topic, *msgs) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -535,8 +559,8 @@ class TestKafkaClient(unittest.TestCase): # Give it some time time.sleep(22) - fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024) - fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024) + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -548,7 +572,7 @@ class TestKafkaClient(unittest.TestCase): producer.stop() -class TestConsumer(unittest.TestCase): +class TestConsumer(KafkaTestCase): @classmethod def setUpClass(cls): cls.zk = ZookeeperFixture.instance() @@ -565,7 +589,7 @@ class TestConsumer(unittest.TestCase): def test_simple_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_simple_consumer", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -574,7 +598,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_simple_consumer", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -583,7 +607,9 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + self.topic, auto_commit=False, + iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) @@ -609,7 +635,9 @@ class TestConsumer(unittest.TestCase): consumer.stop() def test_simple_consumer_blocking(self): - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + self.topic, + auto_commit=False, iter_timeout=0) # Blocking API start = datetime.now() @@ -619,7 +647,7 @@ class TestConsumer(unittest.TestCase): self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_simple_consumer_blocking", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -643,21 +671,22 @@ class TestConsumer(unittest.TestCase): def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", self.topic, + auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -665,7 +694,7 @@ class TestConsumer(unittest.TestCase): def test_multi_process_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -674,7 +703,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -683,7 +712,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -696,11 +725,11 @@ class TestConsumer(unittest.TestCase): start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) diff = (datetime.now() - start).total_seconds() - self.assertGreaterEqual(diff, 5) + self.assertGreaterEqual(diff, 4.999) self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_mpconsumer", 0, messages=[ + produce = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -723,7 +752,7 @@ class TestConsumer(unittest.TestCase): def test_multi_proc_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_mppending", 0, messages=[ + produce1 = ProduceRequest(self.topic, 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -731,7 +760,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_mppending", 1, messages=[ + produce2 = ProduceRequest(self.topic, 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) @@ -739,7 +768,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -749,74 +778,96 @@ class TestConsumer(unittest.TestCase): def test_large_messages(self): # Produce 10 "normal" size messages messages1 = [create_message(random_string(1024)) for i in range(10)] - produce1 = ProduceRequest("test_large_messages", 0, messages1) + produce1 = ProduceRequest(self.topic, 0, messages1) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - # Produce 10 messages that are too large (bigger than default fetch size) + # Produce 10 messages that are large (bigger than default fetch size) messages2 = [create_message(random_string(5000)) for i in range(10)] - produce2 = ProduceRequest("test_large_messages", 0, messages2) + produce2 = ProduceRequest(self.topic, 0, messages2) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", self.topic, + auto_commit=False, iter_timeout=0) all_messages = messages1 + messages2 for i, message in enumerate(consumer): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) -class TestFailover(unittest.TestCase): + # Produce 1 message that is too large (bigger than max fetch size) + big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 + big_message = create_message(random_string(big_message_size)) + produce3 = ProduceRequest(self.topic, 0, [big_message]) + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 20) - @classmethod - def setUpClass(cls): + self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) + + # Create a consumer with no fetch size limit + big_consumer = SimpleConsumer(self.client, "group1", self.topic, + max_buffer_size=None, partitions=[0], + auto_commit=False, iter_timeout=0) + # Seek to the last message + big_consumer.seek(-1, 2) + + # Consume giant message successfully + message = big_consumer.get_message(block=False, timeout=10) + self.assertIsNotNone(message) + self.assertEquals(message.message.value, big_message.value) + + +class TestFailover(KafkaTestCase): + + def setUp(self): zk_chroot = random_string(10) replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] - cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers]) cls.client = KafkaClient(hosts) + super(TestFailover, self).setUp() - @classmethod - def tearDownClass(cls): - cls.client.close() - for broker in cls.brokers: + def tearDown(self): + self.client.close() + for broker in self.brokers: broker.close() - cls.zk.close() + self.zk.close() def test_switch_leader(self): - - key, topic, partition = random_string(5), 'test_switch_leader', 0 - producer = SimpleProducer(self.client, topic) + key, topic, partition = random_string(5), self.topic, 0 + producer = SimpleProducer(self.client) for i in range(1, 4): # XXX unfortunately, the conns dict needs to be warmed for this to work # XXX unfortunately, for warming to work, we need at least as many partitions as brokers - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) # kil leader for partition 0 broker = self._kill_leader(topic, partition) # expect failure, reload meta data - with self.assertRaises(FailedPayloadsException): - producer.send_messages('part 1') - producer.send_messages('part 2') + with self.assertRaises(FailedPayloadsError): + producer.send_messages(self.topic, 'part 1') + producer.send_messages(self.topic, 'part 2') time.sleep(1) # send to new leader - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) broker.open() time.sleep(3) @@ -828,24 +879,23 @@ class TestFailover(unittest.TestCase): producer.stop() def test_switch_leader_async(self): - - key, topic, partition = random_string(5), 'test_switch_leader_async', 0 - producer = SimpleProducer(self.client, topic, async=True) + key, topic, partition = random_string(5), self.topic, 0 + producer = SimpleProducer(self.client, async=True) for i in range(1, 4): - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) # kil leader for partition 0 broker = self._kill_leader(topic, partition) # expect failure, reload meta data - producer.send_messages('part 1') - producer.send_messages('part 2') + producer.send_messages(self.topic, 'part 1') + producer.send_messages(self.topic, 'part 2') time.sleep(1) # send to new leader - self._send_random_messages(producer, 10) + self._send_random_messages(producer, self.topic, 10) broker.open() time.sleep(3) @@ -856,9 +906,9 @@ class TestFailover(unittest.TestCase): producer.stop() - def _send_random_messages(self, producer, n): + def _send_random_messages(self, producer, topic, n): for j in range(n): - resp = producer.send_messages(random_string(10)) + resp = producer.send_messages(topic, random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) time.sleep(1) # give it some time @@ -871,10 +921,9 @@ class TestFailover(unittest.TestCase): return broker def _count_messages(self, group, topic): - hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port) client = KafkaClient(hosts) - consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) @@ -882,11 +931,6 @@ class TestFailover(unittest.TestCase): client.close() return len(all_messages) - -def random_string(l): - s = "".join(random.choice(string.letters) for i in xrange(l)) - return s - if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) unittest.main() |