summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-11-16 15:02:12 -0500
committerDavid Arthur <mumrah@gmail.com>2012-11-16 15:02:49 -0500
commit26414d4a993b6d0fd9074b2e7c4abc4230ae52eb (patch)
tree79787ce084071488c92760b91570bd5920786f35 /kafka/client.py
parent03abf98c0936a1d48af077ba9c41a11cd074ec5b (diff)
downloadkafka-python-26414d4a993b6d0fd9074b2e7c4abc4230ae52eb.tar.gz
Add Snappy support0.1-alpha
Fixes #2
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py24
1 files changed, 23 insertions, 1 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 9d23568..212f5c3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -5,6 +5,7 @@ import struct
import zlib
from .codec import gzip_encode, gzip_decode
+from .codec import snappy_encode, snappy_decode
log = logging.getLogger("kafka")
@@ -340,7 +341,10 @@ class KafkaClient(object):
yield msg
elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2:
# Snappy encoded Message
- raise NotImplementedError("Snappy codec is not yet supported")
+ snp = snappy_decode(payload)
+ (msgs, _) = cls.read_message_set(snp)
+ for msg in msgs:
+ yield msg
else:
raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))
@@ -437,6 +441,24 @@ class KafkaClient(object):
gzipped = gzip_encode(message_set)
return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)
+ @classmethod
+ def create_snappy_message(cls, *payloads):
+ """
+ Create a Snappy encoded Message
+
+ Params
+ ======
+ payloads, list of messages (bytes) to be encoded
+
+ Returns
+ =======
+ A Message tuple
+ """
+ messages = [cls.create_message(payload) for payload in payloads]
+ message_set = cls.encode_message_set(messages)
+ snapped = snappy_encode(message_set)
+ return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x02), zlib.crc32(snapped), snapped)
+
def send_message_set(self, produceRequest):
"""
Send a ProduceRequest