summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-25 15:36:08 -0800
committerDana Powers <dana.powers@rd.io>2016-01-25 15:36:15 -0800
commit7e09258409633ef3f3dbc4f45b89c1a5bb09b05c (patch)
tree63104b9ff56adaa72b5d43bb79a25244c0e29ac8
parent0dcd5f10b983b85a17e38065d79fe8f632e70fad (diff)
downloadkafka-python-7e09258409633ef3f3dbc4f45b89c1a5bb09b05c.tar.gz
Write xerial-formatted snappy by default; use buffers to reduce copies
-rw-r--r--kafka/codec.py38
1 files changed, 16 insertions, 22 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index c8195ee..0109a66 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -9,9 +9,8 @@ _XERIAL_V1_FORMAT = 'bccccccBii'
try:
import snappy
- _HAS_SNAPPY = True
except ImportError:
- _HAS_SNAPPY = False
+ snappy = None
try:
import lz4
@@ -28,7 +27,7 @@ def has_gzip():
def has_snappy():
- return _HAS_SNAPPY
+ return snappy is not None
def has_lz4():
@@ -68,7 +67,7 @@ def gzip_decode(payload):
return result
-def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024):
+def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
"""Encodes the given data with snappy compression.
If xerial_compatible is set then the stream is encoded in a fashion
@@ -97,28 +96,23 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32*1024):
if not has_snappy():
raise NotImplementedError("Snappy codec is not available")
- if xerial_compatible:
- def _chunker():
- for i in xrange(0, len(payload), xerial_blocksize):
- yield payload[i:i+xerial_blocksize]
+ if not xerial_compatible:
+ return snappy.compress(payload)
- out = BytesIO()
+ out = BytesIO()
+ for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
+ out.write(struct.pack('!' + fmt, dat))
- header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
- in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+ # Chunk through buffers to avoid creating intermediate slice copies
+ for chunk in (buffer(payload, i, xerial_blocksize)
+ for i in xrange(0, len(payload), xerial_blocksize)):
- out.write(header)
- for chunk in _chunker():
- block = snappy.compress(chunk)
- block_size = len(block)
- out.write(struct.pack('!i', block_size))
- out.write(block)
+ block = snappy.compress(chunk)
+ block_size = len(block)
+ out.write(struct.pack('!i', block_size))
+ out.write(block)
- out.seek(0)
- return out.read()
-
- else:
- return snappy.compress(payload)
+ return out.getvalue()
def _detect_xerial_stream(payload):