Diffstat (limited to 'kafka.py')
 kafka.py | 211
 1 file changed, 147 insertions(+), 64 deletions(-)
diff --git a/kafka.py b/kafka.py
index 3439238..fbd415d 100644
--- a/kafka.py
+++ b/kafka.py
@@ -1,5 +1,7 @@
from collections import namedtuple
+from cStringIO import StringIO
import logging
+import gzip
import select
import socket
import struct
@@ -7,15 +9,6 @@ import zlib
log = logging.getLogger("org.apache.kafka")
-def length_prefix_message(msg):
- """
- Prefix a message with it's length as an int
- """
- return struct.pack('>i', len(msg)) + msg
-
-
-def create_message_from_string(payload):
- return Message(1, 0, zlib.crc32(payload), payload)
error_codes = {
-1: "UnknownError",
@@ -38,6 +31,30 @@ FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
+def gzip_compress(payload):
+ buf = StringIO()
+ f = gzip.GzipFile(fileobj=buf, mode='w')
+ f.write(payload)
+ f.close()
+ buf.seek(0)
+ out = buf.read()
+ buf.close()
+ return out
+
+def gzip_decompress(payload):
+ buf = StringIO(payload)
+ f = gzip.GzipFile(fileobj=buf, mode='r')
+ out = f.read()
+ f.close()
+ buf.close()
+ return out
+
+def length_prefix_message(msg):
+ """
+ Prefix a message with its length as an int (big-endian int32)
+ """
+ return struct.pack('>i', len(msg)) + msg
+
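A quick sketch of how these new helpers compose (illustrative only, not part of the commit; Python 2 str payloads, as used throughout this module):

    payload = "hello kafka"
    # gzip round-trip through an in-memory cStringIO buffer
    assert gzip_decompress(gzip_compress(payload)) == payload
    # length_prefix_message frames a payload with a big-endian int32 length
    framed = length_prefix_message(payload)
    assert framed[:4] == struct.pack('>i', len(payload))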
class KafkaClient(object):
"""
Request Structure
@@ -65,11 +82,14 @@ class KafkaClient(object):
MULTIPRODUCE_KEY = 3
OFFSET_KEY = 4
+ ATTRIBUTE_CODEC_MASK = 0x03
+
def __init__(self, host, port):
self.host = host
self.port = port
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
+ self._sock.settimeout(10)
log.debug("Connected to %s on %d", host, port)
######################
@@ -113,10 +133,9 @@ class KafkaClient(object):
data += chunk
return data
-
- def create_message(self, message):
+ def encode_message(self, message):
"""
- Create a Message from a Message tuple
+ Encode a Message from a Message tuple
Params
======
@@ -138,24 +157,28 @@ class KafkaClient(object):
used for indicating the compression algorithm.
"""
if message.magic == 0:
- return struct.pack('>Bi%ds' % len(message.payload),
+ msg = struct.pack('>Bi%ds' % len(message.payload),
message.magic, message.crc, message.payload)
elif message.magic == 1:
- return struct.pack('>BBi%ds' % len(message.payload),
+ msg = struct.pack('>BBi%ds' % len(message.payload),
message.magic, message.attributes, message.crc, message.payload)
else:
raise Exception("Unknown message version: %d" % message.magic)
+ msg = length_prefix_message(msg)
+ log.debug("Encoded %s as %r" % (message, msg))
+ return msg
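For reference, the framed layouts this produces (length prefix included), in the module's own wire-format notation:

    <Message v0> ::= <len> <magic=0> <crc> <payload>
    <Message v1> ::= <len> <magic=1> <attributes> <crc> <payload>
    <len>        ::= <int32>   (big-endian, added by length_prefix_message)
    <crc>        ::= <int32>   (zlib.crc32 of the payload)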
- def create_message_set(self, messages):
+ def encode_message_set(self, messages):
+ # TODO document
message_set = ""
for message in messages:
- encoded_message = self.create_message(message)
- message_set += length_prefix_message(encoded_message)
+ encoded_message = self.encode_message(message)
+ message_set += encoded_message
return message_set
- def create_produce_request(self, produceRequest):
+ def encode_produce_request(self, produceRequest):
"""
- Create a ProduceRequest
+ Encode a ProduceRequest
Wire Format
===========
@@ -169,22 +192,24 @@ class KafkaClient(object):
The request-key (0) is encoded as a short (int16). len is the length of the following MessageSet
"""
(topic, partition, messages) = produceRequest
- message_set = self.create_message_set(messages)
+ message_set = self.encode_message_set(messages)
+ log.debug("Sending MessageSet: %r" % message_set)
req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)),
KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set)
return req
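Illustrative usage (not part of the commit; assumes a broker on localhost:9092 and a hypothetical "test" topic):

    client = KafkaClient("localhost", 9092)
    messages = [client.create_message_from_string("hello")]
    req = client.encode_produce_request(ProduceRequest("test", 0, messages))
    # req still lacks the outer int32 length prefix; send_message_set
    # applies it via length_prefix_message before writing to the socket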
- def create_multi_produce_request(self, produceRequests):
+ def encode_multi_produce_request(self, produceRequests):
+ # TODO document
req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests))
for (topic, partition, messages) in produceRequests:
- message_set = self.create_message_set(messages)
+ message_set = self.encode_message_set(messages)
req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)),
len(topic), topic, partition, len(message_set), message_set)
return req
- def create_fetch_request(self, fetchRequest):
+ def encode_fetch_request(self, fetchRequest):
"""
- Create a FetchRequest message
+ Encode a FetchRequest message
Wire Format
===========
@@ -203,9 +228,9 @@ class KafkaClient(object):
KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size)
return req
- def create_multi_fetch_request(self, fetchRequests):
+ def encode_multi_fetch_request(self, fetchRequests):
"""
- Create the MultiFetchRequest message from a list of FetchRequest objects
+ Encode the MultiFetchRequest message from a list of FetchRequest objects
Params
======
@@ -235,9 +260,9 @@ class KafkaClient(object):
req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size)
return req
- def create_offset_request(self, offsetRequest):
+ def encode_offset_request(self, offsetRequest):
"""
- Create an OffsetRequest message
+ Encode an OffsetRequest message
Wire Format
===========
@@ -256,10 +281,58 @@ class KafkaClient(object):
req = struct.pack('>HH%dsiqi' % len(topic), KafkaClient.OFFSET_KEY, len(topic), topic, partition, offset, maxOffsets)
return req
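Illustrative usage, reusing the client from the sketch above: request the single latest offset of a partition (in Kafka's offset API, time=-1 means latest and time=-2 means earliest):

    # hypothetical "test" topic, partition 0, at most one offset back
    req = client.encode_offset_request(OffsetRequest("test", 0, -1, 1))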
+ def decode_message(self, data):
+ """
+ Decode a Message
+
+ Since a Message can actually contain a compressed payload of multiple nested Messages,
+ this method returns a generator.
+ """
+ # TODO document
+ N = len(data)
+ (magic,) = struct.unpack('>B', data[0:1])
+ if magic == 0: # v0 Message
+ # Read crc; check the crc; append the message
+ (crc,) = struct.unpack('>i', data[1:5])
+ payload = data[5:N]
+ assert zlib.crc32(payload) == crc
+ msg = Message(magic, None, crc, payload)
+ log.debug("Got v0 Message, %s", msg)
+ yield msg
+ elif magic == 1: # v1 Message
+ # Read attributes, crc; check the crc; append the message
+ (att, crc) = struct.unpack('>Bi', data[1:6])
+ payload = data[6:N]
+ assert zlib.crc32(payload) == crc
+ # Uncompressed, just a single Message
+ if att & KafkaClient.ATTRIBUTE_CODEC_MASK == 0:
+ msg = Message(magic, att, crc, payload)
+ log.debug("Got v1 Message, %s", msg)
+ yield msg
+ elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 1:
+ gz = gzip_decompress(payload)
+ (msgs, _) = self.read_message_set(gz)
+ for msg in msgs:
+ yield msg
+ else:
+ raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))
+
def read_message_set(self, data):
"""
Read a MessageSet
+ This method reads through the given bytes and decodes the length-prefixed Messages. It will
+ short-circuit if there are insufficient bytes to read a whole Message. This allows callers to
+ determine the next valid offset.
+
+ Params
+ ======
+ data: bytes to read
+
+ Returns
+ =======
+ tuple of (list(Message), read), where read is how many bytes were read
+
Wire Format
===========
<MessageSet> ::= <len> <Message> [ <MessageSet> ]
@@ -273,41 +346,29 @@ class KafkaClient(object):
msgs = []
size = len(data)
while cur < size:
- if (cur + 5) > size:
- # Underflow for the Header
+ # Make sure we can read this Message
+ if (cur + 4) > size:
+ # Not enough bytes to read the length
if len(msgs) == 0:
raise Exception("Message underflow. Did not request enough bytes to consume a single message")
else:
log.debug("Not enough data to read header of next message")
break
- # Read a Message header (length, magic byte)
- (N, magic) = struct.unpack('>iB', data[cur:(cur+5)])
-
+ (N,) = struct.unpack('>i', data[cur:(cur+4)])
if (cur + N + 4) > size:
- # Underflow for this Message
+ # Not enough bytes to read this whole Message
log.debug("Not enough data to read next message")
break
- cur += 5
-
- if magic == 0: # v0 Message
- # Read crc; check the crc; append the message
- (crc,) = struct.unpack('>i', data[cur:(cur+4)])
+ else:
cur += 4
- payload = data[cur:(cur+N-5)]
- assert zlib.crc32(payload) == crc
- cur += (N-5)
- log.debug("Got v0 Message, %d bytes", len(payload))
- msgs.append(Message(magic, None, crc, payload))
- elif magic == 1: # v1 Message
- # Read attributes, crc; check the crc; append the message
- (att, crc) = struct.unpack('>Bi', data[cur:(cur+5)])
- cur += 5
- payload = data[cur:(cur+N-6)]
- assert zlib.crc32(payload) == crc
- cur += (N-6)
- log.debug("Got v1 Message, %d bytes", len(payload))
- msgs.append(Message(magic, att, crc, payload))
+ # Decode the message(s)
+ for m in self.decode_message(data[cur:cur+N]):
+ msgs.append(m)
+
+ # Advance the cursor
+ cur += N
+
# Return the retrieved messages and the cursor position
return (msgs, cur)
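The returned byte count is what lets a caller compute its next fetch offset; a sketch (illustrative, with a hypothetical handle() consumer):

    (msgs, read) = client.read_message_set(data)
    for msg in msgs:
        handle(msg.payload)  # hypothetical per-message handler
    # advance by the bytes actually consumed, not the bytes requested
    next_offset = offset + read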
@@ -315,6 +376,17 @@ class KafkaClient(object):
# Advanced User API #
#########################
+ def create_message_from_string(self, payload):
+ #TODO document
+ return Message(1, 0, zlib.crc32(payload), payload)
+
+ def create_gzipped_message(self, *payloads):
+ #TODO document
+ messages = [self.create_message_from_string(payload) for payload in payloads]
+ message_set = self.encode_message_set(messages)
+ gzipped = gzip_compress(message_set)
+ return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)
+
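Illustrative usage (not part of the commit): batch several payloads into one compressed Message; on the consumer side, decode_message transparently yields the nested Messages back out:

    batched = client.create_gzipped_message("m1", "m2", "m3")
    client.send_message_set(ProduceRequest("test", 0, [batched]))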
def send_message_set(self, produceRequest):
"""
Send a ProduceRequest
@@ -323,9 +395,12 @@ class KafkaClient(object):
======
produceRequest: ProduceRequest
"""
- req = length_prefix_message(self.create_produce_request(produceRequest))
- log.debug("Sending %d bytes to Kafka", len(req))
- self._sock.send(req)
+ req = length_prefix_message(self.encode_produce_request(produceRequest))
+ log.debug("Sending %d bytes to Kafka: %r", len(req), req)
+ sent = self._sock.send(req)
+ if sent == 0:
+ raise RuntimeError("Kafka went away")
+
def send_multi_message_set(self, produceRequests):
"""
@@ -335,9 +410,11 @@ class KafkaClient(object):
======
produceRequests: list of ProduceRequest
"""
- req = length_prefix_message(self.create_multi_produce_request(produceRequests))
+ req = length_prefix_message(self.encode_multi_produce_request(produceRequests))
log.debug("Sending %d bytes to Kafka", len(req))
- self._sock.send(req)
+ sent = self._sock.send(req)
+ if sent == 0:
+ raise RuntimeError("Kafka went away")
def get_message_set(self, fetchRequest):
"""
@@ -353,9 +430,11 @@ class KafkaClient(object):
starting at the next message.
"""
- req = length_prefix_message(self.create_fetch_request(fetchRequest))
+ req = length_prefix_message(self.encode_fetch_request(fetchRequest))
log.debug("Sending %d bytes to Kafka", len(req))
- self._sock.send(req)
+ sent = self._sock.send(req)
+ if sent == 0:
+ raise RuntimeError("Kafka went away")
data = self._consume_response()
(messages, read) = self.read_message_set(data)
@@ -382,9 +461,11 @@ class KafkaClient(object):
<MultiMessage> ::= <len> 0 <MessageSet>
<len> ::= <int32>
"""
- req = length_prefix_message(self.create_multi_fetch_request(fetchRequests))
+ req = length_prefix_message(self.encode_multi_fetch_request(fetchRequests))
log.debug("Sending %d bytes to Kafka", len(req))
- self._sock.send(req)
+ sent = self._sock.send(req)
+ if sent == 0:
+ raise RuntimeError("Kafka went away")
data = self._consume_response()
cur = 0
responses = []
@@ -416,9 +497,11 @@ class KafkaClient(object):
<offset> ::= <int64>
"""
- req = length_prefix_message(create_offset_request(offsetRequest))
+ req = length_prefix_message(self.encode_offset_request(offsetRequest))
log.debug("Sending %d bytes to Kafka", len(req))
- self._sock.send(req)
+ sent = self._sock.send(req)
+ if sent == 0:
+ raise RuntimeError("Kafka went away")
data = self._consume_response()
(num,) = struct.unpack('>i', data[0:4])