diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 2 | ||||
-rw-r--r-- | kafka/codec.py | 19 | ||||
-rw-r--r-- | kafka/conn.py | 10 | ||||
-rw-r--r-- | kafka/consumer.py | 11 | ||||
-rw-r--r-- | kafka/producer.py | 14 | ||||
-rw-r--r-- | kafka/protocol.py | 13 | ||||
-rw-r--r-- | kafka/util.py | 26 |
7 files changed, 66 insertions, 29 deletions
diff --git a/kafka/client.py b/kafka/client.py index 8630f66..89664fc 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -81,7 +81,7 @@ class KafkaClient(object): """ Generate a new correlation id """ - return KafkaClient.ID_GEN.next() + return next(KafkaClient.ID_GEN) def _send_broker_unaware_request(self, requestId, request): """ diff --git a/kafka/codec.py b/kafka/codec.py index 206ddb4..2279200 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -1,8 +1,11 @@ -from cStringIO import StringIO +from io import BytesIO import gzip import struct -_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1) +import six +from six.moves import xrange + +_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) _XERIAL_V1_FORMAT = 'bccccccBii' try: @@ -21,7 +24,7 @@ def has_snappy(): def gzip_encode(payload): - buffer = StringIO() + buffer = BytesIO() handle = gzip.GzipFile(fileobj=buffer, mode="w") handle.write(payload) handle.close() @@ -32,7 +35,7 @@ def gzip_encode(payload): def gzip_decode(payload): - buffer = StringIO(payload) + buffer = BytesIO(payload) handle = gzip.GzipFile(fileobj=buffer, mode='r') result = handle.read() handle.close() @@ -68,9 +71,9 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): for i in xrange(0, len(payload), xerial_blocksize): yield payload[i:i+xerial_blocksize] - out = StringIO() + out = BytesIO() - header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat + header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) out.write(header) @@ -121,8 +124,8 @@ def snappy_decode(payload): if _detect_xerial_stream(payload): # TODO ? Should become a fileobj ? 
- out = StringIO() - byt = buffer(payload[16:]) + out = BytesIO() + byt = payload[16:] length = len(byt) cursor = 0 diff --git a/kafka/conn.py b/kafka/conn.py index a1b0a80..41cd424 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,6 +5,8 @@ import struct from random import shuffle from threading import local +import six + from kafka.common import ConnectionError log = logging.getLogger("kafka") @@ -19,7 +21,7 @@ def collect_hosts(hosts, randomize=True): randomize the returned list. """ - if isinstance(hosts, basestring): + if isinstance(hosts, six.string_types): hosts = hosts.strip().split(',') result = [] @@ -92,7 +94,7 @@ class KafkaConnection(local): # Receiving empty string from recv signals # that the socket is in error. we will never get # more data from this socket - if data == '': + if data == b'': raise socket.error("Not enough data to read message -- did server kill socket?") except socket.error: @@ -103,7 +105,7 @@ class KafkaConnection(local): log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) responses.append(data) - return ''.join(responses) + return b''.join(responses) ################## # Public API # @@ -144,7 +146,7 @@ class KafkaConnection(local): # Read the remainder of the response resp = self._read_bytes(size) - return str(resp) + return resp def copy(self): """ diff --git a/kafka/consumer.py b/kafka/consumer.py index 0935dd2..7636a83 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,12 +1,19 @@ from __future__ import absolute_import -from itertools import izip_longest, repeat +try: + from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 +except ImportError: # python 2 + from itertools import izip_longest as izip_longest, repeat import logging import time import numbers from threading import Lock from multiprocessing import Process, Queue as MPQueue, Event, Value -from Queue import Empty, Queue + +try: + from Queue import Empty, Queue +except ImportError: # python 3 + from 
queue import Empty, Queue import kafka from kafka.common import ( diff --git a/kafka/producer.py b/kafka/producer.py index 8a6bff0..e1f4a22 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -4,11 +4,17 @@ import logging import time import random -from Queue import Empty +try: + from queue import Empty +except ImportError: + from Queue import Empty from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process +import six +from six.moves import xrange + from kafka.common import ( ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError ) @@ -172,8 +178,8 @@ class Producer(object): if not isinstance(msg, (list, tuple)): raise TypeError("msg is not a list or tuple!") - # Raise TypeError if any message is not encoded as a str - if any(not isinstance(m, str) for m in msg): + # Raise TypeError if any message is not encoded as bytes + if any(not isinstance(m, six.binary_type) for m in msg): raise TypeError("all produce message payloads must be type str") if self.async: @@ -221,7 +227,7 @@ class SimpleProducer(Producer): batch_send_every_t - If set, messages are send after this timeout random_start - If true, randomize the initial partition which the the first message block will be published to, otherwise - if false, the first message block will always publish + if false, the first message block will always publish to partition 0 before cycling through each partition """ def __init__(self, client, async=False, diff --git a/kafka/protocol.py b/kafka/protocol.py index 58661c7..a9475c3 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -1,6 +1,9 @@ import logging import struct -import zlib + +import six + +from six.moves import xrange from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode @@ -13,7 +16,7 @@ from kafka.common import ( UnsupportedCodecError ) from kafka.util import ( - read_short_string, read_int_string, relative_unpack, + crc32, read_short_string, 
read_int_string, relative_unpack, write_short_string, write_int_string, group_by_topic_and_partition ) @@ -67,7 +70,7 @@ class KafkaProtocol(object): Offset => int64 MessageSize => int32 """ - message_set = "" + message_set = b"" for message in messages: encoded_message = KafkaProtocol._encode_message(message) message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message) @@ -94,7 +97,7 @@ class KafkaProtocol(object): msg = struct.pack('>BB', message.magic, message.attributes) msg += write_int_string(message.key) msg += write_int_string(message.value) - crc = zlib.crc32(msg) + crc = crc32(msg) msg = struct.pack('>i%ds' % len(msg), crc, msg) else: raise ProtocolError("Unexpected magic number: %d" % message.magic) @@ -146,7 +149,7 @@ class KafkaProtocol(object): of the MessageSet payload). """ ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) - if crc != zlib.crc32(data[4:]): + if crc != crc32(data[4:]): raise ChecksumError("Message checksum failed") (key, cur) = read_int_string(data, cur) diff --git a/kafka/util.py b/kafka/util.py index 9121374..a4a0174 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,14 +1,30 @@ import collections import struct import sys +import zlib from threading import Thread, Event +import six + from kafka.common import BufferUnderflowError +def crc32(data): + """ + Python 2 returns a value in the range [-2**31, 2**31-1]. + Python 3 returns a value in the range [0, 2**32-1]. + + We want a consistent behavior so let's use python2's. 
+ """ + crc = zlib.crc32(data) + if six.PY3 and crc > 2**31: + crc -= 2 ** 32 + return crc + + def write_int_string(s): - if s is not None and not isinstance(s, str): - raise TypeError('Expected "%s" to be str\n' + if s is not None and not isinstance(s, six.binary_type): + raise TypeError('Expected "%s" to be bytes\n' 'data=%s' % (type(s), repr(s))) if s is None: return struct.pack('>i', -1) @@ -17,12 +33,12 @@ def write_int_string(s): def write_short_string(s): - if s is not None and not isinstance(s, str): - raise TypeError('Expected "%s" to be str\n' + if s is not None and not isinstance(s, six.binary_type): + raise TypeError('Expected "%s" to be bytes\n' 'data=%s' % (type(s), repr(s))) if s is None: return struct.pack('>h', -1) - elif len(s) > 32767 and sys.version < (2, 7): + elif len(s) > 32767 and sys.version_info < (2, 7): # Python 2.6 issues a deprecation warning instead of a struct error raise struct.error(len(s)) else: |