| | | |
|---|---|---|
| author | David Arthur <mumrah@gmail.com> | 2013-02-20 10:34:34 -0500 |
| committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
| commit | 8b70b9cf6ab28a662bff0b00ece6e7a2924a9e8f (patch) | |
| tree | 57b64f1a6715548bbbc4c097174a1cdfcaafb290 /kafka/client08.py | |
| parent | 71fef1b1555c2fb15a89411a5a6f79baebe4d3ae (diff) | |
| download | kafka-python-8b70b9cf6ab28a662bff0b00ece6e7a2924a9e8f.tar.gz | |
First pass of cleanup/refactoring
Also added a bunch of docstrings
Diffstat (limited to 'kafka/client08.py')
| | | |
|---|---|---|
| -rw-r--r-- | kafka/client08.py | 672 |

1 file changed, 455 insertions, 217 deletions
```diff
diff --git a/kafka/client08.py b/kafka/client08.py
index f120f37..11910d1 100644
--- a/kafka/client08.py
+++ b/kafka/client08.py
@@ -14,69 +14,117 @@
 from .codec import snappy_encode, snappy_decode
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
+from .util import BufferUnderflowError, ChecksumError
 
 log = logging.getLogger("kafka")
 
+###############
+#   Structs   #
+###############
+
 # Request payloads
 ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
-FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "maxBytes"])
-OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
+FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
+OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])
+OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"])
+OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
 
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
 FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
 OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
+OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
 BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
-PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partitionId", "leader", "replicas", "isr"])
+PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"])
 
 # Other useful structs
 OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
 Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partitionId"])
+TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
 class ErrorMapping(object):
-    Unknown = -1
-    NoError = 0
-    OffsetOutOfRange = 1
-    InvalidMessage = 2
-    UnknownTopicOrPartition = 3
-    InvalidFetchSize = 4
-    LeaderNotAvailable = 5
-    NotLeaderForPartition = 6
-    RequestTimedOut = 7
-    BrokerNotAvailable = 8
-    ReplicaNotAvailable = 9
-    MessageSizeTooLarge = 10
-    StaleControllerEpoch = 11
-    OffsetMetadataTooLarge = 12
+    # Many of these are not actually used by the client
+    UNKNOWN = -1
+    NO_ERROR = 0
+    OFFSET_OUT_OF_RANGE = 1
+    INVALID_MESSAGE = 2
+    UNKNOWN_TOPIC_OR_PARTITION = 3
+    INVALID_FETCH_SIZE = 4
+    LEADER_NOT_AVAILABLE = 5
+    NOT_LEADER_FOR_PARTITION = 6
+    REQUEST_TIMED_OUT = 7
+    BROKER_NOT_AVAILABLE = 8
+    REPLICA_NOT_AVAILABLE = 9
+    MESSAGE_SIZE_TOO_LARGE = 10
+    STALE_CONTROLLER_EPOCH = 11
+    OFFSET_METADATA_TOO_LARGE = 12
 
 class KafkaProtocol(object):
-    PRODUCE_KEY = 0
-    FETCH_KEY = 1
-    OFFSET_KEY = 2
-    METADATA_KEY = 3
+    """
+    Class to encapsulate all of the protocol encoding/decoding. This class does not
+    have any state associated with it; it is purely for organization.
+    """
+    PRODUCE_KEY = 0
+    FETCH_KEY = 1
+    OFFSET_KEY = 2
+    METADATA_KEY = 3
+    OFFSET_COMMIT_KEY = 6
+    OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
 
+    ###################
+    #   Private API   #
+    ###################
+
     @classmethod
-    def encode_message_header(cls, clientId, correlationId, requestKey):
-        return struct.pack('>HHiH%ds' % len(clientId),
-                           requestKey,          # ApiKey
-                           0,                   # ApiVersion
-                           correlationId,       # CorrelationId
-                           len(clientId),       #
-                           clientId)            # ClientId
+    def _encode_message_header(cls, client_id, correlation_id, request_key):
+        """
+        Encode the common request envelope
+        """
+        return struct.pack('>hhih%ds' % len(client_id),
+                           request_key,          # ApiKey
+                           0,                    # ApiVersion
+                           correlation_id,       # CorrelationId
+                           len(client_id),       #
+                           client_id)            # ClientId
 
     @classmethod
-    def encode_message_set(cls, messages):
+    def _encode_message_set(cls, messages):
+        """
+        Encode a MessageSet. Unlike other arrays in the protocol, MessageSets are
+        not length-prefixed
+
+        Format
+        ======
+        MessageSet => [Offset MessageSize Message]
+          Offset => int64
+          MessageSize => int32
+        """
         message_set = ""
         for message in messages:
-            encoded_message = KafkaProtocol.encode_message(message)
+            encoded_message = KafkaProtocol._encode_message(message)
             message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
         return message_set
 
     @classmethod
-    def encode_message(cls, message):
+    def _encode_message(cls, message):
+        """
+        Encode a single message.
+
+        The magic number of a message is a format version number. The only supported
+        magic number right now is zero
+
+        Format
+        ======
+        Message => Crc MagicByte Attributes Key Value
+          Crc => int32
+          MagicByte => int8
+          Attributes => int8
+          Key => bytes
+          Value => bytes
+        """
         if message.magic == 0:
             msg = struct.pack('>BB', message.magic, message.attributes)
             msg += write_int_string(message.key)
@@ -87,92 +135,237 @@ class KafkaProtocol(object):
             raise Exception("Unexpected magic number: %d" % message.magic)
         return msg
-
-    @classmethod
-    def create_message(cls, value):
-        return Message(0, 0, "foo", value)
 
     @classmethod
-    def create_gzip_message(cls, value):
-        message_set = KafkaProtocol.encode_message_set([KafkaProtocol.create_message(value)])
-        gzipped = gzip_encode(message_set)
-        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), "foo", gzipped)
-
-    @classmethod
-    def decode_message_set_iter(cls, data):
+    def _decode_message_set_iter(cls, data):
         """
-        Decode a MessageSet, iteratively
+        Iteratively decode a MessageSet
 
         Reads repeated elements of (offset, message), calling decode_message to decode a
         single message. Since compressed messages contain further MessageSets, these two methods
         have been decoupled so that they may recurse easily.
-
-        Format
-        ======
-        MessageSet => [Offset MessageSize Message]
-          Offset => int64
-          MessageSize => int32
-
-        N.B., the repeating element of the MessageSet is not preceded by an int32 like other
-        repeating elements in this protocol
         """
         cur = 0
         while cur < len(data):
-            ((offset, ), cur) = relative_unpack('>q', data, cur)
-            (msg, cur) = read_int_string(data, cur)
-            for (offset, message) in KafkaProtocol.decode_message(msg, offset):
-                yield OffsetAndMessage(offset, message)
+            try:
+                ((offset, ), cur) = relative_unpack('>q', data, cur)
+                (msg, cur) = read_int_string(data, cur)
+                for (offset, message) in KafkaProtocol._decode_message(msg, offset):
+                    yield OffsetAndMessage(offset, message)
+            except BufferUnderflowError:  # If we get a partial read of a message, stop
+                raise StopIteration()
 
     @classmethod
-    def decode_message(cls, data, offset):
+    def _decode_message(cls, data, offset):
         """
         Decode a single Message
 
         The only caller of this method is decode_message_set_iter. They are decoupled to
         support nested messages (compressed MessageSets). The offset is actually read from
         decode_message_set_iter (it is part of the MessageSet payload).
-
-        Format
-        ========
-        Message => Crc MagicByte Attributes Key Value
-          Crc => int32
-          MagicByte => int8
-          Attributes => int8
-          Key => bytes
-          Value => bytes
         """
         ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0)
-        assert crc == zlib.crc32(data[4:])
+        if crc != zlib.crc32(data[4:]):
+            raise ChecksumError("Message checksum failed")
+
         (key, cur) = read_int_string(data, cur)
         (value, cur) = read_int_string(data, cur)
         if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
             yield (offset, Message(magic, att, key, value))
         elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
             gz = gzip_decode(value)
-            for (offset, message) in KafkaProtocol.decode_message_set_iter(gz):
+            for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
                 yield (offset, message)
         elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
             snp = snappy_decode(value)
-            for (offset, message) in KafkaProtocol.decode_message_set_iter(snp):
+            for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
                 yield (offset, message)
 
+    ##################
+    #   Public API   #
+    ##################
+
     @classmethod
-    def encode_metadata_request(cls, clientId, correlationId, *topics):
-        # Header
-        message = cls.encode_message_header(clientId, correlationId, KafkaProtocol.METADATA_KEY)
+    def create_message(cls, payload, key=None):
+        """
+        Construct a Message
 
-        # TopicMetadataRequest
+        Params
+        ======
+        payload: bytes, the payload to send to Kafka
+        key: bytes, a key used for partition routing (optional)
+        """
+        return Message(0, 0, key, payload)
+
+    @classmethod
+    def create_gzip_message(cls, payloads, key=None):
+        """
+        Construct a Gzipped Message containing multiple Messages
+
+        The given payloads will be encoded, compressed, and sent as a single atomic
+        message to Kafka.
+
+        Params
+        ======
+        payloads: list(bytes), a list of payloads to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+        """
+        message_set = KafkaProtocol._encode_message_set(
+            [KafkaProtocol.create_message(payload) for payload in payloads])
+        gzipped = gzip_encode(message_set)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+
+    @classmethod
+    def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
+        """
+        Encode some ProduceRequest structs
+
+        Params
+        ======
+        client_id: string
+        correlation_id: string
+        payloads: list of ProduceRequest
+        acks: How "acky" you want the request to be
+            0: immediate response
+            1: written to disk by the leader
+            2+: waits for this many replicas to sync
+            -1: waits for all replicas to be in sync
+        timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
+        """
+        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
+        message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
+        for topic, payloads in payloads_by_topic:
+            payloads = list(payloads)
+            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
+            for payload in payloads:
+                message_set = KafkaProtocol._encode_message_set(payload.messages)
+                message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+        return struct.pack('>i%ds' % len(message), len(message), message)
+
+    @classmethod
+    def decode_produce_response(cls, data):
+        """
+        Decode bytes to a ProduceResponse
+
+        Params
+        ======
+        data: bytes to decode
+        """
+        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+        for i in range(num_topics):
+            ((strlen,), cur) = relative_unpack('>h', data, cur)
+            topic = data[cur:cur+strlen]
+            cur += strlen
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+            for i in range(num_partitions):
+                ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
+                yield ProduceResponse(topic, partition, error, offset)
+
+    @classmethod
+    def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_time=100, min_bytes=4096):
+        """
+        Encodes some FetchRequest structs
+
+        Params
+        ======
+        client_id: string
+        correlation_id: string
+        payloads: list of FetchRequest
+        max_wait_time: int, how long to block waiting on min_bytes of data
+        min_bytes: int, the minimum number of bytes to accumulate before returning the response
+        """
+
+        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
+        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic))  # -1 is the replica id
+        for topic, payloads in payloads_by_topic:
+            payloads = list(payloads)
+            message += write_short_string(topic)
+            message += struct.pack('>i', len(payloads))
+            for payload in payloads:
+                message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+        return struct.pack('>i%ds' % len(message), len(message), message)
+
+    @classmethod
+    def decode_fetch_response_iter(cls, data):
+        """
+        Decode bytes to a FetchResponse
+
+        Params
+        ======
+        data: bytes to decode
+        """
+        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+        for i in range(num_topics):
+            (topic, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+            for i in range(num_partitions):
+                ((partition, error, highwater_mark_offset), cur) = relative_unpack('>ihq', data, cur)
+                (message_set, cur) = read_int_string(data, cur)
+                yield FetchResponse(topic, partition, error, highwater_mark_offset,
+                                    KafkaProtocol._decode_message_set_iter(message_set))
+
+    @classmethod
+    def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
+        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
+        message += struct.pack('>ii', -1, len(payloads_by_topic))  # -1 is the replica id
+        for topic, payloads in payloads_by_topic:
+            payloads = list(payloads)
+            message += write_short_string(topic)
+            message += struct.pack('>i', len(payloads))
+            for payload in payloads:
+                message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+        return struct.pack('>i%ds' % len(message), len(message), message)
+
+    @classmethod
+    def decode_offset_response(cls, data):
+        """
+        Decode bytes to an OffsetResponse
+
+        Params
+        ======
+        data: bytes to decode
+        """
+        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+        for i in range(num_topics):
+            (topic, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+            for i in range(num_partitions):
+                ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
+                yield OffsetResponse(topic, partition, error, offset)
+
+    @classmethod
+    def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
+        """
+        Encode a MetadataRequest
+
+        Params
+        ======
+        client_id: string
+        correlation_id: string
+        topics: list of strings
+        """
+        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.METADATA_KEY)
         message += struct.pack('>i', len(topics))
         for topic in topics:
-            message += struct.pack('>H%ds' % len(topic), len(topic), topic)
-
-        # Length-prefix the whole thing
+            message += struct.pack('>h%ds' % len(topic), len(topic), topic)
         return write_int_string(message)
 
     @classmethod
     def decode_metadata_response(cls, data):
-        # TopicMetadataResponse
-        cur = 0
-        ((correlationId, numBrokers), cur) = relative_unpack('>ii', data, cur)
+        """
+        Decode bytes to a MetadataResponse
+
+        Params
+        ======
+        data: bytes to decode
+        """
+        ((correlation_id, numBrokers), cur) = relative_unpack('>ii', data, 0)
+
+        # Broker info
         brokers = {}
         for i in range(numBrokers):
             ((nodeId, ), cur) = relative_unpack('>i', data, cur)
@@ -180,145 +373,123 @@ class KafkaProtocol(object):
             ((port,), cur) = relative_unpack('>i', data, cur)
             brokers[nodeId] = BrokerMetadata(nodeId, host, port)
 
-        ((numTopics,), cur) = relative_unpack('>i', data, cur)
+        # Topic info
+        ((num_topics,), cur) = relative_unpack('>i', data, cur)
         topicMetadata = {}
-        for i in range(numTopics):
-            ((topicError,), cur) = relative_unpack('>H', data, cur)
+        for i in range(num_topics):
+            ((topicError,), cur) = relative_unpack('>h', data, cur)
             (topicName, cur) = read_short_string(data, cur)
-            ((numPartitions,), cur) = relative_unpack('>i', data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
             partitionMetadata = {}
-            for j in range(numPartitions):
-                ((partitionErrorCode, partitionId, leader, numReplicas), cur) = relative_unpack('>Hiii', data, cur)
+            for j in range(num_partitions):
+                ((partitionErrorCode, partition, leader, numReplicas), cur) = relative_unpack('>hiii', data, cur)
                 (replicas, cur) = relative_unpack('>%di' % numReplicas, data, cur)
                 ((numIsr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % numIsr, data, cur)
-                partitionMetadata[partitionId] = PartitionMetadata(topicName, partitionId, leader, replicas, isr)
+                partitionMetadata[partition] = PartitionMetadata(topicName, partition, leader, replicas, isr)
             topicMetadata[topicName] = partitionMetadata
         return (brokers, topicMetadata)
 
     @classmethod
-    def encode_produce_request(self, clientId, correlationId, payloads=[], acks=1, timeout=1000):
-        # Group the payloads by topic
-        sorted_payloads = sorted(payloads, key=attrgetter("topic"))
-        grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic")))
-
-        # Pack the message header
-        message = struct.pack('>HHiH%ds' % len(clientId),
-                              KafkaProtocol.PRODUCE_KEY,  # ApiKey
-                              0,                          # ApiVersion
-                              correlationId,              # CorrelationId
-                              len(clientId),              #
-                              clientId)                   # ClientId
-
-        # Pack the message sets
-        message += struct.pack('>Hii', acks, timeout, len(grouped_payloads))
-        for topic, payload in grouped_payloads:
-            payloads = list(payloads)
-            message += struct.pack('>H%dsi' % len(topic), len(topic), topic, len(payloads))
-            for payload in payloads:
-                message_set = KafkaProtocol.encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
-
-        # Length-prefix the whole thing
-        return struct.pack('>i%ds' % len(message), len(message), message)
-
-    @classmethod
-    def decode_produce_response(cls, data):
-        ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0)
-        for i in range(numTopics):
-            ((strlen,), cur) = relative_unpack('>H', data, cur)
-            topic = data[cur:cur+strlen]
-            cur += strlen
-            ((numPartitions,), cur) = relative_unpack('>i', data, cur)
-            for i in range(numPartitions):
-                ((partition, error, offset), cur) = relative_unpack('>iHq', data, cur)
-                yield ProduceResponse(topic, partition, error, offset)
+    def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads):
+        """
+        Encode some OffsetCommitRequest structs
 
-    @classmethod
-    def encode_fetch_request(cls, clientId, correlationId, payloads=[], replicaId=-1, maxWaitTime=100, minBytes=1024):
-        # Group the payloads by topic
-        sorted_payloads = sorted(payloads, key=attrgetter("topic"))
-        grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic")))
-
-        # Pack the message header
-        message = struct.pack('>HHiH%ds' % len(clientId),
-                              KafkaProtocol.FETCH_KEY,    # ApiKey
-                              0,                          # ApiVersion
-                              correlationId,              # CorrelationId
-                              len(clientId),              #
-                              clientId)                   # ClientId
-
-        # Pack the FetchRequest
-        message += struct.pack('>iiii',
-                               replicaId,                 # ReplicaId
-                               maxWaitTime,               # MaxWaitTime
-                               minBytes,                  # MinBytes
-                               len(grouped_payloads))
-        for topic, payload in grouped_payloads:
+        Params
+        ======
+        client_id: string
+        correlation_id: string
+        group: string, the consumer group you are committing offsets for
+        payloads: list of OffsetCommitRequest
+        """
+        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
+        message += write_short_string(group)
+        message += struct.pack('>i', len(payloads_by_topic))
+        for topic, payloads in payloads_by_topic:
             payloads = list(payloads)
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.offset, payload.maxBytes)
-
-        # Length-prefix the whole thing
+                message += struct.pack('>iq', payload.partition, payload.offset)
+                message += write_short_string(payload.metadata)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
-    def decode_fetch_response_iter(cls, data):
-        ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0)
-        for i in range(numTopics):
+    def decode_offset_commit_response(cls, data):
+        """
+        Decode bytes to an OffsetCommitResponse
+
+        Params
+        ======
+        data: bytes to decode
+        """
+        ((correlation_id,), cur) = relative_unpack('>i', data, 0)
+        (client_id, cur) = read_short_string(data, cur)
+        ((num_topics,), cur) = relative_unpack('>i', data, cur)
+        for i in range(num_topics):
            (topic, cur) = read_short_string(data, cur)
-            ((numPartitions,), cur) = relative_unpack('>i', data, cur)
-            for i in range(numPartitions):
-                ((partition, error, highwaterMarkOffset), cur) = relative_unpack('>iHq', data, cur)
-                (messageSet, cur) = read_int_string(data, cur)
-                yield FetchResponse(topic, partition, error, highwaterMarkOffset, KafkaProtocol.decode_message_set_iter(messageSet))
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+            for i in range(num_partitions):
+                ((partition, error), cur) = relative_unpack('>ih', data, cur)
+                yield OffsetCommitResponse(topic, partition, error)
 
     @classmethod
-    def encode_offset_request(cls, clientId, correlationId, payloads=[], replicaId=-1):
-        # Group the payloads by topic
-        sorted_payloads = sorted(payloads, key=attrgetter("topic"))
-        grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic")))
-
-        # Pack the message header
-        message = struct.pack('>HHiH%ds' % len(clientId),
-                              KafkaProtocol.OFFSET_KEY,   # ApiKey
-                              0,                          # ApiVersion
-                              correlationId,              # CorrelationId
-                              len(clientId),              #
-                              clientId)                   # ClientId
-
-        message += struct.pack('>ii', replicaId, len(grouped_payloads))
-
-        # Pack the OffsetRequest
-        for topic, payload in grouped_payloads:
+    def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads):
+        """
+        Encode some OffsetFetchRequest structs
+
+        Params
+        ======
+        client_id: string
+        correlation_id: string
+        group: string, the consumer group you are fetching offsets for
+        payloads: list of OffsetFetchRequest
+        """
+        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
+        message += write_short_string(group)
+        message += struct.pack('>i', len(payloads_by_topic))
+        for topic, payloads in payloads_by_topic:
            payloads = list(payloads)
            message += write_short_string(topic)
            message += struct.pack('>i', len(payloads))
            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.time, payload.maxOffsets)
-
-        # Length-prefix the whole thing
+                message += struct.pack('>i', payload.partition)
        return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
-    def decode_offset_response(cls, data):
-        ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0)
-        for i in range(numTopics):
-            (topic, cur) = read_short_string(data, cur)
-            ((numPartitions,), cur) = relative_unpack('>i', data, cur)
-            for i in range(numPartitions):
-                ((partition, error, offset), cur) = relative_unpack('>iHq', data, cur)
-                yield OffsetResponse(topic, partition, error, offset)
+    def decode_offset_fetch_response(cls, data):
+        """
+        Decode bytes to an OffsetFetchResponse
+
+        Params
+        ======
+        data: bytes to decode
+        """
+        ((correlation_id,), cur) = relative_unpack('>i', data, 0)
+        (client_id, cur) = read_short_string(data, cur)
+        ((num_topics,), cur) = relative_unpack('>i', data, cur)
+        for i in range(num_topics):
+            (topic, cur) = read_short_string(data, cur)
+            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
+            for i in range(num_partitions):
+                ((partition, offset), cur) = relative_unpack('>iq', data, cur)
+                (metadata, cur) = read_short_string(data, cur)
+                ((error,), cur) = relative_unpack('>h', data, cur)
+                yield OffsetFetchResponse(topic, partition, offset, metadata, error)
 
-class Conn(object):
+class KafkaConnection(object):
     """
     A socket connection to a single Kafka broker
+
+    This class is _not_ thread safe. Each call to `send` must be followed
+    by a call to `recv` in order to get the correct response. Eventually,
+    we can do something in here to facilitate multiplexed requests/responses
+    since the Kafka API includes a correlation id.
     """
-    def __init__(self, host, port, bufsize=1024):
+    def __init__(self, host, port, bufsize=4096):
        self.host = host
        self.port = port
        self.bufsize = bufsize
@@ -326,8 +497,9 @@ class Conn(object):
         self._sock.connect((host, port))
         self._sock.settimeout(10)
 
-    def close(self):
-        self._sock.close()
+    ###################
+    #   Private API   #
+    ###################
 
     def _consume_response(self):
         """
@@ -345,7 +517,7 @@
         """
         log.debug("Handling response from Kafka")
 
-        # Header
+        # Read the size off of the header
         resp = self._sock.recv(4)
         if resp == "":
             raise Exception("Got no response from Kafka")
@@ -354,61 +526,66 @@
         messageSize = size - 4
         log.debug("About to read %d bytes from Kafka", messageSize)
 
-        # Response iterator
+        # Read the remainder of the response
         total = 0
         while total < messageSize:
             resp = self._sock.recv(self.bufsize)
             log.debug("Read %d bytes from Kafka", len(resp))
             if resp == "":
-                raise Exception("Underflow")
+                raise BufferUnderflowError("Not enough data to read this response")
             total += len(resp)
             yield resp
 
+    ##################
+    #   Public API   #
+    ##################
+
     def send(self, requestId, payload):
-        #print(repr(payload))
+        "Send a request to Kafka"
         sent = self._sock.sendall(payload)
         if sent == 0:
             raise RuntimeError("Kafka went away")
         self.data = self._consume_response()
-        #print(repr(self.data))
 
     def recv(self, requestId):
+        "Get a response from Kafka"
         return self.data
 
-class KafkaConnection(object):
-    """
-    Low-level API for Kafka 0.8
-    """
+    def close(self):
+        "Close this connection"
+        self._sock.close()
 
-    # ClientId for Kafka
-    CLIENT_ID = "kafka-python"
+class KafkaClient(object):
 
-    # Global correlation ids
+    CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
-    def __init__(self, host, port, bufsize=1024):
+    def __init__(self, host, port, bufsize=4096):
         # We need one connection to bootstrap
         self.bufsize = bufsize
-        self.conns = {(host, port): Conn(host, port, bufsize)}
-        self.brokers = {}           # broker Id -> BrokerMetadata
-        self.topics_to_brokers = {} # topic Id -> broker Id
+        self.conns = {              # (host, port) -> KafkaConnection
+            (host, port): KafkaConnection(host, port, bufsize)
+        }
+        self.brokers = {}           # broker_id -> BrokerMetadata
+        self.topics_to_brokers = {} # topic_id -> broker_id
         self.load_metadata_for_topics()
 
     def get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = Conn(broker.host, broker.port, self.bufsize)
+            self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
         return self.conns[(broker.host, broker.port)]
 
     def next_id(self):
-        return KafkaConnection.ID_GEN.next()
+        "Generate a new correlation id"
+        return KafkaClient.ID_GEN.next()
 
     def load_metadata_for_topics(self, *topics):
         """
         Discover brokers and metadata for a set of topics
         """
         requestId = self.next_id()
-        request = KafkaProtocol.encode_metadata_request(KafkaConnection.CLIENT_ID, requestId, *topics)
+        request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
         conn = self.conns.values()[0] # Just get the first one in the list
         conn.send(requestId, request)
         response = conn.recv(requestId)
@@ -435,12 +612,11 @@
     def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
         # Group the produce requests by topic+partition
-        sorted_payloads = sorted(payloads, key=lambda x: (x.topic, x.partition))
-        grouped_payloads = groupby(sorted_payloads, key=lambda x: (x.topic, x.partition))
+        payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
 
         # Group the produce requests by which broker they go to
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payload in grouped_payloads:
+        for (topic, partition), payload in payloads_by_topic_and_partition:
             payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
 
         out = []
@@ -448,7 +624,7 @@
         for broker, payloads in payloads_by_broker.items():
             conn = self.get_conn_for_broker(broker)
             requestId = self.next_id()
-            request = KafkaProtocol.encode_produce_request(KafkaConnection.CLIENT_ID, requestId, payloads)
+            request = KafkaProtocol.encode_produce_request(KafkaClient.CLIENT_ID, requestId, payloads)
             # Send the request
             conn.send(requestId, request)
             response = conn.recv(requestId)
@@ -472,12 +648,11 @@
         brokers.
         """
         # Group the produce requests by topic+partition
-        sorted_payloads = sorted(payloads, key=lambda x: (x.topic, x.partition))
-        grouped_payloads = groupby(sorted_payloads, key=lambda x: (x.topic, x.partition))
+        payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
 
         # Group the produce requests by which broker they go to
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payload in grouped_payloads:
+        for (topic, partition), payload in payloads_by_topic_and_partition:
             payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
 
         out = []
@@ -485,7 +660,7 @@
         for broker, payloads in payloads_by_broker.items():
             conn = self.get_conn_for_broker(broker)
             requestId = self.next_id()
-            request = KafkaProtocol.encode_fetch_request(KafkaConnection.CLIENT_ID, requestId, payloads)
+            request = KafkaProtocol.encode_fetch_request(KafkaClient.CLIENT_ID, requestId, payloads)
             # Send the request
             conn.send(requestId, request)
             response = conn.recv(requestId)
@@ -501,24 +676,87 @@
             out.append(fetch_response)
         return out
 
+    def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
+        conn = self.conns.values()[0] # Just get the first one in the list
+        requestId = self.next_id()
+        request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
+        conn.send(requestId, request)
+        response = conn.recv(requestId)
+        out = []
+        for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
+            if fail_on_error == True and offset_commit_response.error != 0:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
+            if callback is not None:
+                out.append(callback(offset_commit_response))
+            else:
+                out.append(offset_commit_response)
+        return out
+
+    def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
+        conn = self.conns.values()[0] # Just get the first one in the list
+        requestId = self.next_id()
+        request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
+        conn.send(requestId, request)
+        response = conn.recv(requestId)
+        out = []
+        for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
+            if fail_on_error == True and offset_fetch_response.error != 0:
+                raise Exception("OffsetFetchRequest failed with errorcode=%s", offset_fetch_response.error)
+            if callback is not None:
+                out.append(callback(offset_fetch_response))
+            else:
+                out.append(offset_fetch_response)
+        return out
+
+
 if __name__ == "__main__":
     # Bootstrap connection
-    conn = KafkaConnection("localhost", 9092)
+    conn = KafkaClient("localhost", 9092)
 
     # Create some Messages
     messages = (KafkaProtocol.create_gzip_message("GZIPPed"),
                 KafkaProtocol.create_message("not-gzipped"))
 
     # Create a ProduceRequest
-    produce = ProduceRequest("foo5", 0, messages)
+    produce = ProduceRequest(topic="foo5", partition=0, messages=messages)
 
     # Send the ProduceRequest
-    produce_resp = conn.send_produce_request([produce])
+    produce_resp = conn.send_produce_request(payloads=[produce])
 
     # Check for errors
     for resp in produce_resp:
         if resp.error != 0:
             raise Exception("ProduceRequest failed with errorcode=%d", resp.error)
         print resp
-
+
+    # Offset commit/fetch
+    #conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")])
+    #conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)])
+
+    print conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("foo5", 0)])
+    offset = 0
+    done = False
+    while not done:
+        print offset
+        for resp in conn.send_fetch_request(payloads=[FetchRequest(topic="foo5", partition=0, offset=offset, max_bytes=4096)]):
+            i = 0
+            for msg in resp.messages:
+                print conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("foo5", 0, offset, "")])
+                print msg, offset
+                offset = msg.offset + 1
+                i += 1
+            if i == 0:
+                raise StopIteration("no more messages")
+
+class Consumer(object):
+    def __init__(self, conn):
+        self._conn = conn
+
+
+
+class Producer(object):
+    pass
```
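A few implementation notes on the new protocol layer. First, every request built through `_encode_message_header` shares one envelope: ApiKey (int16), ApiVersion (int16), CorrelationId (int32), and an int16-length-prefixed ClientId. A standalone round-trip sketch of that layout (not part of the commit; the helper name here is made up):

```python
import struct

def encode_header(client_id, correlation_id, request_key):
    # >hhih%ds => ApiKey, ApiVersion, CorrelationId, len(ClientId), ClientId
    return struct.pack('>hhih%ds' % len(client_id),
                       request_key, 0, correlation_id, len(client_id), client_id)

header = encode_header(b"kafka-python", 42, 3)   # 3 == METADATA_KEY
(api_key, api_version, corr_id, strlen) = struct.unpack('>hhih', header[:10])
assert (api_key, api_version, corr_id) == (3, 0, 42)
assert header[10:10 + strlen] == b"kafka-python"
```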
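Second, `_encode_message` frames each message as Crc, MagicByte, Attributes, Key, Value, and `_decode_message` now raises `ChecksumError` instead of asserting when the CRC does not match. A self-contained round trip of that framing; note this sketch packs the CRC with `'>I'` and a mask so it runs on Python 2 and 3, whereas the commit packs the signed Python 2 `zlib.crc32` result with `'>i'`:

```python
import struct
import zlib

def _int_string(s):
    # int32 length-prefixed bytes; None encodes as length -1
    return struct.pack('>i', -1) if s is None else struct.pack('>i%ds' % len(s), len(s), s)

def encode_message(key, value, magic=0, attributes=0):
    # MagicByte, Attributes, Key, Value -- then prefix the whole body with its CRC
    body = struct.pack('>BB', magic, attributes) + _int_string(key) + _int_string(value)
    return struct.pack('>I', zlib.crc32(body) & 0xffffffff) + body

msg = encode_message(None, b"hello")
(crc,) = struct.unpack('>I', msg[:4])
# Same validation _decode_message performs on the bytes after the CRC
assert crc == zlib.crc32(msg[4:]) & 0xffffffff
```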
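The decoders lean on helpers imported from `kafka.util` (`relative_unpack`, `read_short_string`, `read_int_string`, `BufferUnderflowError`), which this diff does not show. A plausible sketch of their behavior, inferred from how they are called above; the real implementations may differ:

```python
import struct

class BufferUnderflowError(Exception):
    """Raised when there are not enough bytes left to decode a value."""

def relative_unpack(fmt, data, cur):
    # Unpack fmt at offset cur; return (values_tuple, new_offset)
    size = struct.calcsize(fmt)
    if len(data) < cur + size:
        raise BufferUnderflowError("Not enough data left")
    return struct.unpack(fmt, data[cur:cur + size]), cur + size

def read_short_string(data, cur):
    # int16 length-prefixed string; length -1 encodes None
    ((strlen,), cur) = relative_unpack('>h', data, cur)
    if strlen == -1:
        return (None, cur)
    if len(data) < cur + strlen:
        raise BufferUnderflowError("Not enough data left")
    return (data[cur:cur + strlen], cur + strlen)

def read_int_string(data, cur):
    # int32 length-prefixed bytes; length -1 encodes None
    ((strlen,), cur) = relative_unpack('>i', data, cur)
    if strlen == -1:
        return (None, cur)
    if len(data) < cur + strlen:
        raise BufferUnderflowError("Not enough data left")
    return (data[cur:cur + strlen], cur + strlen)
```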
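Finally, each encoder groups its payloads by topic through `group_list_by_key`, which replaces the inline `sorted` plus `groupby` pairs from the previous revision but is likewise defined elsewhere in the package. Assuming it simply materializes sorted groups, it could look like this (a guess at the helper, not the project's code):

```python
from collections import namedtuple
from itertools import groupby
from operator import attrgetter

def group_list_by_key(iterable, key):
    # Sort, then group; materialize each group so it survives len() and re-iteration
    return [(k, list(g)) for k, g in groupby(sorted(iterable, key=key), key=key)]

FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
reqs = [FetchRequest("foo", 0, 0, 4096),
        FetchRequest("bar", 0, 0, 4096),
        FetchRequest("foo", 1, 0, 4096)]
grouped = group_list_by_key(reqs, key=attrgetter("topic"))
assert [t for t, _ in grouped] == ["bar", "foo"]
assert len(grouped[1][1]) == 2  # both "foo" partitions grouped together
```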