summaryrefslogtreecommitdiff
path: root/kafka/codec.py
blob: 83f3c0b5631382d86ecd5ac9c82ce716a406bb0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from cStringIO import StringIO
import gzip
import logging

log = logging.getLogger("kafka.codec")

try:
    import snappy
    hasSnappy=True
except ImportError:
    log.warn("Snappy codec not available")
    hasSnappy=False

def gzip_encode(payload):
    buf = StringIO()
    f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
    f.write(payload)
    f.close()
    buf.seek(0)
    out = buf.read()
    buf.close()
    return out

def gzip_decode(payload):
    buf = StringIO(payload)
    f = gzip.GzipFile(fileobj=buf, mode='r')
    out = f.read()
    f.close()
    buf.close()
    return out

def snappy_encode(payload):
    if not hasSnappy:
        raise NotImplementedError("Snappy codec not available")
    return snappy.compress(payload)

def snappy_decode(payload):
    if not hasSnappy:
        raise NotImplementedError("Snappy codec not available")
    return snappy.decompress(payload)