from collections import namedtuple
# cStringIO exists only on Python 2; fall back to io.BytesIO (also
# byte-oriented, matching cStringIO's str-as-bytes semantics) so the module
# can still be imported under Python 3. On Python 2 behavior is unchanged.
try:
    from cStringIO import StringIO
except ImportError:
    from io import BytesIO as StringIO
import logging
import gzip
import select
import socket
import struct
import zlib

log = logging.getLogger("org.apache.kafka")

# Error codes the broker may return in a response header.
# A value of None means "no error".
error_codes = {
    -1: "UnknownError",
    0: None,
    1: "OffsetOutOfRange",
    2: "InvalidMessage",
    3: "WrongPartition",
    4: "InvalidFetchSize",
}


class KafkaException(Exception):
    """Raised when the broker reports a non-zero error code."""

    def __init__(self, errorType):
        Exception.__init__(self, errorType)
        self.errorType = errorType

    def __str__(self):
        # BUG FIX: originally `return str(errorType)` -- a bare name that
        # raised NameError whenever the exception was printed. It must read
        # the attribute stored in __init__.
        return str(self.errorType)


# Wire-level message: magic (format version), attributes (codec bitmask,
# present for magic 1 only), crc32 of the payload, and the payload bytes.
Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])


def gzip_encode(payload):
    """Return `payload` compressed with gzip (compression level 6)."""
    buf = StringIO()
    f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
    f.write(payload)
    f.close()
    buf.seek(0)
    out = buf.read()
    buf.close()
    return out


def gzip_decode(payload):
    """Return the decompressed form of a gzip-compressed `payload`."""
    buf = StringIO(payload)
    f = gzip.GzipFile(fileobj=buf, mode='r')
    out = f.read()
    f.close()
    buf.close()
    return out


def length_prefix_message(msg):
    """
    Prefix a message with its length encoded as a big-endian int32.
    """
    return struct.pack('>i', len(msg)) + msg


