summaryrefslogtreecommitdiff
path: root/kafka/protocol.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-07 18:52:05 -0700
committerDana Powers <dana.powers@rd.io>2014-09-07 19:09:32 -0700
commit715425c639a476139065689afde3d255a07d6f96 (patch)
tree0ef2cd875c97c8ca867d89328d6fd5fec7dfcbe8 /kafka/protocol.py
parenta99384f4c601d127ab1c4fe5b272ea5c07fd695d (diff)
parentbe23042ecd9ab330886745ccc9ec9e3a0039836f (diff)
downloadkafka-python-715425c639a476139065689afde3d255a07d6f96.tar.gz
Merge pull request #227 from wizzat-feature/py3
Python 3 Support Conflicts: kafka/producer.py test/test_client.py test/test_client_integration.py test/test_codec.py test/test_consumer.py test/test_consumer_integration.py test/test_failover_integration.py test/test_producer.py test/test_producer_integration.py test/test_protocol.py test/test_util.py
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r--kafka/protocol.py17
1 files changed, 10 insertions, 7 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 58661c7..e5356c5 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -1,6 +1,9 @@
import logging
import struct
-import zlib
+
+import six
+
+from six.moves import xrange
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
@@ -13,7 +16,7 @@ from kafka.common import (
UnsupportedCodecError
)
from kafka.util import (
- read_short_string, read_int_string, relative_unpack,
+ crc32, read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition
)
@@ -67,7 +70,7 @@ class KafkaProtocol(object):
Offset => int64
MessageSize => int32
"""
- message_set = ""
+ message_set = b""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
@@ -94,8 +97,8 @@ class KafkaProtocol(object):
msg = struct.pack('>BB', message.magic, message.attributes)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
- crc = zlib.crc32(msg)
- msg = struct.pack('>i%ds' % len(msg), crc, msg)
+ crc = crc32(msg)
+ msg = struct.pack('>I%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg
@@ -145,8 +148,8 @@ class KafkaProtocol(object):
The offset is actually read from decode_message_set_iter (it is part
of the MessageSet payload).
"""
- ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
- if crc != zlib.crc32(data[4:]):
+ ((crc, magic, att), cur) = relative_unpack('>IBB', data, 0)
+ if crc != crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)