author     Mark Roberts <wizzat@gmail.com>    2014-04-08 01:09:23 -0700
committer  Mark Roberts <wizzat@gmail.com>    2014-04-08 01:09:23 -0700
commit     80b3335ed2b927c9fadb80f8ff186474f7328b3f (patch)
tree       c344498624f3e20a53792f145144b16fdef64e6d /test/test_codec.py
parent     6ee151427c144cc830eba954234f4f89f4529fe3 (diff)
download   kafka-python-80b3335ed2b927c9fadb80f8ff186474f7328b3f.tar.gz
Split test files, modify test_protocol
Diffstat (limited to 'test/test_codec.py')
-rw-r--r--  test/test_codec.py | 90
1 file changed, 90 insertions(+), 0 deletions(-)
diff --git a/test/test_codec.py b/test/test_codec.py
new file mode 100644
index 0000000..8872fe7
--- /dev/null
+++ b/test/test_codec.py
@@ -0,0 +1,90 @@
+import os
+import random
+import struct
+import unittest
+
+from mock import MagicMock, patch
+
+from kafka import KafkaClient
+from kafka.common import (
+    ProduceRequest, FetchRequest, Message, ChecksumError,
+    ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
+    OffsetAndMessage, BrokerMetadata, PartitionMetadata,
+    TopicAndPartition, KafkaUnavailableError,
+    LeaderUnavailableError, PartitionUnavailableError
+)
+from kafka.codec import (
+    has_gzip, has_snappy, gzip_encode, gzip_decode,
+    snappy_encode, snappy_decode
+)
+from kafka.protocol import (
+    create_gzip_message, create_message, create_snappy_message, KafkaProtocol
+)
+
+ITERATIONS = 1000
+STRLEN = 100
+
+
+def random_string():
+    return os.urandom(random.randint(1, STRLEN))
+
+
+class TestCodec(unittest.TestCase):
+    @unittest.skipUnless(has_gzip(), "Gzip not available")
+    def test_gzip(self):
+        for i in xrange(ITERATIONS):
+            s1 = random_string()
+            s2 = gzip_decode(gzip_encode(s1))
+            self.assertEquals(s1, s2)
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy(self):
+        for i in xrange(ITERATIONS):
+            s1 = random_string()
+            s2 = snappy_decode(snappy_encode(s1))
+            self.assertEquals(s1, s2)
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_detect_xerial(self):
+        import kafka as kafka1
+        _detect_xerial_stream = kafka1.codec._detect_xerial_stream
+
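+        # A valid xerial header is the 8-byte magic b'\x82SNAPPY\x00' followed by
+        # two big-endian int32s: version (1) and minimum compatible version (1).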
+        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
+        false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
+        random_snappy = snappy_encode('SNAPPY' * 50)
+        short_data = b'\x01\x02\x03\x04'
+
+        self.assertTrue(_detect_xerial_stream(header))
+        self.assertFalse(_detect_xerial_stream(b''))
+        self.assertFalse(_detect_xerial_stream(b'\x00'))
+        self.assertFalse(_detect_xerial_stream(false_header))
+        self.assertFalse(_detect_xerial_stream(random_snappy))
+        self.assertFalse(_detect_xerial_stream(short_data))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_decode_xerial(self):
+        header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+        random_snappy = snappy_encode('SNAPPY' * 50)
+        block_len = len(random_snappy)
+        random_snappy2 = snappy_encode('XERIAL' * 50)
+        block_len2 = len(random_snappy2)
+
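+        # Body of a xerial stream: repeated [4-byte big-endian block length][snappy block].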
+        to_test = header \
+            + struct.pack('!i', block_len) + random_snappy \
+            + struct.pack('!i', block_len2) + random_snappy2
+
+        self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
+
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_snappy_encode_xerial(self):
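+        # Expected output: the 16-byte xerial header, then one snappy block per
+        # 300-byte chunk of the input, each prefixed with its 4-byte big-endian
+        # length (0x18 = 24 compressed bytes per block here).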
+        to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
+            '\x00\x00\x00\x18' + \
+            '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
+            '\x00\x00\x00\x18' + \
+            '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+
+        to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
+
+        compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
+        self.assertEquals(compressed, to_ensure)
+
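
For reference, the xerial-compatible framing exercised by test_snappy_decode_xerial and
test_snappy_encode_xerial can be reproduced by hand. The sketch below is only an
illustration: it assumes the python-snappy package is installed, and the helper name
xerial_frame is invented here rather than part of kafka-python. It mirrors the layout the
tests assert, which is what kafka.codec.snappy_encode(..., xerial_compatible=True) is
expected to emit.

    import struct
    import snappy  # python-snappy; the backend kafka.codec relies on when available

    def xerial_frame(payload, blocksize=300):
        # Hypothetical helper mirroring the layout asserted in the tests:
        # 8-byte magic, then version and min-compatible-version as big-endian int32s.
        out = b'\x82SNAPPY\x00' + struct.pack('!ii', 1, 1)
        for start in range(0, len(payload), blocksize):
            block = snappy.compress(payload[start:start + blocksize])
            # Each block is prefixed with its 4-byte big-endian compressed length.
            out += struct.pack('!i', len(block)) + block
        return out

    # With the same snappy build, xerial_frame(b'SNAPPY' * 50 + b'XERIAL' * 50)
    # should match the to_ensure bytes asserted in test_snappy_encode_xerial.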