diff options
-rw-r--r-- | kafka/codec.py | 38 |
1 files changed, 16 insertions, 22 deletions
diff --git a/kafka/codec.py b/kafka/codec.py index c8195ee..0109a66 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -9,9 +9,8 @@ _XERIAL_V1_FORMAT = 'bccccccBii' try: import snappy - _HAS_SNAPPY = True except ImportError: - _HAS_SNAPPY = False + snappy = None try: import lz4 @@ -28,7 +27,7 @@ def has_gzip(): def has_snappy(): - return _HAS_SNAPPY + return snappy is not None def has_lz4(): @@ -68,7 +67,7 @@ def gzip_decode(payload): return result -def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024): +def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024): """Encodes the given data with snappy compression. If xerial_compatible is set then the stream is encoded in a fashion @@ -97,28 +96,23 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024): if not has_snappy(): raise NotImplementedError("Snappy codec is not available") - if xerial_compatible: - def _chunker(): - for i in xrange(0, len(payload), xerial_blocksize): - yield payload[i:i+xerial_blocksize] + if not xerial_compatible: + return snappy.compress(payload) - out = BytesIO() + out = BytesIO() + for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER): + out.write(struct.pack('!' + fmt, dat)) - header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat - in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + # Chunk through buffers to avoid creating intermediate slice copies + for chunk in (buffer(payload, i, xerial_blocksize) + for i in xrange(0, len(payload), xerial_blocksize)): - out.write(header) - for chunk in _chunker(): - block = snappy.compress(chunk) - block_size = len(block) - out.write(struct.pack('!i', block_size)) - out.write(block) + block = snappy.compress(chunk) + block_size = len(block) + out.write(struct.pack('!i', block_size)) + out.write(block) - out.seek(0) - return out.read() - - else: - return snappy.compress(payload) + return out.getvalue() def _detect_xerial_stream(payload): |