class KafkaClient(object):
    """
    Simple blocking client for the (0.6-era) Kafka wire protocol.

    Request Structure
    =================
    <request>     ::= <len> <request-key> <payload>
    <len>         ::= int32
    <request-key> ::= 0 | 1 | 2 | 3 | 4
    <payload>     ::= <produce> | <fetch> | <multifetch> | <multiproduce> | <offsets>

    Response Structure
    ==================
    <response>    ::= <len> <err> <payload>
    <len>         ::= int32
    <err>         ::= -1 | 0 | 1 | 2 | 3 | 4   (see error_codes above)
    <payload>     ::= <messages> | <offsets>

    Messages are big-endian byte order.
    """

    # request-key values (first int16 of every request payload)
    PRODUCE_KEY = 0
    FETCH_KEY = 1
    MULTIFETCH_KEY = 2
    MULTIPRODUCE_KEY = 3
    OFFSET_KEY = 4

    # low two bits of a message's attribute byte select the compression codec
    ATTRIBUTE_CODEC_MASK = 0x03

    def __init__(self, host, port, bufsize=1024):
        """
        Connect to the broker at host:port.  `bufsize` is the chunk size
        used when reading responses off the socket.
        """
        self.host = host
        self.port = port
        self.bufsize = bufsize
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((host, port))
        # NOTE(review): the timeout is installed after connect(), so the
        # connect itself can block indefinitely -- confirm that is intended.
        self._sock.settimeout(10)
        log.debug("Connected to %s on %d", host, port)

    ######################
    #   Protocol Stuff   #
    ######################

    def _consume_response_iter(self):
        """
        Read one response: parse the 6-byte header (int32 total length plus
        int16 error code), raise KafkaException if the broker reported an
        error, then yield the remaining `len - 2` payload bytes in chunks of
        at most self.bufsize.
        """
        log.debug("Handling response from Kafka")

        # Header
        # NOTE(review): a single recv() may legally return fewer than 6
        # bytes; a short read here would break the unpack below -- confirm
        # whether a read loop is needed.
        resp = self._sock.recv(6)
        if resp == b"":
            raise Exception("Got no response from Kafka")
        (size, err) = struct.unpack('>iH', resp)
        log.debug("About to read %d bytes from Kafka", size-2)

        # Handle error
        error = error_codes.get(err)
        if error is not None:
            raise KafkaException(error)

        # Response iterator
        total = 0
        while total < (size-2):
            resp = self._sock.recv(self.bufsize)
            log.debug("Read %d bytes from Kafka", len(resp))
            if resp == b"":
                raise Exception("Underflow")
            total += len(resp)
            yield resp

    def _consume_response(self):
        """
        Fully consume the response iterator and return the payload as a
        single byte string.
        """
        data = b""
        for chunk in self._consume_response_iter():
            data += chunk
        return data

    @classmethod
    def encode_message(cls, message):
        """
        Encode a Message from a Message tuple.

        Params
        ======
        message: Message

        Wire Format
        ===========
        <message>    ::= <message-0> | <message-1>
        <message-0>  ::= <len> 0 <crc> <payload>           (magic 0: no attributes byte)
        <message-1>  ::= <len> 1 <attributes> <crc> <payload>
        <len>        ::= int32
        <attributes> ::= int8, bitmask indicating the compression codec
        <crc>        ::= int32, crc32 checksum of the payload

        Raises Exception for any other magic value.
        """
        if message.magic == 0:
            msg = struct.pack('>Bi%ds' % len(message.payload),
                              message.magic, message.crc, message.payload)
        elif message.magic == 1:
            msg = struct.pack('>BBi%ds' % len(message.payload),
                              message.magic, message.attributes,
                              message.crc, message.payload)
        else:
            raise Exception("Unexpected magic number: %d" % message.magic)
        msg = length_prefix_message(msg)
        log.debug("Encoded %s as %r" % (message, msg))
        return msg

    @classmethod
    def encode_message_set(cls, messages):
        """
        Encode a MessageSet: one or more encoded Messages, concatenated.
        """
        message_set = b""
        for message in messages:
            encoded_message = cls.encode_message(message)
            message_set += encoded_message
        return message_set
len is the length of the proceeding MessageSet """ (topic, partition, messages) = produceRequest message_set = cls.encode_message_set(messages) log.debug("Sending MessageSet: %r" % message_set) req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)), KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set) return req @classmethod def encode_multi_produce_request(cls, produceRequests): """ Encode a MultiProducerRequest Params ====== produceRequest: list of ProduceRequest objects Returns ======= Encoded request Wire Format =========== ::= ::= ::= [ ] ::= ::= ::= ::= ::= num is the number of ProduceRequests being encoded """ req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests)) for (topic, partition, messages) in produceRequests: message_set = cls.encode_message_set(messages) req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)), len(topic), topic, partition, len(message_set), message_set) return req @classmethod def encode_fetch_request(cls, fetchRequest): """ Encode a FetchRequest message Wire Format =========== ::= ::= 1 ::= ::= ::= ::= ::= The request-key (1) is encoded as a short (int16). """ (topic, partition, offset, size) = fetchRequest req = struct.pack('>HH%dsiqi' % len(topic), KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size) return req @classmethod def encode_multi_fetch_request(cls, fetchRequests): """ Encode the MultiFetchRequest message from a list of FetchRequest objects Params ====== fetchRequests: list of FetchRequest Returns ======= req: bytes, The message to send to Kafka Wire Format =========== ::= [ ] ::= 2 ::= ::= [ ] ::= ::= ::= ::= ::= ::= The request-key (2) is encoded as a short (int16). 
""" req = struct.pack('>HH', KafkaClient.MULTIFETCH_KEY, len(fetchRequests)) for (topic, partition, offset, size) in fetchRequests: req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size) return req @classmethod def encode_offset_request(cls, offsetRequest): """ Encode an OffsetRequest message Wire Format =========== ::=