diff options
author | David Arthur <mumrah@gmail.com> | 2012-11-16 15:02:12 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-11-16 15:02:49 -0500 |
commit | 26414d4a993b6d0fd9074b2e7c4abc4230ae52eb (patch) | |
tree | 79787ce084071488c92760b91570bd5920786f35 /kafka/client.py | |
parent | 03abf98c0936a1d48af077ba9c41a11cd074ec5b (diff) | |
download | kafka-python-26414d4a993b6d0fd9074b2e7c4abc4230ae52eb.tar.gz |
Add Snappy support0.1-alpha
Fixes #2
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/kafka/client.py b/kafka/client.py index 9d23568..212f5c3 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -5,6 +5,7 @@ import struct import zlib from .codec import gzip_encode, gzip_decode +from .codec import snappy_encode, snappy_decode log = logging.getLogger("kafka") @@ -340,7 +341,10 @@ class KafkaClient(object): yield msg elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2: # Snappy encoded Message - raise NotImplementedError("Snappy codec is not yet supported") + snp = snappy_decode(payload) + (msgs, _) = cls.read_message_set(snp) + for msg in msgs: + yield msg else: raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK)) @@ -437,6 +441,24 @@ class KafkaClient(object): gzipped = gzip_encode(message_set) return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped) + @classmethod + def create_snappy_message(cls, *payloads): + """ + Create a Snappy encoded Message + + Params + ====== + payloads, list of messages (bytes) to be encoded + + Returns + ======= + A Message tuple + """ + messages = [cls.create_message(payload) for payload in payloads] + message_set = cls.encode_message_set(messages) + snapped = snappy_encode(message_set) + return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x02), zlib.crc32(snapped), snapped) + def send_message_set(self, produceRequest): """ Send a ProduceRequest |