1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# -*- coding: utf-8 -*-
import struct
import six
from . import unittest
import kafka.common
import kafka.util
class UtilTest(unittest.TestCase):
    """Unit tests for the byte-level helper functions in ``kafka.util``."""

    def _assert_text_rejected(self, write_func, text):
        """Assert that *write_func* raises a descriptive ``TypeError`` for *text*.

        The error message must name the offending type (``unicode`` on
        Python 2, ``str`` on Python 3) and contain ``'to be bytes'``.
        """
        with self.assertRaises(TypeError) as cm:
            write_func(text)

        message = str(cm.exception)
        offending_type = 'unicode' if six.PY2 else 'str'
        self.assertIn(offending_type, message)
        self.assertIn('to be bytes', message)

    @unittest.skip("Unwritten")
    def test_relative_unpack(self):
        """Placeholder that is always skipped; see the numbered variants below."""
        pass

    def test_write_int_string(self):
        """A bytes payload is written after a 4-byte big-endian length."""
        self.assertEqual(
            kafka.util.write_int_string(b'some string'),
            b'\x00\x00\x00\x0bsome string'
        )

    def test_write_int_string__unicode(self):
        """A text (non-bytes) payload is rejected with a ``TypeError``."""
        self._assert_text_rejected(kafka.util.write_int_string, u'unicode')

    def test_write_int_string__empty(self):
        """An empty payload is written as a zero length and nothing else."""
        self.assertEqual(
            kafka.util.write_int_string(b''),
            b'\x00\x00\x00\x00'
        )

    def test_write_int_string__null(self):
        """``None`` is written as four ``0xff`` bytes."""
        self.assertEqual(
            kafka.util.write_int_string(None),
            b'\xff\xff\xff\xff'
        )

    def test_read_int_string(self):
        """Reading returns ``(payload, new_offset)`` for null, empty and filled input."""
        self.assertEqual(
            kafka.util.read_int_string(b'\xff\xff\xff\xff', 0),
            (None, 4)
        )
        self.assertEqual(
            kafka.util.read_int_string(b'\x00\x00\x00\x00', 0),
            (b'', 4)
        )
        self.assertEqual(
            kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0),
            (b'some string', 15)
        )

    def test_read_int_string__insufficient_data(self):
        """A declared length of 2 with only 1 payload byte raises an underflow."""
        with self.assertRaises(kafka.common.BufferUnderflowError):
            kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)

    def test_write_short_string(self):
        """A bytes payload is written after a 2-byte big-endian length."""
        self.assertEqual(
            kafka.util.write_short_string(b'some string'),
            b'\x00\x0bsome string'
        )

    def test_write_short_string__unicode(self):
        """A text (non-bytes) payload is rejected with a ``TypeError``."""
        self._assert_text_rejected(kafka.util.write_short_string, u'hello')

    def test_write_short_string__empty(self):
        """An empty payload is written as a zero length and nothing else."""
        self.assertEqual(
            kafka.util.write_short_string(b''),
            b'\x00\x00'
        )

    def test_write_short_string__null(self):
        """``None`` is written as two ``0xff`` bytes."""
        self.assertEqual(
            kafka.util.write_short_string(None),
            b'\xff\xff'
        )

    def test_write_short_string__too_long(self):
        """A 33000-byte payload raises ``struct.error``.

        Presumably the length no longer fits the 2-byte prefix -- confirm
        against ``kafka.util.write_short_string``.
        """
        with self.assertRaises(struct.error):
            kafka.util.write_short_string(b' ' * 33000)

    def test_read_short_string(self):
        """Reading returns ``(payload, new_offset)`` for null, empty and filled input."""
        self.assertEqual(
            kafka.util.read_short_string(b'\xff\xff', 0),
            (None, 2)
        )
        self.assertEqual(
            kafka.util.read_short_string(b'\x00\x00', 0),
            (b'', 2)
        )
        self.assertEqual(
            kafka.util.read_short_string(b'\x00\x0bsome string', 0),
            (b'some string', 13)
        )

    def test_read_int_string__insufficient_data2(self):
        """A 3-byte buffer handed to ``read_int_string`` raises an underflow.

        The buffer is shorter than the 4-byte length prefix used by the
        other int-string tests.
        """
        # NOTE(review): this test sits with the short-string tests and its
        # input looks like a 2-byte prefix plus one payload byte, so
        # ``read_short_string`` may have been the intended target -- confirm.
        with self.assertRaises(kafka.common.BufferUnderflowError):
            # Bytes literal (not text) so the buffer type matches every other
            # test on both Python 2 and Python 3.
            kafka.util.read_int_string(b'\x00\x021', 0)

    def test_relative_unpack2(self):
        """Unpacking ``'>hh'`` at offset 0 returns the values and the new offset."""
        self.assertEqual(
            kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
            ((1, 0), 4)
        )

    def test_relative_unpack3(self):
        """Unpacking ``'>hh'`` from a 1-byte buffer raises an underflow."""
        with self.assertRaises(kafka.common.BufferUnderflowError):
            # Bytes literal (not text) so the buffer type matches the
            # successful unpack test above.
            kafka.util.relative_unpack('>hh', b'\x00', 0)

    def test_group_by_topic_and_partition(self):
        """Items are grouped into ``{topic: {partition: item}}``; duplicates fail."""
        t = kafka.common.TopicAndPartition

        topic_partitions = [
            t("a", 1),
            t("a", 2),
            t("a", 3),
            t("b", 3),
        ]

        self.assertEqual(
            kafka.util.group_by_topic_and_partition(topic_partitions),
            {
                "a": {
                    1: t("a", 1),
                    2: t("a", 2),
                    3: t("a", 3),
                },
                "b": {
                    3: t("b", 3),
                },
            }
        )

        # should not be able to group duplicate topic-partitions
        t1 = t("a", 1)
        with self.assertRaises(AssertionError):
            kafka.util.group_by_topic_and_partition([t1, t1])
|