summaryrefslogtreecommitdiff
path: root/kafka/codec.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-07 18:52:05 -0700
committerDana Powers <dana.powers@rd.io>2014-09-07 19:09:32 -0700
commit715425c639a476139065689afde3d255a07d6f96 (patch)
tree0ef2cd875c97c8ca867d89328d6fd5fec7dfcbe8 /kafka/codec.py
parenta99384f4c601d127ab1c4fe5b272ea5c07fd695d (diff)
parentbe23042ecd9ab330886745ccc9ec9e3a0039836f (diff)
downloadkafka-python-715425c639a476139065689afde3d255a07d6f96.tar.gz
Merge pull request #227 from wizzat-feature/py3
Python 3 Support Conflicts: kafka/producer.py test/test_client.py test/test_client_integration.py test/test_codec.py test/test_consumer.py test/test_consumer_integration.py test/test_failover_integration.py test/test_producer.py test/test_producer_integration.py test/test_protocol.py test/test_util.py
Diffstat (limited to 'kafka/codec.py')
-rw-r--r--kafka/codec.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 206ddb4..2279200 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,8 +1,11 @@
-from cStringIO import StringIO
+from io import BytesIO
import gzip
import struct
-_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+import six
+from six.moves import xrange
+
+_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
try:
@@ -21,7 +24,7 @@ def has_snappy():
def gzip_encode(payload):
- buffer = StringIO()
+ buffer = BytesIO()
handle = gzip.GzipFile(fileobj=buffer, mode="w")
handle.write(payload)
handle.close()
@@ -32,7 +35,7 @@ def gzip_encode(payload):
def gzip_decode(payload):
- buffer = StringIO(payload)
+ buffer = BytesIO(payload)
handle = gzip.GzipFile(fileobj=buffer, mode='r')
result = handle.read()
handle.close()
@@ -68,9 +71,9 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
- out = StringIO()
+ out = BytesIO()
- header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
+ header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
@@ -121,8 +124,8 @@ def snappy_decode(payload):
if _detect_xerial_stream(payload):
# TODO ? Should become a fileobj ?
- out = StringIO()
- byt = buffer(payload[16:])
+ out = BytesIO()
+ byt = payload[16:]
length = len(byt)
cursor = 0