diff options
Diffstat (limited to 'kafka/util.py')
-rw-r--r-- | kafka/util.py | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/kafka/util.py b/kafka/util.py index 8c02cb2..10bf838 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -3,64 +3,81 @@ from itertools import groupby import struct from threading import Timer + def write_int_string(s): if s is None: return struct.pack('>i', -1) else: return struct.pack('>i%ds' % len(s), len(s), s) + def write_short_string(s): if s is None: return struct.pack('>h', -1) else: return struct.pack('>h%ds' % len(s), len(s), s) + def read_short_string(data, cur): - if len(data) < cur+2: + if len(data) < cur + 2: raise BufferUnderflowError("Not enough data left") - (strLen,) = struct.unpack('>h', data[cur:cur+2]) + + (strLen,) = struct.unpack('>h', data[cur:cur + 2]) if strLen == -1: - return (None, cur+2) + return (None, cur + 2) + cur += 2 - if len(data) < cur+strLen: + if len(data) < cur + strLen: raise BufferUnderflowError("Not enough data left") - out = data[cur:cur+strLen] - return (out, cur+strLen) + + out = data[cur:cur + strLen] + return (out, cur + strLen) + def read_int_string(data, cur): - if len(data) < cur+4: + if len(data) < cur + 4: raise BufferUnderflowError("Not enough data left") - (strLen,) = struct.unpack('>i', data[cur:cur+4]) + + (strLen,) = struct.unpack('>i', data[cur:cur + 4]) if strLen == -1: - return (None, cur+4) + return (None, cur + 4) + cur += 4 - if len(data) < cur+strLen: + if len(data) < cur + strLen: raise BufferUnderflowError("Not enough data left") - out = data[cur:cur+strLen] - return (out, cur+strLen) + + out = data[cur:cur + strLen] + return (out, cur + strLen) + def relative_unpack(fmt, data, cur): size = struct.calcsize(fmt) - if len(data) < cur+size: + if len(data) < cur + size: raise BufferUnderflowError("Not enough data left") - out = struct.unpack(fmt, data[cur:cur+size]) - return (out, cur+size) + + out = struct.unpack(fmt, data[cur:cur + size]) + return (out, cur + size) + def group_by_topic_and_partition(tuples): out = defaultdict(dict) for t in tuples: out[t.topic][t.partition] = t - return out + return out + class BufferUnderflowError(Exception): pass + class ChecksumError(Exception): pass + class ReentrantTimer(object): """ - A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer) + A timer that can be restarted, unlike threading.Timer + (although this uses threading.Timer) t: timer interval in milliseconds fn: a callable to invoke |