summaryrefslogtreecommitdiff
path: root/kafka/util.py
blob: bdda7ed8e4f7634f7ad30793bd491d4febc6f84e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from collections import defaultdict
from itertools import groupby
import struct
from threading import Thread, Event


def write_int_string(s):
    """
    Pack ``s`` behind a 4-byte big-endian signed length prefix.

    s: the value to encode (packed with the struct 's' format), or None

    Returns the packed length followed by the contents of ``s``. None is
    encoded as a length of -1 with no payload.
    """
    if s is not None:
        size = len(s)
        return struct.pack('>i%ds' % size, size, s)
    return struct.pack('>i', -1)


def write_short_string(s):
    """
    Pack ``s`` behind a 2-byte big-endian signed length prefix.

    s: the value to encode (packed with the struct 's' format), or None

    Returns the packed length followed by the contents of ``s``. None is
    encoded as a length of -1 with no payload.
    """
    if s is not None:
        size = len(s)
        return struct.pack('>h%ds' % size, size, s)
    return struct.pack('>h', -1)


def read_short_string(data, cur):
    """
    Read a string prefixed by a 2-byte big-endian signed length.

    data: the buffer to read from
    cur: offset in ``data`` at which the length prefix starts

    Returns a tuple ``(value, next_offset)``. ``value`` is None when the
    stored length is -1, otherwise the slice of ``data`` following the
    prefix.

    Raises BufferUnderflowError if ``data`` is too short to hold the
    prefix or the announced payload.
    """
    payload_start = cur + 2
    if payload_start > len(data):
        raise BufferUnderflowError("Not enough data left")

    length = struct.unpack('>h', data[cur:payload_start])[0]
    if length == -1:
        # A length of -1 marks a null string; only the prefix is consumed.
        return (None, payload_start)

    payload_end = payload_start + length
    if payload_end > len(data):
        raise BufferUnderflowError("Not enough data left")

    return (data[payload_start:payload_end], payload_end)


def read_int_string(data, cur):
    """
    Read a string prefixed by a 4-byte big-endian signed length.

    data: the buffer to read from
    cur: offset in ``data`` at which the length prefix starts

    Returns a tuple ``(value, next_offset)``. ``value`` is None when the
    stored length is -1, otherwise the slice of ``data`` following the
    prefix.

    Raises BufferUnderflowError if ``data`` is too short to hold the
    prefix or the announced payload.
    """
    payload_start = cur + 4
    if payload_start > len(data):
        raise BufferUnderflowError("Not enough data left")

    length = struct.unpack('>i', data[cur:payload_start])[0]
    if length == -1:
        # A length of -1 marks a null string; only the prefix is consumed.
        return (None, payload_start)

    payload_end = payload_start + length
    if payload_end > len(data):
        raise BufferUnderflowError("Not enough data left")

    return (data[payload_start:payload_end], payload_end)


def relative_unpack(fmt, data, cur):
    """
    Unpack a struct format from ``data`` starting at offset ``cur``.

    fmt: a struct format string
    data: the buffer to read from
    cur: offset in ``data`` at which to start unpacking

    Returns a tuple ``(values, next_offset)`` where ``values`` is the tuple
    produced by struct.unpack and ``next_offset`` is ``cur`` advanced by
    the size of ``fmt``.

    Raises BufferUnderflowError if fewer bytes remain than ``fmt`` needs.
    """
    end = cur + struct.calcsize(fmt)
    if end > len(data):
        raise BufferUnderflowError("Not enough data left")

    return (struct.unpack(fmt, data[cur:end]), end)


def group_by_topic_and_partition(tuples):
    """
    Index items by their ``topic`` and then their ``partition`` attribute.

    tuples: an iterable of objects exposing ``topic`` and ``partition``

    Returns a defaultdict mapping topic -> {partition: item}. When several
    items share a topic and partition, the last one wins.
    """
    grouped = defaultdict(dict)
    for item in tuples:
        by_partition = grouped[item.topic]
        by_partition[item.partition] = item
    return grouped


class BufferUnderflowError(Exception):
    """Raised when a buffer holds fewer bytes than a read requires."""


class ChecksumError(Exception):
    """
    Error type for checksum failures.

    NOTE(review): nothing in this module raises it; presumably used by
    message-decoding code elsewhere -- confirm against callers.
    """

class ConsumerFetchSizeTooSmall(Exception):
    """
    Error type signalling that a consumer's fetch size is too small.

    NOTE(review): nothing in this module raises it; presumably raised by
    consumer code elsewhere -- confirm against callers.
    """

class ReentrantTimer(object):
    """
    A repeating timer that can be restarted, unlike threading.Timer.

    It is built on a daemon threading.Thread plus a threading.Event: the
    worker thread waits ``t`` milliseconds on the event and, each time the
    wait times out, invokes ``fn``. Setting the event (see stop()) ends
    the loop.

    t: timer interval in milliseconds
    fn: a callable to invoke
    args: tuple of args to be passed to function
    kwargs: keyword arguments to be passed to function

    Raises ValueError if ``t`` is not positive or ``fn`` is not callable.
    """
    def __init__(self, t, fn, *args, **kwargs):

        if t <= 0:
            raise ValueError('Invalid timeout value')

        if not callable(fn):
            raise ValueError('fn must be callable')

        self.thread = None
        self.t = t / 1000.0  # Event.wait() takes seconds
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.active = None

    def _timer(self, active):
        # wait() returns False on timeout (-> fire fn) and True once the
        # event has been set by stop() (-> leave the loop).
        while not active.wait(self.t):
            self.fn(*self.args, **self.kwargs)

    def start(self):
        """Start the timer, stopping any previously started run first."""
        if self.thread is not None:
            self.stop()

        # A fresh Event per run; the worker receives it as an argument so
        # an older worker never observes the Event of a newer run.
        self.active = Event()
        self.thread = Thread(target=self._timer, args=(self.active,))
        self.thread.daemon = True  # So the app exits when main thread exits
        self.thread.start()

    def stop(self):
        """
        Stop the timer; a no-op when the timer is not running.

        Signals the worker thread and waits up to ``t`` + 1 seconds for it
        to finish, then clears the thread reference so the instance is
        back in its not-started state.
        """
        if self.thread is None:
            return

        self.active.set()
        self.thread.join(self.t + 1)
        # Clear the attributes start() sets. The original assigned to a
        # non-existent ``self.timer``, leaving the finished thread behind.
        self.thread = None
        self.active = None