diff options
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index 58661c7..a9475c3 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -1,6 +1,9 @@ import logging import struct -import zlib + +import six + +from six.moves import xrange from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode @@ -13,7 +16,7 @@ from kafka.common import ( UnsupportedCodecError ) from kafka.util import ( - read_short_string, read_int_string, relative_unpack, + crc32, read_short_string, read_int_string, relative_unpack, write_short_string, write_int_string, group_by_topic_and_partition ) @@ -67,7 +70,7 @@ class KafkaProtocol(object): Offset => int64 MessageSize => int32 """ - message_set = "" + message_set = b"" for message in messages: encoded_message = KafkaProtocol._encode_message(message) message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message) @@ -94,7 +97,7 @@ class KafkaProtocol(object): msg = struct.pack('>BB', message.magic, message.attributes) msg += write_int_string(message.key) msg += write_int_string(message.value) - crc = zlib.crc32(msg) + crc = crc32(msg) msg = struct.pack('>i%ds' % len(msg), crc, msg) else: raise ProtocolError("Unexpected magic number: %d" % message.magic) @@ -146,7 +149,7 @@ class KafkaProtocol(object): of the MessageSet payload). """ ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) - if crc != zlib.crc32(data[4:]): + if crc != crc32(data[4:]): raise ChecksumError("Message checksum failed") (key, cur) = read_int_string(data, cur) |