summaryrefslogtreecommitdiff
path: root/kafka/protocol.py
diff options
context:
space:
mode:
authorBruno ReniƩ <brutasse@gmail.com>2014-08-28 20:50:20 +0200
committerMark Roberts <wizzat@fb.com>2014-09-03 09:55:44 -0700
commitcf0b7f0530e765f2cf710bd35daf53bb4ea205d2 (patch)
treef38669f145112a7f301691f311b3c8aca51d1f59 /kafka/protocol.py
parent83af5102e995e854a1980b90f1400afdd098da37 (diff)
downloadkafka-python-cf0b7f0530e765f2cf710bd35daf53bb4ea205d2.tar.gz
Make all unit tests pass on py3.3/3.4
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r--kafka/protocol.py13
1 files changed, 8 insertions, 5 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 58661c7..a9475c3 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -1,6 +1,9 @@
import logging
import struct
-import zlib
+
+import six
+
+from six.moves import xrange
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
@@ -13,7 +16,7 @@ from kafka.common import (
UnsupportedCodecError
)
from kafka.util import (
- read_short_string, read_int_string, relative_unpack,
+ crc32, read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition
)
@@ -67,7 +70,7 @@ class KafkaProtocol(object):
Offset => int64
MessageSize => int32
"""
- message_set = ""
+ message_set = b""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
@@ -94,7 +97,7 @@ class KafkaProtocol(object):
msg = struct.pack('>BB', message.magic, message.attributes)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
- crc = zlib.crc32(msg)
+ crc = crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
@@ -146,7 +149,7 @@ class KafkaProtocol(object):
of the MessageSet payload).
"""
((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
- if crc != zlib.crc32(data[4:]):
+ if crc != crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)