diff options
Diffstat (limited to 'test/test_codec.py')
-rw-r--r-- | test/test_codec.py | 137 |
1 files changed, 75 insertions, 62 deletions
diff --git a/test/test_codec.py b/test/test_codec.py index 3416fdb..e7065a2 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -1,72 +1,85 @@ import struct +import pytest from six.moves import xrange -from . import unittest from kafka.codec import ( - has_snappy, gzip_encode, gzip_decode, - snappy_encode, snappy_decode + has_snappy, has_gzip, has_lz4, + gzip_encode, gzip_decode, + snappy_encode, snappy_decode, + lz4_encode, lz4_decode, ) from test.testutil import random_string -class TestCodec(unittest.TestCase): - def test_gzip(self): - for i in xrange(1000): - b1 = random_string(100).encode('utf-8') - b2 = gzip_decode(gzip_encode(b1)) - self.assertEqual(b1, b2) - - @unittest.skipUnless(has_snappy(), "Snappy not available") - def test_snappy(self): - for i in xrange(1000): - b1 = random_string(100).encode('utf-8') - b2 = snappy_decode(snappy_encode(b1)) - self.assertEqual(b1, b2) - - @unittest.skipUnless(has_snappy(), "Snappy not available") - def test_snappy_detect_xerial(self): - import kafka as kafka1 - _detect_xerial_stream = kafka1.codec._detect_xerial_stream - - header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' - false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' - random_snappy = snappy_encode(b'SNAPPY' * 50) - short_data = b'\x01\x02\x03\x04' - - self.assertTrue(_detect_xerial_stream(header)) - self.assertFalse(_detect_xerial_stream(b'')) - self.assertFalse(_detect_xerial_stream(b'\x00')) - self.assertFalse(_detect_xerial_stream(false_header)) - self.assertFalse(_detect_xerial_stream(random_snappy)) - self.assertFalse(_detect_xerial_stream(short_data)) - - @unittest.skipUnless(has_snappy(), "Snappy not available") - def test_snappy_decode_xerial(self): - header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' - random_snappy = snappy_encode(b'SNAPPY' * 50) - block_len = len(random_snappy) - random_snappy2 = snappy_encode(b'XERIAL' * 50) - block_len2 = len(random_snappy2) - - to_test = header \ - + struct.pack('!i', block_len) + random_snappy \ - + struct.pack('!i', block_len2) + random_snappy2 \ - - self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50)) - - @unittest.skipUnless(has_snappy(), "Snappy not available") - def test_snappy_encode_xerial(self): - to_ensure = ( - b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' - b'\x00\x00\x00\x18' - b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' - b'\x00\x00\x00\x18' - b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' - ) - - to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50) - - compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) - self.assertEqual(compressed, to_ensure) +def test_gzip(): + for i in xrange(1000): + b1 = random_string(100).encode('utf-8') + b2 = gzip_decode(gzip_encode(b1)) + assert b1 == b2 + + +@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") +def test_snappy(): + for i in xrange(1000): + b1 = random_string(100).encode('utf-8') + b2 = snappy_decode(snappy_encode(b1)) + assert b1 == b2 + + +@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") +def test_snappy_detect_xerial(): + import kafka as kafka1 + _detect_xerial_stream = kafka1.codec._detect_xerial_stream + + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' + false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode(b'SNAPPY' * 50) + short_data = b'\x01\x02\x03\x04' + + assert _detect_xerial_stream(header) is True + assert _detect_xerial_stream(b'') is False + assert _detect_xerial_stream(b'\x00') is False + assert _detect_xerial_stream(false_header) is False + assert _detect_xerial_stream(random_snappy) is False + assert _detect_xerial_stream(short_data) is False + + +@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") +def test_snappy_decode_xerial(): + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode(b'SNAPPY' * 50) + block_len = len(random_snappy) + random_snappy2 = snappy_encode(b'XERIAL' * 50) + block_len2 = len(random_snappy2) + + to_test = header \ + + struct.pack('!i', block_len) + random_snappy \ + + struct.pack('!i', block_len2) + random_snappy2 \ + + assert snappy_decode(to_test) == (b'SNAPPY' * 50) + (b'XERIAL' * 50) + + +@pytest.mark.skipif(not has_snappy(), reason="Snappy not available") +def test_snappy_encode_xerial(): + to_ensure = ( + b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + b'\x00\x00\x00\x18' + b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + b'\x00\x00\x00\x18' + b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + ) + + to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50) + + compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) + assert compressed == to_ensure + + +@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") +def test_lz4(): + for i in xrange(1000): + b1 = random_string(100).encode('utf-8') + b2 = lz4_decode(lz4_encode(b1)) + assert b1 == b2 |