summaryrefslogtreecommitdiff
path: root/kafka/codec.py
blob: eb5d03cf97c1d6b1f4b261e3cdf5336bcd1fd02e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from cStringIO import StringIO
import gzip

try:
    import snappy
    _has_snappy = True
except ImportError:
    _has_snappy = False


def has_gzip():
    return True


def has_snappy():
    return _has_snappy


def gzip_encode(payload):
    buffer = StringIO()
    handle = gzip.GzipFile(fileobj=buffer, mode="w")
    handle.write(payload)
    handle.close()
    buffer.seek(0)
    result = buffer.read()
    buffer.close()
    return result


def gzip_decode(payload):
    buffer = StringIO(payload)
    handle = gzip.GzipFile(fileobj=buffer, mode='r')
    result = handle.read()
    handle.close()
    buffer.close()
    return result


def snappy_encode(payload):
    if not _has_snappy:
        raise NotImplementedError("Snappy codec is not available")
    return snappy.compress(payload)


def snappy_decode(payload):
    if not _has_snappy:
        raise NotImplementedError("Snappy codec is not available")
    return snappy.decompress(payload)