| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
 | import collections
import struct
import sys
from threading import Thread, Event
from kafka.common import BufferUnderflowError
def write_int_string(s):
    if s is None:
        return struct.pack('>i', -1)
    else:
        return struct.pack('>i%ds' % len(s), len(s), s)
def write_short_string(s):
    if s is None:
        return struct.pack('>h', -1)
    elif len(s) > 32767 and sys.version < (2,7):
        # Python 2.6 issues a deprecation warning instead of a struct error
        raise struct.error(len(s))
    else:
        return struct.pack('>h%ds' % len(s), len(s), s)
def read_short_string(data, cur):
    if len(data) < cur + 2:
        raise BufferUnderflowError("Not enough data left")
    (strlen,) = struct.unpack('>h', data[cur:cur + 2])
    if strlen == -1:
        return None, cur + 2
    cur += 2
    if len(data) < cur + strlen:
        raise BufferUnderflowError("Not enough data left")
    out = data[cur:cur + strlen]
    return out, cur + strlen
def read_int_string(data, cur):
    if len(data) < cur + 4:
        raise BufferUnderflowError(
            "Not enough data left to read string len (%d < %d)" %
            (len(data), cur + 4))
    (strlen,) = struct.unpack('>i', data[cur:cur + 4])
    if strlen == -1:
        return None, cur + 4
    cur += 4
    if len(data) < cur + strlen:
        raise BufferUnderflowError("Not enough data left")
    out = data[cur:cur + strlen]
    return out, cur + strlen
def relative_unpack(fmt, data, cur):
    size = struct.calcsize(fmt)
    if len(data) < cur + size:
        raise BufferUnderflowError("Not enough data left")
    out = struct.unpack(fmt, data[cur:cur + size])
    return out, cur + size
def group_by_topic_and_partition(tuples):
    out = collections.defaultdict(dict)
    for t in tuples:
        out[t.topic][t.partition] = t
    return out
class ReentrantTimer(object):
    """
    A timer that can be restarted, unlike threading.Timer
    (although this uses threading.Timer)
    t: timer interval in milliseconds
    fn: a callable to invoke
    args: tuple of args to be passed to function
    kwargs: keyword arguments to be passed to function
    """
    def __init__(self, t, fn, *args, **kwargs):
        if t <= 0:
            raise ValueError('Invalid timeout value')
        if not callable(fn):
            raise ValueError('fn must be callable')
        self.thread = None
        self.t = t / 1000.0
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.active = None
    def _timer(self, active):
        while not active.wait(self.t):
            self.fn(*self.args, **self.kwargs)
    def start(self):
        if self.thread is not None:
            self.stop()
        self.active = Event()
        self.thread = Thread(target=self._timer, args=(self.active,))
        self.thread.daemon = True  # So the app exits when main thread exits
        self.thread.start()
    def stop(self):
        if self.thread is None:
            return
        self.active.set()
        self.thread.join(self.t + 1)
        self.timer = None
 |