diff options
author | David Arthur <mumrah@gmail.com> | 2012-09-28 11:28:56 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-09-28 11:28:56 -0400 |
commit | 51d8bbb231b29c006323d0705f8819f592aeb6e7 (patch) | |
tree | d84c834c8b6d5b8157e6616baab88be3f56a9206 /test/unit.py | |
parent | 2a332d2983ae306a692b3796e0a309a5e7504097 (diff) | |
download | kafka-python-51d8bbb231b29c006323d0705f8819f592aeb6e7.tar.gz |
Starting integration tests
Diffstat (limited to 'test/unit.py')
-rw-r--r-- | test/unit.py | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/test/unit.py b/test/unit.py new file mode 100644 index 0000000..5c48f27 --- /dev/null +++ b/test/unit.py @@ -0,0 +1,119 @@ +import os +import random +import struct +import unittest + +from kafka import KafkaClient, ProduceRequest, FetchRequest +from kafka import gzip_encode, gzip_decode, length_prefix_message + +ITERATIONS = 1000 +STRLEN = 100 + +def random_string(): + return os.urandom(random.randint(0, STRLEN)) + +class TestMisc(unittest.TestCase): + def test_length_prefix(self): + for i in xrange(ITERATIONS): + s1 = random_string() + s2 = length_prefix_message(s1) + self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1)) + +class TestCodec(unittest.TestCase): + def test_gzip(self): + for i in xrange(ITERATIONS): + s1 = random_string() + s2 = gzip_decode(gzip_encode(s1)) + self.assertEquals(s1,s2) + +class TestMessage(unittest.TestCase): + def test_create(self): + msg = KafkaClient.create_message("testing") + self.assertEquals(msg.payload, "testing") + self.assertEquals(msg.magic, 1) + self.assertEquals(msg.attributes, 0) + self.assertEquals(msg.crc, -386704890) + + def test_create_gzip(self): + msg = KafkaClient.create_gzip_message("testing") + self.assertEquals(msg.magic, 1) + self.assertEquals(msg.attributes, 1) + # Can't check the crc or payload for gzip since it's non-deterministic + (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload)) + inner = messages[0] + self.assertEquals(inner.magic, 1) + self.assertEquals(inner.attributes, 0) + self.assertEquals(inner.payload, "testing") + self.assertEquals(inner.crc, -386704890) + + def test_message_simple(self): + msg = KafkaClient.create_message("testing") + enc = KafkaClient.encode_message(msg) + expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" + self.assertEquals(enc, expect) + (messages, read) = KafkaClient.read_message_set(enc) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0], msg) + + def test_message_list(self): + msgs = [ + KafkaClient.create_message("one"), + KafkaClient.create_message("two"), + KafkaClient.create_message("three") + ] + enc = KafkaClient.encode_message_set(msgs) + expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11" + "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three") + self.assertEquals(enc, expect) + (messages, read) = KafkaClient.read_message_set(enc) + self.assertEquals(len(messages), 3) + self.assertEquals(messages[0].payload, "one") + self.assertEquals(messages[1].payload, "two") + self.assertEquals(messages[2].payload, "three") + + def test_message_gzip(self): + msg = KafkaClient.create_gzip_message("one", "two", "three") + enc = KafkaClient.encode_message(msg) + # Can't check the bytes directly since Gzip is non-deterministic + (messages, read) = KafkaClient.read_message_set(enc) + self.assertEquals(len(messages), 3) + self.assertEquals(messages[0].payload, "one") + self.assertEquals(messages[1].payload, "two") + self.assertEquals(messages[2].payload, "three") + + def test_message_simple_random(self): + for i in xrange(ITERATIONS): + n = random.randint(0, 10) + msgs = [KafkaClient.create_message(random_string()) for j in range(n)] + enc = KafkaClient.encode_message_set(msgs) + (messages, read) = KafkaClient.read_message_set(enc) + self.assertEquals(len(messages), n) + for j in range(n): + self.assertEquals(messages[j], msgs[j]) + + def test_message_gzip_random(self): + for i in xrange(ITERATIONS): + n = random.randint(0, 10) + strings = [random_string() for j in range(n)] + msg = KafkaClient.create_gzip_message(*strings) + enc = KafkaClient.encode_message(msg) + (messages, read) = KafkaClient.read_message_set(enc) + self.assertEquals(len(messages), n) + for j in range(n): + self.assertEquals(messages[j].payload, strings[j]) + +class TestRequests(unittest.TestCase): + def test_produce_request(self): + req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) + enc = KafkaClient.encode_produce_request(req) + expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" + self.assertEquals(enc, expect) + + def test_fetch_request(self): + req = FetchRequest("my-topic", 0, 0, 1024) + enc = KafkaClient.encode_fetch_request(req) + expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00" + self.assertEquals(enc, expect) + +if __name__ == '__main__': + unittest.main() |