summaryrefslogtreecommitdiff
path: root/test.py
blob: 56f1e1ec9cd36a1a43ee9d7a31b32548710374a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
import random
import struct
import unittest

from kafka import KafkaClient
from kafka import gzip_encode, gzip_decode, length_prefix_message

ITERATIONS = 1000
STRLEN = 100

def random_string():
    return os.urandom(random.randint(0, STRLEN))

class TestMisc(unittest.TestCase):
    def test_length_prefix(self):
        for i in xrange(ITERATIONS):
            s1 = random_string()
            s2 = length_prefix_message(s1)
            self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1)) 
        
class TestCodec(unittest.TestCase):
    def test_gzip(self):
        for i in xrange(ITERATIONS):
            s1 = random_string()
            s2 = gzip_decode(gzip_encode(s1))
            self.assertEquals(s1,s2)

class TestMessage(unittest.TestCase):
    def test_create(self):
        msg = KafkaClient.create_message("testing")
        self.assertEquals(msg.payload, "testing")
        self.assertEquals(msg.magic, 1)
        self.assertEquals(msg.attributes, 0)
        self.assertEquals(msg.crc, -386704890) 

    def test_create_gzip(self):
        msg = KafkaClient.create_gzip_message("testing")
        self.assertEquals(msg.magic, 1)
        self.assertEquals(msg.attributes, 1)
        # Can't check the crc or payload for gzip since it's non-deterministic
        (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload))
        inner = messages[0]
        self.assertEquals(inner.magic, 1)
        self.assertEquals(inner.attributes, 0)
        self.assertEquals(inner.payload, "testing")
        self.assertEquals(inner.crc, -386704890) 

    def test_message_simple(self):
        msg = KafkaClient.create_message("testing")
        enc = KafkaClient.encode_message(msg)
        expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
        self.assertEquals(enc, expect)
        (messages, read) = KafkaClient.read_message_set(enc)
        self.assertEquals(len(messages), 1)
        self.assertEquals(messages[0], msg)

    def test_message_list(self):
        msgs = [
            KafkaClient.create_message("one"),
            KafkaClient.create_message("two"),
            KafkaClient.create_message("three")
        ]
        enc = KafkaClient.encode_message_set(msgs)
        expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11"
                  "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three")
        self.assertEquals(enc, expect)
        (messages, read) = KafkaClient.read_message_set(enc)
        self.assertEquals(len(messages), 3)
        self.assertEquals(messages[0].payload, "one")
        self.assertEquals(messages[1].payload, "two")
        self.assertEquals(messages[2].payload, "three")

    def test_message_gzip(self):
        msg = KafkaClient.create_gzip_message("one", "two", "three")
        enc = KafkaClient.encode_message(msg)
        # Can't check the bytes directly since Gzip is non-deterministic
        (messages, read) = KafkaClient.read_message_set(enc)
        self.assertEquals(len(messages), 3)
        self.assertEquals(messages[0].payload, "one")
        self.assertEquals(messages[1].payload, "two")
        self.assertEquals(messages[2].payload, "three")

    def test_message_simple_random(self):
        for i in xrange(ITERATIONS):
            n = random.randint(0, 10)
            msgs = [KafkaClient.create_message(random_string()) for j in range(n)]
            enc = KafkaClient.encode_message_set(msgs)
            (messages, read) = KafkaClient.read_message_set(enc)
            self.assertEquals(len(messages), n)
            for j in range(n):
                self.assertEquals(messages[j], msgs[j])

    def test_message_gzip_random(self):
        for i in xrange(ITERATIONS):
            n = random.randint(0, 10)
            strings = [random_string() for j in range(n)]
            msg = KafkaClient.create_gzip_message(*strings)
            enc = KafkaClient.encode_message(msg)
            (messages, read) = KafkaClient.read_message_set(enc)
            self.assertEquals(len(messages), n)
            for j in range(n):
                self.assertEquals(messages[j].payload, strings[j])

if __name__ == '__main__':
    unittest.main()