summaryrefslogtreecommitdiff
path: root/test.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-09-28 11:28:56 -0400
committerDavid Arthur <mumrah@gmail.com>2012-09-28 11:28:56 -0400
commit51d8bbb231b29c006323d0705f8819f592aeb6e7 (patch)
treed84c834c8b6d5b8157e6616baab88be3f56a9206 /test.py
parent2a332d2983ae306a692b3796e0a309a5e7504097 (diff)
downloadkafka-python-51d8bbb231b29c006323d0705f8819f592aeb6e7.tar.gz
Starting integration tests
Diffstat (limited to 'test.py')
-rw-r--r--test.py119
1 files changed, 0 insertions, 119 deletions
diff --git a/test.py b/test.py
deleted file mode 100644
index 5c48f27..0000000
--- a/test.py
+++ /dev/null
@@ -1,119 +0,0 @@
-import os
-import random
-import struct
-import unittest
-
-from kafka import KafkaClient, ProduceRequest, FetchRequest
-from kafka import gzip_encode, gzip_decode, length_prefix_message
-
-ITERATIONS = 1000
-STRLEN = 100
-
-def random_string():
- return os.urandom(random.randint(0, STRLEN))
-
-class TestMisc(unittest.TestCase):
- def test_length_prefix(self):
- for i in xrange(ITERATIONS):
- s1 = random_string()
- s2 = length_prefix_message(s1)
- self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1))
-
-class TestCodec(unittest.TestCase):
- def test_gzip(self):
- for i in xrange(ITERATIONS):
- s1 = random_string()
- s2 = gzip_decode(gzip_encode(s1))
- self.assertEquals(s1,s2)
-
-class TestMessage(unittest.TestCase):
- def test_create(self):
- msg = KafkaClient.create_message("testing")
- self.assertEquals(msg.payload, "testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 0)
- self.assertEquals(msg.crc, -386704890)
-
- def test_create_gzip(self):
- msg = KafkaClient.create_gzip_message("testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 1)
- # Can't check the crc or payload for gzip since it's non-deterministic
- (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload))
- inner = messages[0]
- self.assertEquals(inner.magic, 1)
- self.assertEquals(inner.attributes, 0)
- self.assertEquals(inner.payload, "testing")
- self.assertEquals(inner.crc, -386704890)
-
- def test_message_simple(self):
- msg = KafkaClient.create_message("testing")
- enc = KafkaClient.encode_message(msg)
- expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
- self.assertEquals(enc, expect)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 1)
- self.assertEquals(messages[0], msg)
-
- def test_message_list(self):
- msgs = [
- KafkaClient.create_message("one"),
- KafkaClient.create_message("two"),
- KafkaClient.create_message("three")
- ]
- enc = KafkaClient.encode_message_set(msgs)
- expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11"
- "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three")
- self.assertEquals(enc, expect)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- def test_message_gzip(self):
- msg = KafkaClient.create_gzip_message("one", "two", "three")
- enc = KafkaClient.encode_message(msg)
- # Can't check the bytes directly since Gzip is non-deterministic
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- def test_message_simple_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(0, 10)
- msgs = [KafkaClient.create_message(random_string()) for j in range(n)]
- enc = KafkaClient.encode_message_set(msgs)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j], msgs[j])
-
- def test_message_gzip_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(0, 10)
- strings = [random_string() for j in range(n)]
- msg = KafkaClient.create_gzip_message(*strings)
- enc = KafkaClient.encode_message(msg)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j].payload, strings[j])
-
-class TestRequests(unittest.TestCase):
- def test_produce_request(self):
- req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
- enc = KafkaClient.encode_produce_request(req)
- expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
- self.assertEquals(enc, expect)
-
- def test_fetch_request(self):
- req = FetchRequest("my-topic", 0, 0, 1024)
- enc = KafkaClient.encode_fetch_request(req)
- expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00"
- self.assertEquals(enc, expect)
-
-if __name__ == '__main__':
- unittest.main()