diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/__init__.py | 1 | ||||
-rw-r--r-- | kafka/client.py | 24 | ||||
-rw-r--r-- | kafka/codec.py | 17 |
3 files changed, 41 insertions, 1 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py index 41038d7..1dcae86 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -9,3 +9,4 @@ from .client import ( Message, ProduceRequest, FetchRequest, OffsetRequest ) from .codec import gzip_encode, gzip_decode +from .codec import snappy_encode, snappy_decode diff --git a/kafka/client.py b/kafka/client.py index 9d23568..212f5c3 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -5,6 +5,7 @@ import struct import zlib from .codec import gzip_encode, gzip_decode +from .codec import snappy_encode, snappy_decode log = logging.getLogger("kafka") @@ -340,7 +341,10 @@ class KafkaClient(object): yield msg elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2: # Snappy encoded Message - raise NotImplementedError("Snappy codec is not yet supported") + snp = snappy_decode(payload) + (msgs, _) = cls.read_message_set(snp) + for msg in msgs: + yield msg else: raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK)) @@ -437,6 +441,24 @@ class KafkaClient(object): gzipped = gzip_encode(message_set) return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped) + @classmethod + def create_snappy_message(cls, *payloads): + """ + Create a Snappy encoded Message + + Params + ====== + payloads, list of messages (bytes) to be encoded + + Returns + ======= + A Message tuple + """ + messages = [cls.create_message(payload) for payload in payloads] + message_set = cls.encode_message_set(messages) + snapped = snappy_encode(message_set) + return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x02), zlib.crc32(snapped), snapped) + def send_message_set(self, produceRequest): """ Send a ProduceRequest diff --git a/kafka/codec.py b/kafka/codec.py index 47ab074..83f3c0b 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -4,6 +4,13 @@ import logging log = logging.getLogger("kafka.codec") +try: + import snappy + hasSnappy=True +except ImportError: + log.warn("Snappy codec not available") + hasSnappy=False + def gzip_encode(payload): buf = StringIO() f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6) @@ -21,3 +28,13 @@ def gzip_decode(payload): f.close() buf.close() return out + +def snappy_encode(payload): + if not hasSnappy: + raise NotImplementedError("Snappy codec not available") + return snappy.compress(payload) + +def snappy_decode(payload): + if not hasSnappy: + raise NotImplementedError("Snappy codec not available") + return snappy.decompress(payload) |