summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-11-28 19:34:37 +0800
committerZack Dever <zack.dever@rd.io>2015-12-04 11:25:39 -0800
commitf08775a6198cd16a7bc9ec93ffd057f65064ec54 (patch)
treee46e8df3d16e7baaf82a5d3dc260499955a7d39b
parentdc94b5fe9f3f93bf6f2235d7f65c62fcf0a2a996 (diff)
downloadkafka-python-f08775a6198cd16a7bc9ec93ffd057f65064ec54.tar.gz
Switch crc32 back to signed integer -- this is consistent with protocol encoding spec
-rw-r--r--kafka/protocol/legacy.py4
-rw-r--r--kafka/util.py8
2 files changed, 9 insertions, 3 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index d7ac50a..db9f3e0 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -109,7 +109,7 @@ class KafkaProtocol(object):
write_int_string(message.value)
])
crc = crc32(msg)
- msg = struct.pack('>I%ds' % len(msg), crc, msg)
+ msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg
@@ -159,7 +159,7 @@ class KafkaProtocol(object):
The offset is actually read from decode_message_set_iter (it is part
of the MessageSet payload).
"""
- ((crc, magic, att), cur) = relative_unpack('>IBB', data, 0)
+ ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
if crc != crc32(data[4:]):
raise ChecksumError("Message checksum failed")
diff --git a/kafka/util.py b/kafka/util.py
index 6d9d307..e95d51d 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -10,7 +10,13 @@ from kafka.common import BufferUnderflowError
def crc32(data):
- return binascii.crc32(data) & 0xffffffff
+ crc = binascii.crc32(data)
+ # py2 and py3 behave a little differently
+ # CRC is encoded as a signed int in kafka protocol
+ # so we'll convert the py3 unsigned result to signed
+ if six.PY3 and crc >= 2**31:
+ crc -= 2**32
+ return crc
def write_int_string(s):