diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-04-08 01:09:23 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-04-08 01:09:23 -0700 |
commit | 80b3335ed2b927c9fadb80f8ff186474f7328b3f (patch) | |
tree | c344498624f3e20a53792f145144b16fdef64e6d /test/test_codec.py | |
parent | 6ee151427c144cc830eba954234f4f89f4529fe3 (diff) | |
download | kafka-python-80b3335ed2b927c9fadb80f8ff186474f7328b3f.tar.gz |
Split test files, modify test_protocol
Diffstat (limited to 'test/test_codec.py')
-rw-r--r-- | test/test_codec.py | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/test/test_codec.py b/test/test_codec.py new file mode 100644 index 0000000..8872fe7 --- /dev/null +++ b/test/test_codec.py @@ -0,0 +1,90 @@ +import os +import random +import struct +import unittest + +from mock import MagicMock, patch + +from kafka import KafkaClient +from kafka.common import ( + ProduceRequest, FetchRequest, Message, ChecksumError, + ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, + OffsetAndMessage, BrokerMetadata, PartitionMetadata, + TopicAndPartition, KafkaUnavailableError, + LeaderUnavailableError, PartitionUnavailableError +) +from kafka.codec import ( + has_gzip, has_snappy, gzip_encode, gzip_decode, + snappy_encode, snappy_decode +) +from kafka.protocol import ( + create_gzip_message, create_message, create_snappy_message, KafkaProtocol +) + +ITERATIONS = 1000 +STRLEN = 100 + + +def random_string(): + return os.urandom(random.randint(1, STRLEN)) + + +class TestCodec(unittest.TestCase): + @unittest.skipUnless(has_gzip(), "Gzip not available") + def test_gzip(self): + for i in xrange(ITERATIONS): + s1 = random_string() + s2 = gzip_decode(gzip_encode(s1)) + self.assertEquals(s1, s2) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy(self): + for i in xrange(ITERATIONS): + s1 = random_string() + s2 = snappy_decode(snappy_encode(s1)) + self.assertEquals(s1, s2) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_detect_xerial(self): + import kafka as kafka1 + _detect_xerial_stream = kafka1.codec._detect_xerial_stream + + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes' + false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + short_data = b'\x01\x02\x03\x04' + + self.assertTrue(_detect_xerial_stream(header)) + self.assertFalse(_detect_xerial_stream(b'')) + self.assertFalse(_detect_xerial_stream(b'\x00')) + self.assertFalse(_detect_xerial_stream(false_header)) + self.assertFalse(_detect_xerial_stream(random_snappy)) + self.assertFalse(_detect_xerial_stream(short_data)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_decode_xerial(self): + header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + random_snappy = snappy_encode('SNAPPY' * 50) + block_len = len(random_snappy) + random_snappy2 = snappy_encode('XERIAL' * 50) + block_len2 = len(random_snappy2) + + to_test = header \ + + struct.pack('!i', block_len) + random_snappy \ + + struct.pack('!i', block_len2) + random_snappy2 \ + + self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50)) + + @unittest.skipUnless(has_snappy(), "Snappy not available") + def test_snappy_encode_xerial(self): + to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \ + '\x00\x00\x00\x18' + \ + '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + + to_test = ('SNAPPY' * 50) + ('XERIAL' * 50) + + compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300) + self.assertEquals(compressed, to_ensure) + |