summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/codec.py98
-rw-r--r--test/test_unit.py43
2 files changed, 138 insertions, 3 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index eb5d03c..206ddb4 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,5 +1,9 @@
from cStringIO import StringIO
import gzip
+import struct
+
+_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+_XERIAL_V1_FORMAT = 'bccccccBii'
try:
import snappy
@@ -36,13 +40,101 @@ def gzip_decode(payload):
return result
-def snappy_encode(payload):
+def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
+ """Encodes the given data with snappy if xerial_compatible is set then the
+ stream is encoded in a fashion compatible with the xerial snappy library
+
+ The block size (xerial_blocksize) controls how frequent the blocking occurs
+ 32k is the default in the xerial library.
+
+ The format winds up being
+ +-------------+------------+--------------+------------+--------------+
+ | Header | Block1 len | Block1 data | Blockn len | Blockn data |
+ |-------------+------------+--------------+------------+--------------|
+ | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes |
+ +-------------+------------+--------------+------------+--------------+
+
+ It is important to not that the blocksize is the amount of uncompressed
+ data presented to snappy at each block, whereas the blocklen is the
+ number of bytes that will be present in the stream, that is the
+ length will always be <= blocksize.
+ """
+
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
- return snappy.compress(payload)
+
+ if xerial_compatible:
+ def _chunker():
+ for i in xrange(0, len(payload), xerial_blocksize):
+ yield payload[i:i+xerial_blocksize]
+
+ out = StringIO()
+
+ header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
+ in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+
+ out.write(header)
+ for chunk in _chunker():
+ block = snappy.compress(chunk)
+ block_size = len(block)
+ out.write(struct.pack('!i', block_size))
+ out.write(block)
+
+ out.seek(0)
+ return out.read()
+
+ else:
+ return snappy.compress(payload)
+
+
+def _detect_xerial_stream(payload):
+ """Detects if the data given might have been encoded with the blocking mode
+ of the xerial snappy library.
+
+ This mode writes a magic header of the format:
+ +--------+--------------+------------+---------+--------+
+ | Marker | Magic String | Null / Pad | Version | Compat |
+ |--------+--------------+------------+---------+--------|
+ | byte | c-string | byte | int32 | int32 |
+ |--------+--------------+------------+---------+--------|
+ | -126 | 'SNAPPY' | \0 | | |
+ +--------+--------------+------------+---------+--------+
+
+ The pad appears to be to ensure that SNAPPY is a valid cstring
+ The version is the version of this format as written by xerial,
+ in the wild this is currently 1 as such we only support v1.
+
+ Compat is there to claim the miniumum supported version that
+ can read a xerial block stream, presently in the wild this is
+ 1.
+ """
+
+ if len(payload) > 16:
+ header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+ return header == _XERIAL_V1_HEADER
+ return False
def snappy_decode(payload):
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
- return snappy.decompress(payload)
+
+ if _detect_xerial_stream(payload):
+ # TODO ? Should become a fileobj ?
+ out = StringIO()
+ byt = buffer(payload[16:])
+ length = len(byt)
+ cursor = 0
+
+ while cursor < length:
+ block_size = struct.unpack_from('!i', byt[cursor:])[0]
+ # Skip the block size
+ cursor += 4
+ end = cursor + block_size
+ out.write(snappy.decompress(byt[cursor:end]))
+ cursor = end
+
+ out.seek(0)
+ return out.read()
+ else:
+ return snappy.decompress(payload)
diff --git a/test/test_unit.py b/test/test_unit.py
index e3fd4bb..b5f0118 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -70,6 +70,49 @@ class TestCodec(unittest.TestCase):
s2 = snappy_decode(snappy_encode(s1))
self.assertEquals(s1, s2)
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_detect_xerial(self):
+ import kafka as kafka1
+ _detect_xerial_stream = kafka1.codec._detect_xerial_stream
+
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
+ false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode('SNAPPY' * 50)
+ short_data = b'\x01\x02\x03\x04'
+
+ self.assertTrue(_detect_xerial_stream(header))
+ self.assertFalse(_detect_xerial_stream(b''))
+ self.assertFalse(_detect_xerial_stream(b'\x00'))
+ self.assertFalse(_detect_xerial_stream(false_header))
+ self.assertFalse(_detect_xerial_stream(random_snappy))
+ self.assertFalse(_detect_xerial_stream(short_data))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_decode_xerial(self):
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode('SNAPPY' * 50)
+ block_len = len(random_snappy)
+ random_snappy2 = snappy_encode('XERIAL' * 50)
+ block_len2 = len(random_snappy2)
+
+ to_test = header \
+ + struct.pack('!i', block_len) + random_snappy \
+ + struct.pack('!i', block_len2) + random_snappy2 \
+
+ self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_encode_xerial(self):
+ to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
+ '\x00\x00\x00\x18' + \
+ '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
+ '\x00\x00\x00\x18' + \
+ '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+
+ to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
+
+ compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
+ self.assertEquals(compressed, to_ensure)
class TestProtocol(unittest.TestCase):