diff options
-rw-r--r-- | kafka/protocol/legacy.py | 4 | ||||
-rw-r--r-- | kafka/util.py | 8 |
2 files changed, 9 insertions, 3 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index d7ac50a..db9f3e0 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -109,7 +109,7 @@ class KafkaProtocol(object): write_int_string(message.value) ]) crc = crc32(msg) - msg = struct.pack('>I%ds' % len(msg), crc, msg) + msg = struct.pack('>i%ds' % len(msg), crc, msg) else: raise ProtocolError("Unexpected magic number: %d" % message.magic) return msg @@ -159,7 +159,7 @@ class KafkaProtocol(object): The offset is actually read from decode_message_set_iter (it is part of the MessageSet payload). """ - ((crc, magic, att), cur) = relative_unpack('>IBB', data, 0) + ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) if crc != crc32(data[4:]): raise ChecksumError("Message checksum failed") diff --git a/kafka/util.py b/kafka/util.py index 6d9d307..e95d51d 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -10,7 +10,13 @@ from kafka.common import BufferUnderflowError def crc32(data): - return binascii.crc32(data) & 0xffffffff + crc = binascii.crc32(data) + # py2 and py3 behave a little differently + # CRC is encoded as a signed int in kafka protocol + # so we'll convert the py3 unsigned result to signed + if six.PY3 and crc >= 2**31: + crc -= 2**32 + return crc def write_int_string(s): |