diff options
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index 58661c7..e5356c5 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -1,6 +1,9 @@ import logging import struct -import zlib + +import six + +from six.moves import xrange from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode @@ -13,7 +16,7 @@ from kafka.common import ( UnsupportedCodecError ) from kafka.util import ( - read_short_string, read_int_string, relative_unpack, + crc32, read_short_string, read_int_string, relative_unpack, write_short_string, write_int_string, group_by_topic_and_partition ) @@ -67,7 +70,7 @@ class KafkaProtocol(object): Offset => int64 MessageSize => int32 """ - message_set = "" + message_set = b"" for message in messages: encoded_message = KafkaProtocol._encode_message(message) message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message) @@ -94,8 +97,8 @@ class KafkaProtocol(object): msg = struct.pack('>BB', message.magic, message.attributes) msg += write_int_string(message.key) msg += write_int_string(message.value) - crc = zlib.crc32(msg) - msg = struct.pack('>i%ds' % len(msg), crc, msg) + crc = crc32(msg) + msg = struct.pack('>I%ds' % len(msg), crc, msg) else: raise ProtocolError("Unexpected magic number: %d" % message.magic) return msg @@ -145,8 +148,8 @@ class KafkaProtocol(object): The offset is actually read from decode_message_set_iter (it is part of the MessageSet payload). """ - ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) - if crc != zlib.crc32(data[4:]): + ((crc, magic, att), cur) = relative_unpack('>IBB', data, 0) + if crc != crc32(data[4:]): raise ChecksumError("Message checksum failed") (key, cur) = read_int_string(data, cur) |