summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py2
-rw-r--r--kafka/codec.py19
-rw-r--r--kafka/conn.py10
-rw-r--r--kafka/consumer.py11
-rw-r--r--kafka/producer.py14
-rw-r--r--kafka/protocol.py13
-rw-r--r--kafka/util.py26
7 files changed, 66 insertions, 29 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 8630f66..89664fc 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -81,7 +81,7 @@ class KafkaClient(object):
"""
Generate a new correlation id
"""
- return KafkaClient.ID_GEN.next()
+ return next(KafkaClient.ID_GEN)
def _send_broker_unaware_request(self, requestId, request):
"""
diff --git a/kafka/codec.py b/kafka/codec.py
index 206ddb4..2279200 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,8 +1,11 @@
-from cStringIO import StringIO
+from io import BytesIO
import gzip
import struct
-_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+import six
+from six.moves import xrange
+
+_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
try:
@@ -21,7 +24,7 @@ def has_snappy():
def gzip_encode(payload):
- buffer = StringIO()
+ buffer = BytesIO()
handle = gzip.GzipFile(fileobj=buffer, mode="w")
handle.write(payload)
handle.close()
@@ -32,7 +35,7 @@ def gzip_encode(payload):
def gzip_decode(payload):
- buffer = StringIO(payload)
+ buffer = BytesIO(payload)
handle = gzip.GzipFile(fileobj=buffer, mode='r')
result = handle.read()
handle.close()
@@ -68,9 +71,9 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
for i in xrange(0, len(payload), xerial_blocksize):
yield payload[i:i+xerial_blocksize]
- out = StringIO()
+ out = BytesIO()
- header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
+ header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
@@ -121,8 +124,8 @@ def snappy_decode(payload):
if _detect_xerial_stream(payload):
# TODO ? Should become a fileobj ?
- out = StringIO()
- byt = buffer(payload[16:])
+ out = BytesIO()
+ byt = payload[16:]
length = len(byt)
cursor = 0
diff --git a/kafka/conn.py b/kafka/conn.py
index a1b0a80..41cd424 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,6 +5,8 @@ import struct
from random import shuffle
from threading import local
+import six
+
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -19,7 +21,7 @@ def collect_hosts(hosts, randomize=True):
randomize the returned list.
"""
- if isinstance(hosts, basestring):
+ if isinstance(hosts, six.string_types):
hosts = hosts.strip().split(',')
result = []
@@ -92,7 +94,7 @@ class KafkaConnection(local):
# Receiving empty string from recv signals
# that the socket is in error. we will never get
# more data from this socket
- if data == '':
+ if data == b'':
raise socket.error("Not enough data to read message -- did server kill socket?")
except socket.error:
@@ -103,7 +105,7 @@ class KafkaConnection(local):
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
responses.append(data)
- return ''.join(responses)
+ return b''.join(responses)
##################
# Public API #
@@ -144,7 +146,7 @@ class KafkaConnection(local):
# Read the remainder of the response
resp = self._read_bytes(size)
- return str(resp)
+ return resp
def copy(self):
"""
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 0935dd2..7636a83 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,12 +1,19 @@
from __future__ import absolute_import
-from itertools import izip_longest, repeat
+try:
+ from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
+except ImportError: # python 2
+ from itertools import izip_longest as izip_longest, repeat
import logging
import time
import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
-from Queue import Empty, Queue
+
+try:
+ from Queue import Empty, Queue
+except ImportError: # python 2
+ from queue import Empty, Queue
import kafka
from kafka.common import (
diff --git a/kafka/producer.py b/kafka/producer.py
index 8a6bff0..e1f4a22 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -4,11 +4,17 @@ import logging
import time
import random
-from Queue import Empty
+try:
+ from queue import Empty
+except ImportError:
+ from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
+import six
+from six.moves import xrange
+
from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
@@ -172,8 +178,8 @@ class Producer(object):
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
- # Raise TypeError if any message is not encoded as a str
- if any(not isinstance(m, str) for m in msg):
+ # Raise TypeError if any message is not encoded as bytes
+ if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type str")
if self.async:
@@ -221,7 +227,7 @@ class SimpleProducer(Producer):
batch_send_every_t - If set, messages are send after this timeout
random_start - If true, randomize the initial partition which the
the first message block will be published to, otherwise
- if false, the first message block will always publish
+ if false, the first message block will always publish
to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 58661c7..a9475c3 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -1,6 +1,9 @@
import logging
import struct
-import zlib
+
+import six
+
+from six.moves import xrange
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
@@ -13,7 +16,7 @@ from kafka.common import (
UnsupportedCodecError
)
from kafka.util import (
- read_short_string, read_int_string, relative_unpack,
+ crc32, read_short_string, read_int_string, relative_unpack,
write_short_string, write_int_string, group_by_topic_and_partition
)
@@ -67,7 +70,7 @@ class KafkaProtocol(object):
Offset => int64
MessageSize => int32
"""
- message_set = ""
+ message_set = b""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
@@ -94,7 +97,7 @@ class KafkaProtocol(object):
msg = struct.pack('>BB', message.magic, message.attributes)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
- crc = zlib.crc32(msg)
+ crc = crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
raise ProtocolError("Unexpected magic number: %d" % message.magic)
@@ -146,7 +149,7 @@ class KafkaProtocol(object):
of the MessageSet payload).
"""
((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
- if crc != zlib.crc32(data[4:]):
+ if crc != crc32(data[4:]):
raise ChecksumError("Message checksum failed")
(key, cur) = read_int_string(data, cur)
diff --git a/kafka/util.py b/kafka/util.py
index 9121374..a4a0174 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,14 +1,30 @@
import collections
import struct
import sys
+import zlib
from threading import Thread, Event
+import six
+
from kafka.common import BufferUnderflowError
+def crc32(data):
+ """
+ Python 2 returns a value in the range [-2**31, 2**31-1].
+ Python 3 returns a value in the range [0, 2**32-1].
+
+ We want a consistent behavior so let's use python2's.
+ """
+ crc = zlib.crc32(data)
+ if six.PY3 and crc > 2**31:
+ crc -= 2 ** 32
+ return crc
+
+
def write_int_string(s):
- if s is not None and not isinstance(s, str):
- raise TypeError('Expected "%s" to be str\n'
+ if s is not None and not isinstance(s, six.binary_type):
+ raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>i', -1)
@@ -17,12 +33,12 @@ def write_int_string(s):
def write_short_string(s):
- if s is not None and not isinstance(s, str):
- raise TypeError('Expected "%s" to be str\n'
+ if s is not None and not isinstance(s, six.binary_type):
+ raise TypeError('Expected "%s" to be bytes\n'
'data=%s' % (type(s), repr(s)))
if s is None:
return struct.pack('>h', -1)
- elif len(s) > 32767 and sys.version < (2, 7):
+ elif len(s) > 32767 and sys.version_info < (2, 7):
# Python 2.6 issues a deprecation warning instead of a struct error
raise struct.error(len(s))
else: