summaryrefslogtreecommitdiff
path: root/test/test_codec.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_codec.py')
-rw-r--r--test/test_codec.py24
1 files changed, 6 insertions, 18 deletions
diff --git a/test/test_codec.py b/test/test_codec.py
index 40bd1b4..2e6f67e 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -1,18 +1,6 @@
-import os
-import random
import struct
-import unittest
+import unittest2
-from mock import MagicMock, patch
-
-from kafka import KafkaClient
-from kafka.common import (
- ProduceRequest, FetchRequest, Message, ChecksumError,
- ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
- OffsetAndMessage, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, KafkaUnavailableError,
- LeaderUnavailableError, PartitionUnavailableError
-)
from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
@@ -22,21 +10,21 @@ from kafka.protocol import (
)
from testutil import *
-class TestCodec(unittest.TestCase):
+class TestCodec(unittest2.TestCase):
def test_gzip(self):
for i in xrange(1000):
s1 = random_string(100)
s2 = gzip_decode(gzip_encode(s1))
self.assertEquals(s1, s2)
- @unittest.skipUnless(has_snappy(), "Snappy not available")
+ @unittest2.skipUnless(has_snappy(), "Snappy not available")
def test_snappy(self):
for i in xrange(1000):
s1 = random_string(100)
s2 = snappy_decode(snappy_encode(s1))
self.assertEquals(s1, s2)
- @unittest.skipUnless(has_snappy(), "Snappy not available")
+ @unittest2.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_detect_xerial(self):
import kafka as kafka1
_detect_xerial_stream = kafka1.codec._detect_xerial_stream
@@ -53,7 +41,7 @@ class TestCodec(unittest.TestCase):
self.assertFalse(_detect_xerial_stream(random_snappy))
self.assertFalse(_detect_xerial_stream(short_data))
- @unittest.skipUnless(has_snappy(), "Snappy not available")
+ @unittest2.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_decode_xerial(self):
header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
random_snappy = snappy_encode('SNAPPY' * 50)
@@ -67,7 +55,7 @@ class TestCodec(unittest.TestCase):
self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
- @unittest.skipUnless(has_snappy(), "Snappy not available")
+ @unittest2.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_encode_xerial(self):
to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
'\x00\x00\x00\x18' + \