summaryrefslogtreecommitdiff
path: root/test/test_codec.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_codec.py')
-rw-r--r--test/test_codec.py137
1 files changed, 75 insertions, 62 deletions
diff --git a/test/test_codec.py b/test/test_codec.py
index 3416fdb..e7065a2 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -1,72 +1,85 @@
import struct
+import pytest
from six.moves import xrange
-from . import unittest
from kafka.codec import (
- has_snappy, gzip_encode, gzip_decode,
- snappy_encode, snappy_decode
+ has_snappy, has_gzip, has_lz4,
+ gzip_encode, gzip_decode,
+ snappy_encode, snappy_decode,
+ lz4_encode, lz4_decode,
)
from test.testutil import random_string
-class TestCodec(unittest.TestCase):
- def test_gzip(self):
- for i in xrange(1000):
- b1 = random_string(100).encode('utf-8')
- b2 = gzip_decode(gzip_encode(b1))
- self.assertEqual(b1, b2)
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_snappy(self):
- for i in xrange(1000):
- b1 = random_string(100).encode('utf-8')
- b2 = snappy_decode(snappy_encode(b1))
- self.assertEqual(b1, b2)
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_snappy_detect_xerial(self):
- import kafka as kafka1
- _detect_xerial_stream = kafka1.codec._detect_xerial_stream
-
- header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
- false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
- random_snappy = snappy_encode(b'SNAPPY' * 50)
- short_data = b'\x01\x02\x03\x04'
-
- self.assertTrue(_detect_xerial_stream(header))
- self.assertFalse(_detect_xerial_stream(b''))
- self.assertFalse(_detect_xerial_stream(b'\x00'))
- self.assertFalse(_detect_xerial_stream(false_header))
- self.assertFalse(_detect_xerial_stream(random_snappy))
- self.assertFalse(_detect_xerial_stream(short_data))
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_snappy_decode_xerial(self):
- header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
- random_snappy = snappy_encode(b'SNAPPY' * 50)
- block_len = len(random_snappy)
- random_snappy2 = snappy_encode(b'XERIAL' * 50)
- block_len2 = len(random_snappy2)
-
- to_test = header \
- + struct.pack('!i', block_len) + random_snappy \
- + struct.pack('!i', block_len2) + random_snappy2 \
-
- self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_snappy_encode_xerial(self):
- to_ensure = (
- b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
- b'\x00\x00\x00\x18'
- b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
- b'\x00\x00\x00\x18'
- b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
- )
-
- to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
-
- compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
- self.assertEqual(compressed, to_ensure)
+def test_gzip():
+ for i in xrange(1000):
+ b1 = random_string(100).encode('utf-8')
+ b2 = gzip_decode(gzip_encode(b1))
+ assert b1 == b2
+
+
+@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
+def test_snappy():
+ for i in xrange(1000):
+ b1 = random_string(100).encode('utf-8')
+ b2 = snappy_decode(snappy_encode(b1))
+ assert b1 == b2
+
+
+@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
+def test_snappy_detect_xerial():
+ import kafka as kafka1
+ _detect_xerial_stream = kafka1.codec._detect_xerial_stream
+
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
+ false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode(b'SNAPPY' * 50)
+ short_data = b'\x01\x02\x03\x04'
+
+ assert _detect_xerial_stream(header) is True
+ assert _detect_xerial_stream(b'') is False
+ assert _detect_xerial_stream(b'\x00') is False
+ assert _detect_xerial_stream(false_header) is False
+ assert _detect_xerial_stream(random_snappy) is False
+ assert _detect_xerial_stream(short_data) is False
+
+
+@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
+def test_snappy_decode_xerial():
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode(b'SNAPPY' * 50)
+ block_len = len(random_snappy)
+ random_snappy2 = snappy_encode(b'XERIAL' * 50)
+ block_len2 = len(random_snappy2)
+
+ to_test = header \
+ + struct.pack('!i', block_len) + random_snappy \
+ + struct.pack('!i', block_len2) + random_snappy2 \
+
+ assert snappy_decode(to_test) == (b'SNAPPY' * 50) + (b'XERIAL' * 50)
+
+
+@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
+def test_snappy_encode_xerial():
+ to_ensure = (
+ b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+ b'\x00\x00\x00\x18'
+ b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+ b'\x00\x00\x00\x18'
+ b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+ )
+
+ to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
+
+ compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
+ assert compressed == to_ensure
+
+
+@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
+def test_lz4():
+ for i in xrange(1000):
+ b1 = random_string(100).encode('utf-8')
+ b2 = lz4_decode(lz4_encode(b1))
+ assert b1 == b2