from collections import namedtuple
import logging
import select
import socket
import struct
import zlib

log = logging.getLogger("org.apache.kafka")


def length_prefix_message(msg):
    """
    Prefix a message with its length as an int32
    """
    return struct.pack('>i', len(msg)) + msg


def create_message_from_string(payload):
    return Message(1, 0, zlib.crc32(payload), payload)


error_codes = {
    -1: "UnknownError",
     0: None,
     1: "OffsetOutOfRange",
     2: "InvalidMessage",
     3: "WrongPartition",
     4: "InvalidFetchSize"
}


class KafkaException(Exception):
    def __init__(self, errorType):
        self.errorType = errorType

    def __str__(self):
        return str(self.errorType)


Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])


class KafkaClient(object):
    """
    Request Structure
    =================

    <Request>       ::= <len> <request-key> <payload>
    <len>           ::= <int32>
    <request-key>   ::= 0 | 1 | 2 | 3 | 4
    <payload>       ::= <ProduceRequest> | <FetchRequest> | <MultiFetchRequest> |
                        <MultiProduceRequest> | <OffsetRequest>

    Response Structure
    ==================

    <Response>      ::= <len> <err> <payload>
    <len>           ::= <int32>
    <err>           ::= -1 | 0 | 1 | 2 | 3 | 4
    <payload>       ::= <ProduceResponse> | <FetchResponse> | <MultiFetchResponse> |
                        <MultiProduceResponse> | <OffsetResponse>

    Messages are big-endian byte order
    """

    PRODUCE_KEY = 0
    FETCH_KEY = 1
    MULTIFETCH_KEY = 2
    MULTIPRODUCE_KEY = 3
    OFFSET_KEY = 4

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((host, port))
        log.debug("Connected to %s on %d", host, port)

    ######################
    #   Protocol Stuff   #
    ######################

    def _consume_response_iter(self):
        """
        This method handles the response header and error messages. It
        then returns an iterator for the chunks of the response
        """
        log.debug("Handling response from Kafka")

        # Header
        resp = self._sock.recv(6)
        if resp == "":
            raise Exception("Got no response from Kafka")
        (size, err) = struct.unpack('>iH', resp)
        log.debug("About to read %d bytes from Kafka", size - 2)

        # Handle error
        error = error_codes.get(err)
        if error is not None:
            raise KafkaException(error)

        # Response iterator
        total = 0
        while total < (size - 2):
            resp = self._sock.recv(1024)
            log.debug("Read %d bytes from Kafka", len(resp))
            if resp == "":
                raise Exception("Underflow")
            total += len(resp)
            yield resp

    def _consume_response(self):
        """
        Fully consume the response iterator
        """
        data = ""
        for chunk in self._consume_response_iter():
            data += chunk
        return data

    def create_message(self, message):
        """
        Create a Message from a Message tuple

        Params
        ======
        message: Message

        Wire Format
        ===========
        <Message>       ::= <Message-0> | <Message-1>
        <Message-0>     ::= <magic-0> <crc> <payload>
        <Message-1>     ::= <magic-1> <attributes> <crc> <payload>
        <magic-0>       ::= 0
        <magic-1>       ::= 1
        <attributes>    ::= <int8>
        <crc>           ::= <int32>
        <payload>       ::= <bytes>

        The crc is a crc32 checksum of the message payload. The attributes
        field is a bitmask used to indicate the compression algorithm.
        """
        if message.magic == 0:
            return struct.pack('>Bi%ds' % len(message.payload),
                               message.magic, message.crc, message.payload)
        elif message.magic == 1:
            return struct.pack('>BBi%ds' % len(message.payload),
                               message.magic, message.attributes, message.crc, message.payload)
        else:
            raise Exception("Unknown message version: %d" % message.magic)

    def create_message_set(self, messages):
        message_set = ""
        for message in messages:
            encoded_message = self.create_message(message)
            message_set += length_prefix_message(encoded_message)
        return message_set

    def create_produce_request(self, produceRequest):
        """
        Create a ProduceRequest

        Wire Format
        ===========
        <ProduceRequest>    ::= <request-key> <topic-length> <topic> <partition> <len> <MessageSet>
        <request-key>       ::= 0
        <topic-length>      ::= <int16>
        <topic>             ::= <bytes>
        <partition>         ::= <int32>
        <len>               ::= <int32>

        The request-key (0) is encoded as a short (int16), and <len> gives the
        length of the MessageSet that follows.
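
        As a rough sketch only (the topic "test" and partition 0 are arbitrary
        example values, and message_set is assumed to already hold the encoded
        MessageSet bytes), the fields map onto the struct call below like so::

            struct.pack('>HH4sii%ds' % len(message_set),
                        0, 4, "test", 0,            # key, topic length, topic, partition
                        len(message_set), message_set)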
        """
        (topic, partition, messages) = produceRequest
        message_set = self.create_message_set(messages)
        req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)),
                          KafkaClient.PRODUCE_KEY, len(topic), topic,
                          partition, len(message_set), message_set)
        return req

    def create_multi_produce_request(self, produceRequests):
        req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests))
        for (topic, partition, messages) in produceRequests:
            message_set = self.create_message_set(messages)
            req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)),
                               len(topic), topic, partition,
                               len(message_set), message_set)
        return req

    def create_fetch_request(self, fetchRequest):
        """
        Create a FetchRequest message

        Wire Format
        ===========
        <FetchRequest>  ::= <request-key> <topic-length> <topic> <partition> <offset> <size>
        <request-key>   ::= 1
        <topic-length>  ::= <int16>
        <topic>         ::= <bytes>
        <partition>     ::= <int32>
        <offset>        ::= <int64>
        <size>          ::= <int32>

        The request-key (1) is encoded as a short (int16).
        """
        (topic, partition, offset, size) = fetchRequest
        req = struct.pack('>HH%dsiqi' % len(topic),
                          KafkaClient.FETCH_KEY, len(topic), topic,
                          partition, offset, size)
        return req

    def create_multi_fetch_request(self, fetchRequests):
        """
        Create the MultiFetchRequest message from a list of FetchRequest objects

        Params
        ======
        fetchRequests: list of FetchRequest

        Returns
        =======
        req: bytes, The message to send to Kafka

        Wire Format
        ===========
        <MultiFetchRequest> ::= <request-key> <num> [ <FetchRequests> ]
        <request-key>       ::= 2
        <num>               ::= <int16>
        <FetchRequests>     ::= <FetchRequestBody> [ <FetchRequests> ]
        <FetchRequestBody>  ::= <topic-length> <topic> <partition> <offset> <size>
        <topic-length>      ::= <int16>
        <topic>             ::= <bytes>
        <partition>         ::= <int32>
        <offset>            ::= <int64>
        <size>              ::= <int32>

        The request-key (2) is encoded as a short (int16).
        """
        req = struct.pack('>HH', KafkaClient.MULTIFETCH_KEY, len(fetchRequests))
        for (topic, partition, offset, size) in fetchRequests:
            req += struct.pack('>H%dsiqi' % len(topic),
                               len(topic), topic, partition, offset, size)
        return req

    def create_offset_request(self, offsetRequest):
        """
        Create an OffsetRequest message

        Wire Format
        ===========
        <OffsetRequest> ::=