summaryrefslogtreecommitdiff
path: root/test/test_util.py
blob: 5fc3f69f2873361b3dfc871e064fcf22afce62ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# -*- coding: utf-8 -*-
import struct

import six
from . import unittest

import kafka.errors
import kafka.util
import kafka.structs


class UtilTest(unittest.TestCase):
    @unittest.skip("Unwritten")
    def test_relative_unpack(self):
        pass

    def test_write_int_string(self):
        self.assertEqual(
            kafka.util.write_int_string(b'some string'),
            b'\x00\x00\x00\x0bsome string'
        )

    def test_write_int_string__unicode(self):
        with self.assertRaises(TypeError) as cm:
            kafka.util.write_int_string(u'unicode')
        #: :type: TypeError
        te = cm.exception
        if six.PY2:
            self.assertIn('unicode', str(te))
        else:
            self.assertIn('str', str(te))
        self.assertIn('to be bytes', str(te))

    def test_write_int_string__empty(self):
        self.assertEqual(
            kafka.util.write_int_string(b''),
            b'\x00\x00\x00\x00'
        )

    def test_write_int_string__null(self):
        self.assertEqual(
            kafka.util.write_int_string(None),
            b'\xff\xff\xff\xff'
        )

    def test_read_int_string(self):
        self.assertEqual(kafka.util.read_int_string(b'\xff\xff\xff\xff', 0), (None, 4))
        self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x00', 0), (b'', 4))
        self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0), (b'some string', 15))

    def test_read_int_string__insufficient_data(self):
        with self.assertRaises(kafka.errors.BufferUnderflowError):
            kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)

    def test_write_short_string(self):
        self.assertEqual(
            kafka.util.write_short_string(b'some string'),
            b'\x00\x0bsome string'
        )

    def test_write_short_string__unicode(self):
        with self.assertRaises(TypeError) as cm:
            kafka.util.write_short_string(u'hello')
        #: :type: TypeError
        te = cm.exception
        if six.PY2:
            self.assertIn('unicode', str(te))
        else:
            self.assertIn('str', str(te))
        self.assertIn('to be bytes', str(te))

    def test_write_short_string__empty(self):
        self.assertEqual(
            kafka.util.write_short_string(b''),
            b'\x00\x00'
        )

    def test_write_short_string__null(self):
        self.assertEqual(
            kafka.util.write_short_string(None),
            b'\xff\xff'
        )

    def test_write_short_string__too_long(self):
        with self.assertRaises(struct.error):
            kafka.util.write_short_string(b' ' * 33000)

    def test_read_short_string(self):
        self.assertEqual(kafka.util.read_short_string(b'\xff\xff', 0), (None, 2))
        self.assertEqual(kafka.util.read_short_string(b'\x00\x00', 0), (b'', 2))
        self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))

    def test_read_int_string__insufficient_data2(self):
        with self.assertRaises(kafka.errors.BufferUnderflowError):
            kafka.util.read_int_string('\x00\x021', 0)

    def test_relative_unpack2(self):
        self.assertEqual(
            kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
            ((1, 0), 4)
        )

    def test_relative_unpack3(self):
        with self.assertRaises(kafka.errors.BufferUnderflowError):
            kafka.util.relative_unpack('>hh', '\x00', 0)

    def test_group_by_topic_and_partition(self):
        t = kafka.structs.TopicPartition

        l = [
            t("a", 1),
            t("a", 2),
            t("a", 3),
            t("b", 3),
        ]

        self.assertEqual(kafka.util.group_by_topic_and_partition(l), {
            "a": {
                1: t("a", 1),
                2: t("a", 2),
                3: t("a", 3),
            },
            "b": {
                3: t("b", 3),
            }
        })

        # should not be able to group duplicate topic-partitions
        t1 = t("a", 1)
        with self.assertRaises(AssertionError):
            kafka.util.group_by_topic_and_partition([t1, t1])