summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/__init__.py1
-rw-r--r--kafka/client.py24
-rw-r--r--kafka/codec.py17
3 files changed, 41 insertions, 1 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 41038d7..1dcae86 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -9,3 +9,4 @@ from .client import (
Message, ProduceRequest, FetchRequest, OffsetRequest
)
from .codec import gzip_encode, gzip_decode
+from .codec import snappy_encode, snappy_decode
diff --git a/kafka/client.py b/kafka/client.py
index 9d23568..212f5c3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -5,6 +5,7 @@ import struct
import zlib
from .codec import gzip_encode, gzip_decode
+from .codec import snappy_encode, snappy_decode
log = logging.getLogger("kafka")
@@ -340,7 +341,10 @@ class KafkaClient(object):
yield msg
elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2:
# Snappy encoded Message
- raise NotImplementedError("Snappy codec is not yet supported")
+ snp = snappy_decode(payload)
+ (msgs, _) = cls.read_message_set(snp)
+ for msg in msgs:
+ yield msg
else:
raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))
@@ -437,6 +441,24 @@ class KafkaClient(object):
gzipped = gzip_encode(message_set)
return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)
+ @classmethod
+ def create_snappy_message(cls, *payloads):
+ """
+ Create a Snappy encoded Message
+
+ Params
+ ======
+ payloads, list of messages (bytes) to be encoded
+
+ Returns
+ =======
+ A Message tuple
+ """
+ messages = [cls.create_message(payload) for payload in payloads]
+ message_set = cls.encode_message_set(messages)
+ snapped = snappy_encode(message_set)
+ return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x02), zlib.crc32(snapped), snapped)
+
def send_message_set(self, produceRequest):
"""
Send a ProduceRequest
diff --git a/kafka/codec.py b/kafka/codec.py
index 47ab074..83f3c0b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -4,6 +4,13 @@ import logging
log = logging.getLogger("kafka.codec")
+try:
+ import snappy
+ hasSnappy=True
+except ImportError:
+ log.warn("Snappy codec not available")
+ hasSnappy=False
+
def gzip_encode(payload):
buf = StringIO()
f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
@@ -21,3 +28,13 @@ def gzip_decode(payload):
f.close()
buf.close()
return out
+
+def snappy_encode(payload):
+ if not hasSnappy:
+ raise NotImplementedError("Snappy codec not available")
+ return snappy.compress(payload)
+
+def snappy_decode(payload):
+ if not hasSnappy:
+ raise NotImplementedError("Snappy codec not available")
+ return snappy.decompress(payload)