# cgit page header (navigation: summary | refs | log | tree | commit | diff)
# path: root/kafka/util.py
# blob: 5dc6bc21b547b1c8655932d8b815ff2ac86a3248 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from collections import defaultdict
from itertools import groupby
import struct
from threading import Timer

def write_int_string(s):
    """
    Pack s behind a 4-byte big-endian signed length prefix.

    s: the byte string to encode, or None
    Returns the packed bytes. None is encoded as a length of -1 with no
    payload following it.
    """
    if s is not None:
        size = len(s)
        layout = '>i%ds' % size
        return struct.pack(layout, size, s)
    return struct.pack('>i', -1)

def write_short_string(s):
    """
    Pack s behind a 2-byte big-endian signed length prefix.

    s: the byte string to encode, or None
    Returns the packed bytes. None is encoded as a length of -1 with no
    payload following it.
    """
    if s is not None:
        size = len(s)
        layout = '>h%ds' % size
        return struct.pack(layout, size, s)
    return struct.pack('>h', -1)

def read_short_string(data, cur):
    """
    Read a string with a 2-byte big-endian signed length prefix.

    data: the buffer to read from
    cur: offset in data at which the length prefix starts

    Returns a (value, offset) tuple, where offset points just past what was
    consumed. A length of -1 yields None as the value.
    Raises BufferUnderflowError if data is too short to hold the prefix or
    the payload it announces.
    """
    body_start = cur + 2
    if body_start > len(data):
        raise BufferUnderflowError("Not enough data left")
    length = struct.unpack('>h', data[cur:body_start])[0]
    if length == -1:
        return (None, body_start)
    body_end = body_start + length
    if body_end > len(data):
        raise BufferUnderflowError("Not enough data left")
    return (data[body_start:body_end], body_end)

def read_int_string(data, cur):
    """
    Read a string with a 4-byte big-endian signed length prefix.

    data: the buffer to read from
    cur: offset in data at which the length prefix starts

    Returns a (value, offset) tuple, where offset points just past what was
    consumed. A length of -1 yields None as the value.
    Raises BufferUnderflowError if data is too short to hold the prefix or
    the payload it announces.
    """
    body_start = cur + 4
    if body_start > len(data):
        raise BufferUnderflowError("Not enough data left")
    length = struct.unpack('>i', data[cur:body_start])[0]
    if length == -1:
        return (None, body_start)
    body_end = body_start + length
    if body_end > len(data):
        raise BufferUnderflowError("Not enough data left")
    return (data[body_start:body_end], body_end)

def relative_unpack(fmt, data, cur):
    """
    Unpack a struct format from data starting at offset cur.

    fmt: a struct format string
    data: the buffer to read from
    cur: offset in data at which to start unpacking

    Returns a (values, offset) tuple: the tuple produced by struct.unpack
    and the offset just past the bytes consumed.
    Raises BufferUnderflowError if fewer than struct.calcsize(fmt) bytes
    remain at cur.
    """
    end = cur + struct.calcsize(fmt)
    if end > len(data):
        raise BufferUnderflowError("Not enough data left")
    values = struct.unpack(fmt, data[cur:end])
    return (values, end)

def group_by_topic_and_partition(tuples):
    """
    Index objects by their topic and partition attributes.

    tuples: an iterable of objects exposing .topic and .partition

    Returns a defaultdict mapping topic -> {partition: object}. When several
    objects share a topic and partition, the last one seen wins.
    """
    grouped = defaultdict(dict)
    for item in tuples:
        by_partition = grouped[item.topic]
        by_partition[item.partition] = item
    return grouped

class BufferUnderflowError(Exception):
    """
    Raised when a buffer holds fewer bytes than a read requires.
    """

class ChecksumError(Exception):
    """
    Exception type for checksum failures.

    NOTE(review): nothing in this module raises it; presumably it signals a
    mismatch between an expected and a computed checksum -- confirm against
    the code that raises it.
    """

class ReentrantTimer(object):
    """
    A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer)

    Each call to start() schedules a single invocation of fn after t
    milliseconds, cancelling any invocation still pending from an earlier
    start().

    t: timer interval in milliseconds
    fn: a callable to invoke
    """
    def __init__(self, t, fn):
        # Underlying threading.Timer; None while nothing is scheduled.
        self.timer = None
        self.t = t
        self.fn = fn

    def start(self):
        """
        Start the countdown, replacing any countdown already pending.
        """
        if self.timer is not None:
            self.timer.cancel()
        # threading.Timer expects seconds; self.t is in milliseconds.
        self.timer = Timer(self.t / 1000., self.fn)
        self.timer.start()

    def stop(self):
        """
        Cancel the pending countdown, if there is one.

        Safe to call before start() and safe to call more than once.
        """
        if self.timer is not None:
            self.timer.cancel()
            # Drop the handle so a repeated stop() is a no-op.
            self.timer = None