-rw-r--r-- | kafka/protocol.py       |    2
-rw-r--r-- | test/test_codec.py      |   17
-rw-r--r-- | test/test_integration.py| 1867
-rw-r--r-- | test/test_protocol.py   |   33
-rw-r--r-- | test/testutil.py        |   10
-rw-r--r-- | tox.ini                 |    6
6 files changed, 979 insertions, 956 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 9b8f3b3..7ec7946 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -50,7 +50,7 @@ class KafkaProtocol(object):
                            request_key,          # ApiKey
                            0,                    # ApiVersion
                            correlation_id,       # CorrelationId
-                           len(client_id),
+                           len(client_id),       # ClientId size
                            client_id)            # ClientId
 
     @classmethod
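The hunk above annotates the common request envelope that prefixes every Kafka API call: ApiKey, ApiVersion, CorrelationId, then a length-prefixed ClientId. For reference, a minimal self-contained sketch of that layout; the '>hhih%ds' struct format (big-endian int16, int16, int32, int16-prefixed bytes) is an assumption matching the field comments, not quoted from the hunk:

    import struct

    def encode_request_header(client_id, correlation_id, request_key):
        # int16 ApiKey, int16 ApiVersion, int32 CorrelationId,
        # then ClientId as an int16-length-prefixed string
        return struct.pack('>hhih%ds' % len(client_id),
                           request_key,      # ApiKey
                           0,                # ApiVersion
                           correlation_id,   # CorrelationId
                           len(client_id),   # ClientId size
                           client_id)        # ClientId

    header = encode_request_header("kafka-python", 0, 3)  # 3 = metadata API key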
diff --git a/test/test_codec.py b/test/test_codec.py
index 8872fe7..7fedb71 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -20,27 +20,20 @@ from kafka.codec import (
 from kafka.protocol import (
     create_gzip_message, create_message, create_snappy_message, KafkaProtocol
 )
-
-ITERATIONS = 1000
-STRLEN = 100
-
-
-def random_string():
-    return os.urandom(random.randint(1, STRLEN))
-
+from .testutil import *
 
 class TestCodec(unittest.TestCase):
     @unittest.skipUnless(has_gzip(), "Gzip not available")
     def test_gzip(self):
-        for i in xrange(ITERATIONS):
-            s1 = random_string()
+        for i in xrange(1000):
+            s1 = random_string(100)
             s2 = gzip_decode(gzip_encode(s1))
             self.assertEquals(s1, s2)
 
     @unittest.skipUnless(has_snappy(), "Snappy not available")
     def test_snappy(self):
-        for i in xrange(ITERATIONS):
-            s1 = random_string()
+        for i in xrange(1000):
+            s1 = random_string(100)
             s2 = snappy_decode(snappy_encode(s1))
             self.assertEquals(s1, s2)
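Both test modules now import shared helpers from test/testutil.py, which this commit also touches (10 lines per the stat above) but whose hunk is not shown on this page. A plausible sketch of the helpers the call sites imply — random_string(l) here and in test_integration.py, plus the skip_integration() gate used by the decorators below; the env-var name is a guess:

    import os
    import random
    import string

    def random_string(l):
        # Same shape as the helper deleted from test_integration.py below
        return "".join(random.choice(string.letters) for i in xrange(l))

    def skip_integration():
        # Hypothetical gate: skip the integration suite unless requested
        return os.environ.get('SKIP_INTEGRATION')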
##################### -# -# def test_consume_none(self): -# fetch = FetchRequest(self.topic, 0, 0, 1024) -# -# fetch_resp = self.client.send_fetch_request([fetch])[0] -# self.assertEquals(fetch_resp.error, 0) -# self.assertEquals(fetch_resp.topic, self.topic) -# self.assertEquals(fetch_resp.partition, 0) -# -# messages = list(fetch_resp.messages) -# self.assertEquals(len(messages), 0) -# -# def test_produce_consume(self): -# produce = ProduceRequest(self.topic, 0, messages=[ -# create_message("Just a test message"), -# create_message("Message with a key", "foo"), -# ]) -# -# for resp in self.client.send_produce_request([produce]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# fetch = FetchRequest(self.topic, 0, 0, 1024) -# -# fetch_resp = self.client.send_fetch_request([fetch])[0] -# self.assertEquals(fetch_resp.error, 0) -# -# messages = list(fetch_resp.messages) -# self.assertEquals(len(messages), 2) -# self.assertEquals(messages[0].offset, 0) -# self.assertEquals(messages[0].message.value, "Just a test message") -# self.assertEquals(messages[0].message.key, None) -# self.assertEquals(messages[1].offset, 1) -# self.assertEquals(messages[1].message.value, "Message with a key") -# self.assertEquals(messages[1].message.key, "foo") -# -# def test_produce_consume_many(self): -# produce = ProduceRequest(self.topic, 0, messages=[ -# create_message("Test message %d" % i) for i in range(100) -# ]) -# -# for resp in self.client.send_produce_request([produce]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# # 1024 is not enough for 100 messages... -# fetch1 = FetchRequest(self.topic, 0, 0, 1024) -# -# (fetch_resp1,) = self.client.send_fetch_request([fetch1]) -# -# self.assertEquals(fetch_resp1.error, 0) -# self.assertEquals(fetch_resp1.highwaterMark, 100) -# messages = list(fetch_resp1.messages) -# self.assertTrue(len(messages) < 100) -# -# # 10240 should be enough -# fetch2 = FetchRequest(self.topic, 0, 0, 10240) -# (fetch_resp2,) = self.client.send_fetch_request([fetch2]) -# -# self.assertEquals(fetch_resp2.error, 0) -# self.assertEquals(fetch_resp2.highwaterMark, 100) -# messages = list(fetch_resp2.messages) -# self.assertEquals(len(messages), 100) -# for i, message in enumerate(messages): -# self.assertEquals(message.offset, i) -# self.assertEquals(message.message.value, "Test message %d" % i) -# self.assertEquals(message.message.key, None) -# -# def test_produce_consume_two_partitions(self): -# produce1 = ProduceRequest(self.topic, 0, messages=[ -# create_message("Partition 0 %d" % i) for i in range(10) -# ]) -# produce2 = ProduceRequest(self.topic, 1, messages=[ -# create_message("Partition 1 %d" % i) for i in range(10) -# ]) -# -# for resp in self.client.send_produce_request([produce1, produce2]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# fetch1 = FetchRequest(self.topic, 0, 0, 1024) -# fetch2 = FetchRequest(self.topic, 1, 0, 1024) -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) -# self.assertEquals(fetch_resp1.error, 0) -# self.assertEquals(fetch_resp1.highwaterMark, 10) -# messages = list(fetch_resp1.messages) -# self.assertEquals(len(messages), 10) -# for i, message in enumerate(messages): -# self.assertEquals(message.offset, i) -# self.assertEquals(message.message.value, "Partition 0 %d" % i) -# self.assertEquals(message.message.key, None) -# self.assertEquals(fetch_resp2.error, 0) -# self.assertEquals(fetch_resp2.highwaterMark, 10) -# messages = 
list(fetch_resp2.messages) -# self.assertEquals(len(messages), 10) -# for i, message in enumerate(messages): -# self.assertEquals(message.offset, i) -# self.assertEquals(message.message.value, "Partition 1 %d" % i) -# self.assertEquals(message.message.key, None) -# -# #################### -# # Offset Tests # -# #################### -# -# @unittest.skip('commmit offset not supported in this version') -# def test_commit_fetch_offsets(self): -# req = OffsetCommitRequest(self.topic, 0, 42, "metadata") -# (resp,) = self.client.send_offset_commit_request("group", [req]) -# self.assertEquals(resp.error, 0) -# -# req = OffsetFetchRequest(self.topic, 0) -# (resp,) = self.client.send_offset_fetch_request("group", [req]) -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 42) -# self.assertEquals(resp.metadata, "") # Metadata isn't stored for now -# -# # Producer Tests -# -# def test_simple_producer(self): -# producer = SimpleProducer(self.client) -# resp = producer.send_messages(self.topic, "one", "two") -# -# # Will go to partition 0 -# self.assertEquals(len(resp), 1) -# self.assertEquals(resp[0].error, 0) -# self.assertEquals(resp[0].offset, 0) # offset of first msg -# -# # Will go to partition 1 -# resp = producer.send_messages(self.topic, "three") -# self.assertEquals(len(resp), 1) -# self.assertEquals(resp[0].error, 0) -# self.assertEquals(resp[0].offset, 0) # offset of first msg -# -# fetch1 = FetchRequest(self.topic, 0, 0, 1024) -# fetch2 = FetchRequest(self.topic, 1, 0, 1024) -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, -# fetch2]) -# self.assertEquals(fetch_resp1.error, 0) -# self.assertEquals(fetch_resp1.highwaterMark, 2) -# messages = list(fetch_resp1.messages) -# self.assertEquals(len(messages), 2) -# self.assertEquals(messages[0].message.value, "one") -# self.assertEquals(messages[1].message.value, "two") -# self.assertEquals(fetch_resp2.error, 0) -# self.assertEquals(fetch_resp2.highwaterMark, 1) -# messages = list(fetch_resp2.messages) -# self.assertEquals(len(messages), 1) -# self.assertEquals(messages[0].message.value, "three") -# -# # Will go to partition 0 -# resp = producer.send_messages(self.topic, "four", "five") -# self.assertEquals(len(resp), 1) -# self.assertEquals(resp[0].error, 0) -# self.assertEquals(resp[0].offset, 2) # offset of first msg -# -# producer.stop() -# -# def test_round_robin_partitioner(self): -# producer = KeyedProducer(self.client, -# partitioner=RoundRobinPartitioner) -# producer.send(self.topic, "key1", "one") -# producer.send(self.topic, "key2", "two") -# producer.send(self.topic, "key3", "three") -# producer.send(self.topic, "key4", "four") -# -# fetch1 = FetchRequest(self.topic, 0, 0, 1024) -# fetch2 = FetchRequest(self.topic, 1, 0, 1024) -# -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, -# fetch2]) -# -# self.assertEquals(fetch_resp1.error, 0) -# self.assertEquals(fetch_resp1.highwaterMark, 2) -# self.assertEquals(fetch_resp1.partition, 0) -# -# messages = list(fetch_resp1.messages) -# self.assertEquals(len(messages), 2) -# self.assertEquals(messages[0].message.value, "one") -# self.assertEquals(messages[1].message.value, "three") -# -# self.assertEquals(fetch_resp2.error, 0) -# self.assertEquals(fetch_resp2.highwaterMark, 2) -# self.assertEquals(fetch_resp2.partition, 1) -# -# messages = list(fetch_resp2.messages) -# self.assertEquals(len(messages), 2) -# self.assertEquals(messages[0].message.value, "two") -# self.assertEquals(messages[1].message.value, "four") -# -# producer.stop() 
-# -# def test_hashed_partitioner(self): -# producer = KeyedProducer(self.client, -# partitioner=HashedPartitioner) -# producer.send(self.topic, 1, "one") -# producer.send(self.topic, 2, "two") -# producer.send(self.topic, 3, "three") -# producer.send(self.topic, 4, "four") -# -# fetch1 = FetchRequest(self.topic, 0, 0, 1024) -# fetch2 = FetchRequest(self.topic, 1, 0, 1024) -# -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, -# fetch2]) -# -# self.assertEquals(fetch_resp1.error, 0) -# self.assertEquals(fetch_resp1.highwaterMark, 2) -# self.assertEquals(fetch_resp1.partition, 0) -# -# messages = list(fetch_resp1.messages) -# self.assertEquals(len(messages), 2) -# self.assertEquals(messages[0].message.value, "two") -# self.assertEquals(messages[1].message.value, "four") -# -# self.assertEquals(fetch_resp2.error, 0) -# self.assertEquals(fetch_resp2.highwaterMark, 2) -# self.assertEquals(fetch_resp2.partition, 1) -# -# messages = list(fetch_resp2.messages) -# self.assertEquals(len(messages), 2) -# self.assertEquals(messages[0].message.value, "one") -# self.assertEquals(messages[1].message.value, "three") -# -# producer.stop() -# -# def test_acks_none(self): -# producer = SimpleProducer(self.client, -# req_acks=SimpleProducer.ACK_NOT_REQUIRED) -# resp = producer.send_messages(self.topic, "one") -# self.assertEquals(len(resp), 0) -# -# fetch = FetchRequest(self.topic, 0, 0, 1024) -# fetch_resp = self.client.send_fetch_request([fetch]) -# -# self.assertEquals(fetch_resp[0].error, 0) -# self.assertEquals(fetch_resp[0].highwaterMark, 1) -# self.assertEquals(fetch_resp[0].partition, 0) -# -# messages = list(fetch_resp[0].messages) -# self.assertEquals(len(messages), 1) -# self.assertEquals(messages[0].message.value, "one") -# -# producer.stop() -# -# def test_acks_local_write(self): -# producer = SimpleProducer(self.client, -# req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) -# resp = producer.send_messages(self.topic, "one") -# self.assertEquals(len(resp), 1) -# -# fetch = FetchRequest(self.topic, 0, 0, 1024) -# fetch_resp = self.client.send_fetch_request([fetch]) -# -# self.assertEquals(fetch_resp[0].error, 0) -# self.assertEquals(fetch_resp[0].highwaterMark, 1) -# self.assertEquals(fetch_resp[0].partition, 0) -# -# messages = list(fetch_resp[0].messages) -# self.assertEquals(len(messages), 1) -# self.assertEquals(messages[0].message.value, "one") -# -# producer.stop() -# -# def test_acks_cluster_commit(self): -# producer = SimpleProducer( -# self.client, -# req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) -# resp = producer.send_messages(self.topic, "one") -# self.assertEquals(len(resp), 1) -# -# fetch = FetchRequest(self.topic, 0, 0, 1024) -# fetch_resp = self.client.send_fetch_request([fetch]) -# -# self.assertEquals(fetch_resp[0].error, 0) -# self.assertEquals(fetch_resp[0].highwaterMark, 1) -# self.assertEquals(fetch_resp[0].partition, 0) -# -# messages = list(fetch_resp[0].messages) -# self.assertEquals(len(messages), 1) -# self.assertEquals(messages[0].message.value, "one") -# -# producer.stop() -# -# def test_async_simple_producer(self): -# producer = SimpleProducer(self.client, async=True) -# resp = producer.send_messages(self.topic, "one") -# self.assertEquals(len(resp), 0) -# -# # Give it some time -# time.sleep(2) -# -# fetch = FetchRequest(self.topic, 0, 0, 1024) -# fetch_resp = self.client.send_fetch_request([fetch]) -# -# self.assertEquals(fetch_resp[0].error, 0) -# self.assertEquals(fetch_resp[0].highwaterMark, 1) -# self.assertEquals(fetch_resp[0].partition, 0) 
-# -# messages = list(fetch_resp[0].messages) -# self.assertEquals(len(messages), 1) -# self.assertEquals(messages[0].message.value, "one") -# -# producer.stop() -# -# def test_async_keyed_producer(self): -# producer = KeyedProducer(self.client, async=True) -# -# resp = producer.send(self.topic, "key1", "one") -# self.assertEquals(len(resp), 0) -# -# # Give it some time -# time.sleep(2) -# -# fetch = FetchRequest(self.topic, 0, 0, 1024) -# fetch_resp = self.client.send_fetch_request([fetch]) -# -# self.assertEquals(fetch_resp[0].error, 0) -# self.assertEquals(fetch_resp[0].highwaterMark, 1) -# self.assertEquals(fetch_resp[0].partition, 0) -# -# messages = list(fetch_resp[0].messages) -# self.assertEquals(len(messages), 1) -# self.assertEquals(messages[0].message.value, "one") -# -# producer.stop() -# -# def test_batched_simple_producer(self): -# producer = SimpleProducer(self.client, -# batch_send=True, -# batch_send_every_n=10, -# batch_send_every_t=20) -# -# # Send 5 messages and do a fetch -# msgs = ["message-%d" % i for i in range(0, 5)] -# resp = producer.send_messages(self.topic, *msgs) -# -# # Batch mode is async. No ack -# self.assertEquals(len(resp), 0) -# -# # Give it some time -# time.sleep(2) -# -# fetch1 = FetchRequest(self.topic, 0, 0, 1024) -# fetch2 = FetchRequest(self.topic, 1, 0, 1024) -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, -# fetch2]) -# -# self.assertEquals(fetch_resp1.error, 0) -# messages = list(fetch_resp1.messages) -# self.assertEquals(len(messages), 0) -# -# self.assertEquals(fetch_resp2.error, 0) -# messages = list(fetch_resp2.messages) -# self.assertEquals(len(messages), 0) -# -# # Send 5 more messages, wait for 2 seconds and do a fetch -# msgs = ["message-%d" % i for i in range(5, 10)] -# resp = producer.send_messages(self.topic, *msgs) -# -# # Give it some time -# time.sleep(2) -# -# fetch1 = FetchRequest(self.topic, 0, 0, 1024) -# fetch2 = FetchRequest(self.topic, 1, 0, 1024) -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, -# fetch2]) -# -# self.assertEquals(fetch_resp1.error, 0) -# messages = list(fetch_resp1.messages) -# self.assertEquals(len(messages), 5) -# -# self.assertEquals(fetch_resp2.error, 0) -# messages = list(fetch_resp2.messages) -# self.assertEquals(len(messages), 5) -# -# # Send 7 messages and wait for 20 seconds -# msgs = ["message-%d" % i for i in range(10, 15)] -# resp = producer.send_messages(self.topic, *msgs) -# msgs = ["message-%d" % i for i in range(15, 17)] -# resp = producer.send_messages(self.topic, *msgs) -# -# fetch1 = FetchRequest(self.topic, 0, 5, 1024) -# fetch2 = FetchRequest(self.topic, 1, 5, 1024) -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, -# fetch2]) -# -# self.assertEquals(fetch_resp1.error, 0) -# self.assertEquals(fetch_resp2.error, 0) -# messages = list(fetch_resp1.messages) + list(fetch_resp2.messages) -# self.assertEquals(len(messages), 0) -# -# # Give it some time -# time.sleep(22) -# -# fetch1 = FetchRequest(self.topic, 0, 5, 1024) -# fetch2 = FetchRequest(self.topic, 1, 5, 1024) -# fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, -# fetch2]) -# -# self.assertEquals(fetch_resp1.error, 0) -# self.assertEquals(fetch_resp2.error, 0) -# messages = list(fetch_resp1.messages) + list(fetch_resp2.messages) -# self.assertEquals(len(messages), 7) -# -# producer.stop() -# -# -#class TestConsumer(KafkaTestCase): -# @classmethod -# def setUpClass(cls): -# cls.zk = ZookeeperFixture.instance() -# cls.server1 = KafkaFixture.instance(0, 
cls.zk.host, cls.zk.port) -# cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) -# cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port)) -# -# @classmethod -# def tearDownClass(cls): # noqa -# cls.client.close() -# cls.server1.close() -# cls.server2.close() -# cls.zk.close() -# -# def test_simple_consumer(self): -# # Produce 100 messages to partition 0 -# produce1 = ProduceRequest(self.topic, 0, messages=[ -# create_message("Test message 0 %d" % i) for i in range(100) -# ]) -# -# for resp in self.client.send_produce_request([produce1]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# # Produce 100 messages to partition 1 -# produce2 = ProduceRequest(self.topic, 1, messages=[ -# create_message("Test message 1 %d" % i) for i in range(100) -# ]) -# -# for resp in self.client.send_produce_request([produce2]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# # Start a consumer -# consumer = SimpleConsumer(self.client, "group1", -# self.topic, auto_commit=False, -# iter_timeout=0) -# all_messages = [] -# for message in consumer: -# all_messages.append(message) -# -# self.assertEquals(len(all_messages), 200) -# # Make sure there are no duplicates -# self.assertEquals(len(all_messages), len(set(all_messages))) -# -# consumer.seek(-10, 2) -# all_messages = [] -# for message in consumer: -# all_messages.append(message) -# -# self.assertEquals(len(all_messages), 10) -# -# consumer.seek(-13, 2) -# all_messages = [] -# for message in consumer: -# all_messages.append(message) -# -# self.assertEquals(len(all_messages), 13) -# -# consumer.stop() -# -# def test_simple_consumer_blocking(self): -# consumer = SimpleConsumer(self.client, "group1", -# self.topic, -# auto_commit=False, iter_timeout=0) -# -# # Blocking API -# start = datetime.now() -# messages = consumer.get_messages(block=True, timeout=5) -# diff = (datetime.now() - start).total_seconds() -# self.assertGreaterEqual(diff, 5) -# self.assertEqual(len(messages), 0) -# -# # Send 10 messages -# produce = ProduceRequest(self.topic, 0, messages=[ -# create_message("Test message 0 %d" % i) for i in range(10) -# ]) -# -# for resp in self.client.send_produce_request([produce]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# # Fetch 5 messages -# messages = consumer.get_messages(count=5, block=True, timeout=5) -# self.assertEqual(len(messages), 5) -# -# # Fetch 10 messages -# start = datetime.now() -# messages = consumer.get_messages(count=10, block=True, timeout=5) -# self.assertEqual(len(messages), 5) -# diff = (datetime.now() - start).total_seconds() -# self.assertGreaterEqual(diff, 5) -# -# consumer.stop() -# -# def test_simple_consumer_pending(self): -# # Produce 10 messages to partition 0 and 1 -# -# produce1 = ProduceRequest(self.topic, 0, messages=[ -# create_message("Test message 0 %d" % i) for i in range(10) -# ]) -# for resp in self.client.send_produce_request([produce1]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# produce2 = ProduceRequest(self.topic, 1, messages=[ -# create_message("Test message 1 %d" % i) for i in range(10) -# ]) -# for resp in self.client.send_produce_request([produce2]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# consumer = SimpleConsumer(self.client, "group1", self.topic, -# auto_commit=False, iter_timeout=0) -# self.assertEquals(consumer.pending(), 20) -# self.assertEquals(consumer.pending(partitions=[0]), 10) -# 
self.assertEquals(consumer.pending(partitions=[1]), 10) -# consumer.stop() -# -# def test_multi_process_consumer(self): -# # Produce 100 messages to partition 0 -# produce1 = ProduceRequest(self.topic, 0, messages=[ -# create_message("Test message 0 %d" % i) for i in range(100) -# ]) -# -# for resp in self.client.send_produce_request([produce1]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# # Produce 100 messages to partition 1 -# produce2 = ProduceRequest(self.topic, 1, messages=[ -# create_message("Test message 1 %d" % i) for i in range(100) -# ]) -# -# for resp in self.client.send_produce_request([produce2]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# # Start a consumer -# consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) -# all_messages = [] -# for message in consumer: -# all_messages.append(message) -# -# self.assertEquals(len(all_messages), 200) -# # Make sure there are no duplicates -# self.assertEquals(len(all_messages), len(set(all_messages))) -# -# # Blocking API -# start = datetime.now() -# messages = consumer.get_messages(block=True, timeout=5) -# diff = (datetime.now() - start).total_seconds() -# self.assertGreaterEqual(diff, 4.999) -# self.assertEqual(len(messages), 0) -# -# # Send 10 messages -# produce = ProduceRequest(self.topic, 0, messages=[ -# create_message("Test message 0 %d" % i) for i in range(10) -# ]) -# -# for resp in self.client.send_produce_request([produce]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 100) -# -# # Fetch 5 messages -# messages = consumer.get_messages(count=5, block=True, timeout=5) -# self.assertEqual(len(messages), 5) -# -# # Fetch 10 messages -# start = datetime.now() -# messages = consumer.get_messages(count=10, block=True, timeout=5) -# self.assertEqual(len(messages), 5) -# diff = (datetime.now() - start).total_seconds() -# self.assertGreaterEqual(diff, 5) -# -# consumer.stop() -# -# def test_multi_proc_pending(self): -# # Produce 10 messages to partition 0 and 1 -# produce1 = ProduceRequest(self.topic, 0, messages=[ -# create_message("Test message 0 %d" % i) for i in range(10) -# ]) -# -# for resp in self.client.send_produce_request([produce1]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# produce2 = ProduceRequest(self.topic, 1, messages=[ -# create_message("Test message 1 %d" % i) for i in range(10) -# ]) -# -# for resp in self.client.send_produce_request([produce2]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False) -# self.assertEquals(consumer.pending(), 20) -# self.assertEquals(consumer.pending(partitions=[0]), 10) -# self.assertEquals(consumer.pending(partitions=[1]), 10) -# -# consumer.stop() -# -# def test_large_messages(self): -# # Produce 10 "normal" size messages -# messages1 = [create_message(random_string(1024)) for i in range(10)] -# produce1 = ProduceRequest(self.topic, 0, messages1) -# -# for resp in self.client.send_produce_request([produce1]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 0) -# -# # Produce 10 messages that are large (bigger than default fetch size) -# messages2 = [create_message(random_string(5000)) for i in range(10)] -# produce2 = ProduceRequest(self.topic, 0, messages2) -# -# for resp in self.client.send_produce_request([produce2]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 
10) -# -# # Consumer should still get all of them -# consumer = SimpleConsumer(self.client, "group1", self.topic, -# auto_commit=False, iter_timeout=0) -# all_messages = messages1 + messages2 -# for i, message in enumerate(consumer): -# self.assertEquals(all_messages[i], message.message) -# self.assertEquals(i, 19) -# -# # Produce 1 message that is too large (bigger than max fetch size) -# big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 -# big_message = create_message(random_string(big_message_size)) -# produce3 = ProduceRequest(self.topic, 0, [big_message]) -# for resp in self.client.send_produce_request([produce3]): -# self.assertEquals(resp.error, 0) -# self.assertEquals(resp.offset, 20) -# -# self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) -# -# # Create a consumer with no fetch size limit -# big_consumer = SimpleConsumer(self.client, "group1", self.topic, -# max_buffer_size=None, partitions=[0], -# auto_commit=False, iter_timeout=0) -# -# # Seek to the last message -# big_consumer.seek(-1, 2) -# -# # Consume giant message successfully -# message = big_consumer.get_message(block=False, timeout=10) -# self.assertIsNotNone(message) -# self.assertEquals(message.message.value, big_message.value) -# -# -#class TestFailover(KafkaTestCase): -# -# @classmethod -# def setUpClass(cls): # noqa -# zk_chroot = random_string(10) -# replicas = 2 -# partitions = 2 -# -# # mini zookeeper, 2 kafka brokers -# cls.zk = ZookeeperFixture.instance() -# kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] -# cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] -# -# hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers] -# cls.client = KafkaClient(hosts) -# -# @classmethod -# def tearDownClass(cls): -# cls.client.close() -# for broker in cls.brokers: -# broker.close() -# cls.zk.close() -# -# def test_switch_leader(self): -# key, topic, partition = random_string(5), self.topic, 0 -# producer = SimpleProducer(self.client) -# -# for i in range(1, 4): -# -# # XXX unfortunately, the conns dict needs to be warmed for this to work -# # XXX unfortunately, for warming to work, we need at least as many partitions as brokers -# self._send_random_messages(producer, self.topic, 10) -# -# # kil leader for partition 0 -# broker = self._kill_leader(topic, partition) -# -# # expect failure, reload meta data -# with self.assertRaises(FailedPayloadsError): -# producer.send_messages(self.topic, 'part 1') -# producer.send_messages(self.topic, 'part 2') -# time.sleep(1) -# -# # send to new leader -# self._send_random_messages(producer, self.topic, 10) -# -# broker.open() -# time.sleep(3) -# -# # count number of messages -# count = self._count_messages('test_switch_leader group %s' % i, topic) -# self.assertIn(count, range(20 * i, 22 * i + 1)) -# -# producer.stop() -# -# def test_switch_leader_async(self): -# key, topic, partition = random_string(5), self.topic, 0 -# producer = SimpleProducer(self.client, async=True) -# -# for i in range(1, 4): -# -# self._send_random_messages(producer, self.topic, 10) -# -# # kil leader for partition 0 -# broker = self._kill_leader(topic, partition) -# -# # expect failure, reload meta data -# producer.send_messages(self.topic, 'part 1') -# producer.send_messages(self.topic, 'part 2') -# time.sleep(1) -# -# # send to new leader -# self._send_random_messages(producer, self.topic, 10) -# -# broker.open() -# time.sleep(3) -# -# # count number of messages -# count = self._count_messages('test_switch_leader_async group %s' % 
i, topic) -# self.assertIn(count, range(20 * i, 22 * i + 1)) -# -# producer.stop() -# -# def _send_random_messages(self, producer, topic, n): -# for j in range(n): -# resp = producer.send_messages(topic, random_string(10)) -# if len(resp) > 0: -# self.assertEquals(resp[0].error, 0) -# time.sleep(1) # give it some time -# -# def _kill_leader(self, topic, partition): -# leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] -# broker = self.brokers[leader.nodeId] -# broker.close() -# time.sleep(1) # give it some time -# return broker -# -# def _count_messages(self, group, topic): -# hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port) -# client = KafkaClient(hosts) -# consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) -# all_messages = [] -# for message in consumer: -# all_messages.append(message) -# consumer.stop() -# client.close() -# return len(all_messages) -# -#if __name__ == "__main__": -# logging.basicConfig(level=logging.DEBUG) -# unittest.main() +import logging +import unittest +import time +from datetime import datetime + +from kafka import * # noqa +from kafka.common import * # noqa +from kafka.codec import has_gzip, has_snappy +from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES +from .fixtures import ZookeeperFixture, KafkaFixture +from .testutil import * + +def ensure_topic_creation(client, topic_name): + times = 0 + while True: + times += 1 + client.load_metadata_for_topics(topic_name) + if client.has_metadata_for_topic(topic_name): + break + print "Waiting for %s topic to be created" % topic_name + time.sleep(1) + + if times > 30: + raise Exception("Unable to create topic %s" % topic_name) + +class KafkaTestCase(unittest.TestCase): + def setUp(self): + self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) + ensure_topic_creation(self.client, self.topic) + + +@unittest.skipIf(skip_integration(), 'Skipping Integration') +class TestKafkaClient(KafkaTestCase): + @classmethod + def setUpClass(cls): # noqa + cls.zk = ZookeeperFixture.instance() + cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) + cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port)) + + @classmethod + def tearDownClass(cls): # noqa + cls.client.close() + cls.server.close() + cls.zk.close() + + ##################### + # Produce Tests # + ##################### + + def test_produce_many_simple(self): + + produce = ProduceRequest(self.topic, 0, messages=[ + create_message("Test message %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 100) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 200) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 200) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 300) + + def test_produce_10k_simple(self): + produce = ProduceRequest(self.topic, 0, messages=[ + create_message("Test message %d" % i) for i in range(10000) + ]) + + for resp in 
self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 10000) + + def test_produce_many_gzip(self): + if not has_gzip(): + return + message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) + message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) + + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 200) + + def test_produce_many_snappy(self): + if not has_snappy(): + return + message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) + message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) + + produce = ProduceRequest(self.topic, 0, messages=[message1, message2]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 200) + + def test_produce_mixed(self): + if not has_gzip() or not has_snappy(): + return + message1 = create_message("Just a plain message") + message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)]) + message3 = create_snappy_message(["Snappy %d" % i for i in range(100)]) + + produce = ProduceRequest(self.topic, 0, messages=[message1, message2, message3]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 201) + + def test_produce_100k_gzipped(self): + req1 = ProduceRequest(self.topic, 0, messages=[ + create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + ]) + + for resp in self.client.send_produce_request([req1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 50000) + + req2 = ProduceRequest(self.topic, 0, messages=[ + create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)]) + ]) + + for resp in self.client.send_produce_request([req2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 50000) + + (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 100000) + + ##################### + # Consume Tests # + ##################### + + def test_consume_none(self): + fetch = FetchRequest(self.topic, 0, 0, 1024) + + fetch_resp = self.client.send_fetch_request([fetch])[0] + self.assertEquals(fetch_resp.error, 0) + self.assertEquals(fetch_resp.topic, self.topic) + self.assertEquals(fetch_resp.partition, 0) + + messages = list(fetch_resp.messages) + self.assertEquals(len(messages), 0) + + def test_produce_consume(self): + produce = ProduceRequest(self.topic, 0, messages=[ + create_message("Just a test message"), + create_message("Message with a key", "foo"), + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + 
self.assertEquals(resp.offset, 0) + + fetch = FetchRequest(self.topic, 0, 0, 1024) + + fetch_resp = self.client.send_fetch_request([fetch])[0] + self.assertEquals(fetch_resp.error, 0) + + messages = list(fetch_resp.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].offset, 0) + self.assertEquals(messages[0].message.value, "Just a test message") + self.assertEquals(messages[0].message.key, None) + self.assertEquals(messages[1].offset, 1) + self.assertEquals(messages[1].message.value, "Message with a key") + self.assertEquals(messages[1].message.key, "foo") + + def test_produce_consume_many(self): + produce = ProduceRequest(self.topic, 0, messages=[ + create_message("Test message %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # 1024 is not enough for 100 messages... + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + + (fetch_resp1,) = self.client.send_fetch_request([fetch1]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 100) + messages = list(fetch_resp1.messages) + self.assertTrue(len(messages) < 100) + + # 10240 should be enough + fetch2 = FetchRequest(self.topic, 0, 0, 10240) + (fetch_resp2,) = self.client.send_fetch_request([fetch2]) + + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 100) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 100) + for i, message in enumerate(messages): + self.assertEquals(message.offset, i) + self.assertEquals(message.message.value, "Test message %d" % i) + self.assertEquals(message.message.key, None) + + def test_produce_consume_two_partitions(self): + produce1 = ProduceRequest(self.topic, 0, messages=[ + create_message("Partition 0 %d" % i) for i in range(10) + ]) + produce2 = ProduceRequest(self.topic, 1, messages=[ + create_message("Partition 1 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce1, produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 10) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 10) + for i, message in enumerate(messages): + self.assertEquals(message.offset, i) + self.assertEquals(message.message.value, "Partition 0 %d" % i) + self.assertEquals(message.message.key, None) + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 10) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 10) + for i, message in enumerate(messages): + self.assertEquals(message.offset, i) + self.assertEquals(message.message.value, "Partition 1 %d" % i) + self.assertEquals(message.message.key, None) + + #################### + # Offset Tests # + #################### + + @unittest.skip('commmit offset not supported in this version') + def test_commit_fetch_offsets(self): + req = OffsetCommitRequest(self.topic, 0, 42, "metadata") + (resp,) = self.client.send_offset_commit_request("group", [req]) + self.assertEquals(resp.error, 0) + + req = OffsetFetchRequest(self.topic, 0) + (resp,) = self.client.send_offset_fetch_request("group", [req]) + self.assertEquals(resp.error, 0) + 
self.assertEquals(resp.offset, 42) + self.assertEquals(resp.metadata, "") # Metadata isn't stored for now + + # Producer Tests + + def test_simple_producer(self): + producer = SimpleProducer(self.client) + resp = producer.send_messages(self.topic, "one", "two") + + # Will go to partition 0 + self.assertEquals(len(resp), 1) + self.assertEquals(resp[0].error, 0) + self.assertEquals(resp[0].offset, 0) # offset of first msg + + # Will go to partition 1 + resp = producer.send_messages(self.topic, "three") + self.assertEquals(len(resp), 1) + self.assertEquals(resp[0].error, 0) + self.assertEquals(resp[0].offset, 0) # offset of first msg + + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 2) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "two") + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 1) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "three") + + # Will go to partition 0 + resp = producer.send_messages(self.topic, "four", "five") + self.assertEquals(len(resp), 1) + self.assertEquals(resp[0].error, 0) + self.assertEquals(resp[0].offset, 2) # offset of first msg + + producer.stop() + + def test_round_robin_partitioner(self): + producer = KeyedProducer(self.client, + partitioner=RoundRobinPartitioner) + producer.send(self.topic, "key1", "one") + producer.send(self.topic, "key2", "two") + producer.send(self.topic, "key3", "three") + producer.send(self.topic, "key4", "four") + + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) + + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 2) + self.assertEquals(fetch_resp1.partition, 0) + + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "three") + + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 2) + self.assertEquals(fetch_resp2.partition, 1) + + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[1].message.value, "four") + + producer.stop() + + def test_hashed_partitioner(self): + producer = KeyedProducer(self.client, + partitioner=HashedPartitioner) + producer.send(self.topic, 1, "one") + producer.send(self.topic, 2, "two") + producer.send(self.topic, 3, "three") + producer.send(self.topic, 4, "four") + + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) + + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 2) + self.assertEquals(fetch_resp1.partition, 0) + + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[1].message.value, "four") + + self.assertEquals(fetch_resp2.error, 0) + 
self.assertEquals(fetch_resp2.highwaterMark, 2) + self.assertEquals(fetch_resp2.partition, 1) + + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "three") + + producer.stop() + + def test_acks_none(self): + producer = SimpleProducer(self.client, + req_acks=SimpleProducer.ACK_NOT_REQUIRED) + resp = producer.send_messages(self.topic, "one") + self.assertEquals(len(resp), 0) + + fetch = FetchRequest(self.topic, 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_acks_local_write(self): + producer = SimpleProducer(self.client, + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) + resp = producer.send_messages(self.topic, "one") + self.assertEquals(len(resp), 1) + + fetch = FetchRequest(self.topic, 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_acks_cluster_commit(self): + producer = SimpleProducer( + self.client, + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) + resp = producer.send_messages(self.topic, "one") + self.assertEquals(len(resp), 1) + + fetch = FetchRequest(self.topic, 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_async_simple_producer(self): + producer = SimpleProducer(self.client, async=True) + resp = producer.send_messages(self.topic, "one") + self.assertEquals(len(resp), 0) + + # Give it some time + time.sleep(2) + + fetch = FetchRequest(self.topic, 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_async_keyed_producer(self): + producer = KeyedProducer(self.client, async=True) + + resp = producer.send(self.topic, "key1", "one") + self.assertEquals(len(resp), 0) + + # Give it some time + time.sleep(2) + + fetch = FetchRequest(self.topic, 0, 0, 1024) + fetch_resp = self.client.send_fetch_request([fetch]) + + self.assertEquals(fetch_resp[0].error, 0) + self.assertEquals(fetch_resp[0].highwaterMark, 1) + self.assertEquals(fetch_resp[0].partition, 0) + + messages = list(fetch_resp[0].messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + + producer.stop() + + def test_batched_simple_producer(self): + producer = SimpleProducer(self.client, + batch_send=True, + batch_send_every_n=10, + 
batch_send_every_t=20) + + # Send 5 messages and do a fetch + msgs = ["message-%d" % i for i in range(0, 5)] + resp = producer.send_messages(self.topic, *msgs) + + # Batch mode is async. No ack + self.assertEquals(len(resp), 0) + + # Give it some time + time.sleep(2) + + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 0) + + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 0) + + # Send 5 more messages, wait for 2 seconds and do a fetch + msgs = ["message-%d" % i for i in range(5, 10)] + resp = producer.send_messages(self.topic, *msgs) + + # Give it some time + time.sleep(2) + + fetch1 = FetchRequest(self.topic, 0, 0, 1024) + fetch2 = FetchRequest(self.topic, 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 5) + + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 5) + + # Send 7 messages and wait for 20 seconds + msgs = ["message-%d" % i for i in range(10, 15)] + resp = producer.send_messages(self.topic, *msgs) + msgs = ["message-%d" % i for i in range(15, 17)] + resp = producer.send_messages(self.topic, *msgs) + + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp1.messages) + list(fetch_resp2.messages) + self.assertEquals(len(messages), 0) + + # Give it some time + time.sleep(22) + + fetch1 = FetchRequest(self.topic, 0, 5, 1024) + fetch2 = FetchRequest(self.topic, 1, 5, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, + fetch2]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp2.error, 0) + messages = list(fetch_resp1.messages) + list(fetch_resp2.messages) + self.assertEquals(len(messages), 7) + + producer.stop() + + +@unittest.skipIf(skip_integration(), 'Skipping Integration') +class TestConsumer(KafkaTestCase): + @classmethod + def setUpClass(cls): + cls.zk = ZookeeperFixture.instance() + cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) + cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) + cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port)) + + @classmethod + def tearDownClass(cls): # noqa + cls.client.close() + cls.server1.close() + cls.server2.close() + cls.zk.close() + + def test_simple_consumer(self): + # Produce 100 messages to partition 0 + produce1 = ProduceRequest(self.topic, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest(self.topic, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Start a consumer + consumer = 
SimpleConsumer(self.client, "group1", + self.topic, auto_commit=False, + iter_timeout=0) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + consumer.seek(-10, 2) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 10) + + consumer.seek(-13, 2) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 13) + + consumer.stop() + + def test_simple_consumer_blocking(self): + consumer = SimpleConsumer(self.client, "group1", + self.topic, + auto_commit=False, iter_timeout=0) + + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest(self.topic, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + + consumer.stop() + + def test_simple_consumer_pending(self): + # Produce 10 messages to partition 0 and 1 + + produce1 = ProduceRequest(self.topic, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest(self.topic, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = SimpleConsumer(self.client, "group1", self.topic, + auto_commit=False, iter_timeout=0) + self.assertEquals(consumer.pending(), 20) + self.assertEquals(consumer.pending(partitions=[0]), 10) + self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer.stop() + + def test_multi_process_consumer(self): + # Produce 100 messages to partition 0 + produce1 = ProduceRequest(self.topic, 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest(self.topic, 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Start a consumer + consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + # Blocking API + start = datetime.now() + 
+    def test_simple_consumer_pending(self):
+        # Produce 10 messages to partition 0 and 1
+
+        produce1 = ProduceRequest(self.topic, 0, messages=[
+            create_message("Test message 0 %d" % i) for i in range(10)
+        ])
+        for resp in self.client.send_produce_request([produce1]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        produce2 = ProduceRequest(self.topic, 1, messages=[
+            create_message("Test message 1 %d" % i) for i in range(10)
+        ])
+        for resp in self.client.send_produce_request([produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        consumer = SimpleConsumer(self.client, "group1", self.topic,
+                                  auto_commit=False, iter_timeout=0)
+        self.assertEquals(consumer.pending(), 20)
+        self.assertEquals(consumer.pending(partitions=[0]), 10)
+        self.assertEquals(consumer.pending(partitions=[1]), 10)
+        consumer.stop()
+
+    def test_multi_process_consumer(self):
+        # Produce 100 messages to partition 0
+        produce1 = ProduceRequest(self.topic, 0, messages=[
+            create_message("Test message 0 %d" % i) for i in range(100)
+        ])
+
+        for resp in self.client.send_produce_request([produce1]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        # Produce 100 messages to partition 1
+        produce2 = ProduceRequest(self.topic, 1, messages=[
+            create_message("Test message 1 %d" % i) for i in range(100)
+        ])
+
+        for resp in self.client.send_produce_request([produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        # Start a consumer
+        consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+
+        self.assertEquals(len(all_messages), 200)
+        # Make sure there are no duplicates
+        self.assertEquals(len(all_messages), len(set(all_messages)))
+
+        # Blocking API
+        start = datetime.now()
+        messages = consumer.get_messages(block=True, timeout=5)
+        diff = (datetime.now() - start).total_seconds()
+        self.assertGreaterEqual(diff, 4.999)
+        self.assertEqual(len(messages), 0)
+
+        # Send 10 messages
+        produce = ProduceRequest(self.topic, 0, messages=[
+            create_message("Test message 0 %d" % i) for i in range(10)
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 100)
+
+        # Fetch 5 messages
+        messages = consumer.get_messages(count=5, block=True, timeout=5)
+        self.assertEqual(len(messages), 5)
+
+        # Fetch 10 messages
+        start = datetime.now()
+        messages = consumer.get_messages(count=10, block=True, timeout=5)
+        self.assertEqual(len(messages), 5)
+        diff = (datetime.now() - start).total_seconds()
+        self.assertGreaterEqual(diff, 5)
+
+        consumer.stop()
+
+    def test_multi_proc_pending(self):
+        # Produce 10 messages to partition 0 and 1
+        produce1 = ProduceRequest(self.topic, 0, messages=[
+            create_message("Test message 0 %d" % i) for i in range(10)
+        ])
+
+        for resp in self.client.send_produce_request([produce1]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        produce2 = ProduceRequest(self.topic, 1, messages=[
+            create_message("Test message 1 %d" % i) for i in range(10)
+        ])
+
+        for resp in self.client.send_produce_request([produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
+        self.assertEquals(consumer.pending(), 20)
+        self.assertEquals(consumer.pending(partitions=[0]), 10)
+        self.assertEquals(consumer.pending(partitions=[1]), 10)
+
+        consumer.stop()
+
+    def test_large_messages(self):
+        # Produce 10 "normal" size messages
+        messages1 = [create_message(random_string(1024)) for i in range(10)]
+        produce1 = ProduceRequest(self.topic, 0, messages1)
+
+        for resp in self.client.send_produce_request([produce1]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        # Produce 10 messages that are large (bigger than default fetch size)
+        messages2 = [create_message(random_string(5000)) for i in range(10)]
+        produce2 = ProduceRequest(self.topic, 0, messages2)
+
+        for resp in self.client.send_produce_request([produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 10)
+
+        # Consumer should still get all of them
+        consumer = SimpleConsumer(self.client, "group1", self.topic,
+                                  auto_commit=False, iter_timeout=0)
+        all_messages = messages1 + messages2
+        for i, message in enumerate(consumer):
+            self.assertEquals(all_messages[i], message.message)
+        self.assertEquals(i, 19)
+
+        # Produce 1 message that is too large (bigger than max fetch size)
+        big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
+        big_message = create_message(random_string(big_message_size))
+        produce3 = ProduceRequest(self.topic, 0, [big_message])
+        for resp in self.client.send_produce_request([produce3]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 20)
+
+        self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1)
+
+        # Create a consumer with no fetch size limit
+        big_consumer = SimpleConsumer(self.client, "group1", self.topic,
+                                      max_buffer_size=None, partitions=[0],
+                                      auto_commit=False, iter_timeout=0)
+
+        # Seek to the last message
+        big_consumer.seek(-1, 2)
+
+        # Consume giant message successfully
+        message = big_consumer.get_message(block=False, timeout=10)
+        self.assertIsNotNone(message)
+        self.assertEquals(message.message.value, big_message.value)
+
+
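test_large_messages leans on the consumer's adaptive fetch size: fetches start at a default buffer size, and when no whole message fits, the buffer grows until either a complete message arrives or max_buffer_size is hit, at which point ConsumerFetchSizeTooSmall is raised; max_buffer_size=None removes the cap. A condensed sketch of that loop, assuming a fetch_chunk(size) stand-in for the real fetch request (this is a simplification, not the library's actual code):

    from kafka.common import ConsumerFetchSizeTooSmall

    def fetch_whole_message(fetch_chunk, buffer_size, max_buffer_size):
        # Sketch only: double the fetch size until a full message fits.
        while True:
            messages = fetch_chunk(buffer_size)
            if messages:
                return messages
            if max_buffer_size is not None and buffer_size >= max_buffer_size:
                # even the largest allowed fetch couldn't hold one message
                raise ConsumerFetchSizeTooSmall()
            buffer_size *= 2
            if max_buffer_size is not None:
                buffer_size = min(buffer_size, max_buffer_size)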
+@unittest.skipIf(skip_integration(), 'Skipping Integration')
+class TestFailover(KafkaTestCase):
+
+    @classmethod
+    def setUpClass(cls):  # noqa
+        zk_chroot = random_string(10)
+        replicas = 2
+        partitions = 2
+
+        # mini zookeeper, 2 kafka brokers
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+
+        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
+        cls.client = KafkaClient(hosts)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
+            broker.close()
+        cls.zk.close()
+
+    def test_switch_leader(self):
+        key, topic, partition = random_string(5), self.topic, 0
+        producer = SimpleProducer(self.client)
+
+        for i in range(1, 4):
+
+            # XXX unfortunately, the conns dict needs to be warmed for this to work
+            # XXX unfortunately, for warming to work, we need at least as many partitions as brokers
+            self._send_random_messages(producer, self.topic, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            with self.assertRaises(FailedPayloadsError):
+                producer.send_messages(self.topic, 'part 1')
+                producer.send_messages(self.topic, 'part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, self.topic, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def test_switch_leader_async(self):
+        key, topic, partition = random_string(5), self.topic, 0
+        producer = SimpleProducer(self.client, async=True)
+
+        for i in range(1, 4):
+
+            self._send_random_messages(producer, self.topic, 10)
+
+            # kill leader for partition 0
+            broker = self._kill_leader(topic, partition)
+
+            # expect failure, reload metadata
+            producer.send_messages(self.topic, 'part 1')
+            producer.send_messages(self.topic, 'part 2')
+            time.sleep(1)
+
+            # send to new leader
+            self._send_random_messages(producer, self.topic, 10)
+
+            broker.open()
+            time.sleep(3)
+
+            # count number of messages
+            count = self._count_messages('test_switch_leader_async group %s' % i, topic)
+            self.assertIn(count, range(20 * i, 22 * i + 1))
+
+        producer.stop()
+
+    def _send_random_messages(self, producer, topic, n):
+        for j in range(n):
+            resp = producer.send_messages(topic, random_string(10))
+            if len(resp) > 0:
+                self.assertEquals(resp[0].error, 0)
+        time.sleep(1)  # give it some time
+
+    def _kill_leader(self, topic, partition):
+        leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
+        broker = self.brokers[leader.nodeId]
+        broker.close()
+        time.sleep(1)  # give it some time
+        return broker
+
+    def _count_messages(self, group, topic):
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
+        consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+        consumer.stop()
+        client.close()
+        return len(all_messages)
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.DEBUG)
+    unittest.main()
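The failover tests assume a particular recovery dance: the producer's first send after a leader dies fails with FailedPayloadsError, the client then refreshes its metadata, and the next send reaches the newly elected leader. Written out as a caller-side retry loop (illustrative; the tests above assert on the failure rather than hiding it):

    import time

    from kafka.common import FailedPayloadsError

    def send_with_retry(producer, topic, payload, retries=3):
        # Sketch only: survive a leader change by retrying after a pause.
        for attempt in range(retries):
            try:
                return producer.send_messages(topic, payload)
            except FailedPayloadsError:
                # the leader is gone; wait for election, then retry --
                # the client reloads metadata before the next attempt
                time.sleep(1)
        raise FailedPayloadsError("gave up after %d attempts" % retries)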
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 430e65e..e86b6f0 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -86,7 +86,13 @@ class TestProtocol(unittest.TestCase):
         self.assertEqual(msg.value, expect)
 
     def test_encode_message_header(self):
-        expect = '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3'
+        expect = (
+            "\x00\n"            # API Key
+            "\x00\x00"          # API Version
+            "\x00\x00\x00\x04"  # CorrelationId
+            "\x00\x07"          # ClientId length
+            "client3"           # ClientId
+        )
         encoded = KafkaProtocol._encode_message_header("client3", 4, 10)
         self.assertEqual(encoded, expect)
 
@@ -111,10 +117,27 @@ class TestProtocol(unittest.TestCase):
     def test_encode_message_set(self):
         message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
         encoded = KafkaProtocol._encode_message_set(message_set)
-        expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12W\xe7In\x00"
-                  "\x00\x00\x00\x00\x00\x02k1\x00\x00\x00\x02v1\x00\x00\x00\x00"
-                  "\x00\x00\x00\x00\x00\x00\x00\x12\xff\x06\x02I\x00\x00\x00"
-                  "\x00\x00\x02k2\x00\x00\x00\x02v2")
+        expect = (
+            "\x00\x00\x00\x00\x00\x00\x00\x00"  # Msgset1, Offset (Meaningless)
+            "\x00\x00\x00\x12"                  # Msgset1, Msg Size
+            "\x57\xe7\x49\x6e"                  # Msg1, CRC
+            "\x00"                              # Msg1, Magic
+            "\x00"                              # Msg1, Flags
+            "\x00\x00\x00\x02"                  # Msg1, key size
+            "k1"                                # Msg1, key
+            "\x00\x00\x00\x02"                  # Msg1, value size
+            "v1"                                # Msg1, value
+            "\x00\x00\x00\x00\x00\x00\x00\x00"  # Msgset2, Offset (Meaningless)
+            "\x00\x00\x00\x12"                  # Msgset2, Msg Size
+            "\xff\x06\x02\x49"                  # Msg2, CRC
+            "\x00"                              # Msg2, Magic
+            "\x00"                              # Msg2, Flags
+            "\x00\x00\x00\x02"                  # Msg2, key size
+            "k2"                                # Msg2, key
+            "\x00\x00\x00\x02"                  # Msg2, value size
+            "v2"                                # Msg2, value
+        )
+
         self.assertEqual(encoded, expect)
 
     def test_decode_message(self):
diff --git a/test/testutil.py b/test/testutil.py
new file mode 100644
index 0000000..7d57ff6
--- /dev/null
+++ b/test/testutil.py
@@ -0,0 +1,10 @@
+import os
+import random
+import string
+
+def random_string(l):
+    s = "".join(random.choice(string.letters) for i in xrange(l))
+    return s
+
+def skip_integration():
+    return os.environ.get('SKIP_INTEGRATION')
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -2,9 +2,11 @@
 envlist = py27
 [testenv]
 deps =
-    pytest
+    nose
+    coverage
     mock
-commands = py.test --basetemp={envtmpdir} []
+commands =
+    nosetests --with-coverage --cover-erase --cover-package kafka []
 setenv =
     PROJECT_ROOT = {toxinidir}
     KAFKA_ROOT = {toxinidir}/kafka-src
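The byte-by-byte expect string in test_encode_message_header above can be reproduced with a single struct.pack call over the header fields. The format string here is an assumption consistent with those bytes, not necessarily the exact one used inside kafka/protocol.py:

    import struct

    client_id = "client3"
    header = struct.pack('>hhih%ds' % len(client_id),
                         10,              # ApiKey
                         0,               # ApiVersion
                         4,               # CorrelationId
                         len(client_id),  # ClientId size
                         client_id)       # ClientId

    # Matches the annotated expect string in test_encode_message_header
    assert header == '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3'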