diff options
Diffstat (limited to 'kafka.py')
-rw-r--r-- | kafka.py | 211 |
1 files changed, 147 insertions, 64 deletions
@@ -1,5 +1,7 @@ from collections import namedtuple +from cStringIO import StringIO import logging +import gzip import select import socket import struct @@ -7,15 +9,6 @@ import zlib log = logging.getLogger("org.apache.kafka") -def length_prefix_message(msg): - """ - Prefix a message with it's length as an int - """ - return struct.pack('>i', len(msg)) + msg - - -def create_message_from_string(payload): - return Message(1, 0, zlib.crc32(payload), payload) error_codes = { -1: "UnknownError", @@ -38,6 +31,30 @@ FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) +def gzip_compress(payload): + buf = StringIO() + f = gzip.GzipFile(fileobj=buf, mode='w') + f.write(payload) + f.close() + buf.seek(0) + out = buf.read() + buf.close() + return out + +def gzip_decompress(payload): + buf = StringIO(payload) + f = gzip.GzipFile(fileobj=buf, mode='r') + out = f.read() + f.close() + buf.close() + return out + +def length_prefix_message(msg): + """ + Prefix a message with it's length as an int + """ + return struct.pack('>i', len(msg)) + msg + class KafkaClient(object): """ Request Structure @@ -65,11 +82,14 @@ class KafkaClient(object): MULTIPRODUCE_KEY = 3 OFFSET_KEY = 4 + ATTRIBUTE_CODEC_MASK = 0x03 + def __init__(self, host, port): self.host = host self.port = port self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) + self._sock.settimeout(10) log.debug("Connected to %s on %d", host, port) ###################### @@ -113,10 +133,9 @@ class KafkaClient(object): data += chunk return data - - def create_message(self, message): + def encode_message(self, message): """ - Create a Message from a Message tuple + Encode a Message from a Message tuple Params ====== @@ -138,24 +157,28 @@ class KafkaClient(object): used for indicating the compression algorithm. """ if message.magic == 0: - return struct.pack('>Bi%ds' % len(message.payload), + msg = struct.pack('>Bi%ds' % len(message.payload), message.magic, message.crc, message.payload) elif message.magic == 1: - return struct.pack('>BBi%ds' % len(message.payload), + msg = struct.pack('>BBi%ds' % len(message.payload), message.magic, message.attributes, message.crc, message.payload) else: raise Exception("Unknown message version: %d" % message.magic) + msg = length_prefix_message(msg) + log.debug("Encoded %s as %r" % (message, msg)) + return msg - def create_message_set(self, messages): + def encode_message_set(self, messages): + # TODO document message_set = "" for message in messages: - encoded_message = self.create_message(message) - message_set += length_prefix_message(encoded_message) + encoded_message = self.encode_message(message) + message_set += encoded_message return message_set - def create_produce_request(self, produceRequest): + def encode_produce_request(self, produceRequest): """ - Create a ProduceRequest + Encode a ProduceRequest Wire Format =========== @@ -169,22 +192,24 @@ class KafkaClient(object): The request-key (0) is encoded as a short (int16). len is the length of the proceeding MessageSet """ (topic, partition, messages) = produceRequest - message_set = self.create_message_set(messages) + message_set = self.encode_message_set(messages) + log.debug("Sending MessageSet: %r" % message_set) req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)), KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set) return req - def create_multi_produce_request(self, produceRequests): + def encode_multi_produce_request(self, produceRequests): + # TODO document req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests)) for (topic, partition, messages) in produceRequests: - message_set = self.create_message_set(messages) + message_set = self.encode_message_set(messages) req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)), len(topic), topic, partition, len(message_set), message_set) return req - def create_fetch_request(self, fetchRequest): + def encode_fetch_request(self, fetchRequest): """ - Create a FetchRequest message + Encode a FetchRequest message Wire Format =========== @@ -203,9 +228,9 @@ class KafkaClient(object): KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size) return req - def create_multi_fetch_request(self, fetchRequests): + def encode_multi_fetch_request(self, fetchRequests): """ - Create the MultiFetchRequest message from a list of FetchRequest objects + Encode the MultiFetchRequest message from a list of FetchRequest objects Params ====== @@ -235,9 +260,9 @@ class KafkaClient(object): req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size) return req - def create_offset_request(self, offsetRequest): + def encode_offset_request(self, offsetRequest): """ - Create an OffsetRequest message + Encode an OffsetRequest message Wire Format =========== @@ -256,10 +281,58 @@ class KafkaClient(object): req = struct.pack('>HH%dsiqi' % len(topic), KafkaClient.OFFSET_KEY, len(topic), topic, partition, offset, maxOffsets) return req + def decode_message(self, data): + """ + Decode a Message + + Since a Message can actually contained a compressed payload of multiple nested Messages, + this method returns a generator. + """ + # TODO document + N = len(data) + (magic,) = struct.unpack('>B', data[0:1]) + if magic == 0: # v0 Message + # Read crc; check the crc; append the message + (crc,) = struct.unpack('>i', data[1:5]) + payload = data[5:N] + assert zlib.crc32(payload) == crc + msg = Message(magic, None, crc, payload) + log.debug("Got v0 Message, %s", msg) + yield msg + elif magic == 1: # v1 Message + # Read attributes, crc; check the crc; append the message + (att, crc) = struct.unpack('>Bi', data[1:6]) + payload = data[6:N] + assert zlib.crc32(payload) == crc + # Uncompressed, just a single Message + if att & KafkaClient.ATTRIBUTE_CODEC_MASK == 0: + msg = Message(magic, att, crc, payload) + log.debug("Got v1 Message, %s", msg) + yield msg + elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 1: + gz = gzip_decompress(payload) + (msgs, _) = self.read_message_set(gz) + for msg in msgs: + yield msg + else: + raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK)) + def read_message_set(self, data): """ Read a MessageSet + This method reads through the given bytes and decodes the length-prefixed Messages. It will short + circuit if there are insufficient bytes to read a whole Message. This allows callers to determine + the next valid offset + + Params + ====== + data: bytes to read + + Returns + ======= + tuple of (list(Message), read), where read is how many bytes were read + Wire Format =========== <MessageSet> ::= <len> <Message> [ <MessageSet> ] @@ -273,41 +346,29 @@ class KafkaClient(object): msgs = [] size = len(data) while cur < size: - if (cur + 5) > size: - # Underflow for the Header + # Make sure we can read this Message + if (cur + 4) > size: + # Not enough bytes to read the length if len(msgs) == 0: raise Exception("Message underflow. Did not request enough bytes to consume a single message") else: log.debug("Not enough data to read header of next message") break - # Read a Message header (length, magic byte) - (N, magic) = struct.unpack('>iB', data[cur:(cur+5)]) - + (N,) = struct.unpack('>i', data[cur:(cur+4)]) if (cur + N + 4) > size: - # Underflow for this Message + # Not enough bytes to read this whole Message log.debug("Not enough data to read next message") break - cur += 5 - - if magic == 0: # v0 Message - # Read crc; check the crc; append the message - (crc,) = struct.unpack('>i', data[cur:(cur+4)]) + else: cur += 4 - payload = data[cur:(cur+N-5)] - assert zlib.crc32(payload) == crc - cur += (N-5) - log.debug("Got v0 Message, %d bytes", len(payload)) - msgs.append(Message(magic, None, crc, payload)) - elif magic == 1: # v1 Message - # Read attributes, crc; check the crc; append the message - (att, crc) = struct.unpack('>Bi', data[cur:(cur+5)]) - cur += 5 - payload = data[cur:(cur+N-6)] - assert zlib.crc32(payload) == crc - cur += (N-6) - log.debug("Got v1 Message, %d bytes", len(payload)) - msgs.append(Message(magic, att, crc, payload)) + # Decode the message(s) + for m in self.decode_message(data[cur:cur+N]): + msgs.append(m) + + # Advance the cursor + cur += N + # Return the retrieved messages and the cursor position return (msgs, cur) @@ -315,6 +376,17 @@ class KafkaClient(object): # Advanced User API # ######################### + def create_message_from_string(self, payload): + #TODO document + return Message(1, 0, zlib.crc32(payload), payload) + + def create_gzipped_message(self, *payloads): + #TODO document + messages = [self.create_message_from_string(payload) for payload in payloads] + message_set = self.encode_message_set(messages) + gzipped = gzip_compress(message_set) + return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped) + def send_message_set(self, produceRequest): """ Send a ProduceRequest @@ -323,9 +395,12 @@ class KafkaClient(object): ====== produceRequest: ProduceRequest """ - req = length_prefix_message(self.create_produce_request(produceRequest)) - log.debug("Sending %d bytes to Kafka", len(req)) - self._sock.send(req) + req = length_prefix_message(self.encode_produce_request(produceRequest)) + log.debug("Sending %d bytes to Kafka: %r", len(req), req) + sent = self._sock.send(req) + if sent == 0: + raise RuntimeError("Kafka went away") + def send_multi_message_set(self, produceRequests): """ @@ -335,9 +410,11 @@ class KafkaClient(object): ====== produceRequests: list of ProduceRequest """ - req = length_prefix_message(self.create_multi_produce_request(produceRequests)) + req = length_prefix_message(self.encode_multi_produce_request(produceRequests)) log.debug("Sending %d bytes to Kafka", len(req)) - self._sock.send(req) + sent = self._sock.send(req) + if sent == 0: + raise RuntimeError("Kafka went away") def get_message_set(self, fetchRequest): """ @@ -353,9 +430,11 @@ class KafkaClient(object): starting at the next message. """ - req = length_prefix_message(self.create_fetch_request(fetchRequest)) + req = length_prefix_message(self.encode_fetch_request(fetchRequest)) log.debug("Sending %d bytes to Kafka", len(req)) - self._sock.send(req) + sent = self._sock.send(req) + if sent == 0: + raise RuntimeError("Kafka went away") data = self._consume_response() (messages, read) = self.read_message_set(data) @@ -382,9 +461,11 @@ class KafkaClient(object): <MultiMessage> ::= <len> 0 <MessageSet> <len> ::= <int32> """ - req = length_prefix_message(self.create_multi_fetch_request(fetchRequests)) + req = length_prefix_message(self.encode_multi_fetch_request(fetchRequests)) log.debug("Sending %d bytes to Kafka", len(req)) - self._sock.send(req) + sent = self._sock.send(req) + if sent == 0: + raise RuntimeError("Kafka went away") data = self._consume_response() cur = 0 responses = [] @@ -416,9 +497,11 @@ class KafkaClient(object): <offset> ::= <int64> """ - req = length_prefix_message(create_offset_request(offsetRequest)) + req = length_prefix_message(encode_offset_request(offsetRequest)) log.debug("Sending %d bytes to Kafka", len(req)) - self._sock.send(req) + sent = self._sock.send(req) + if sent == 0: + raise RuntimeError("Kafka went away") data = self._consume_response() (num,) = struct.unpack('>i', data[0:4]) |