From a0be374ce36f00ebb11a1e211ecee715999d9e8b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Nov 2015 04:24:28 +0800 Subject: Basic 0.8 Request protocol classes, with encoding only --- kafka/protocol/api.py | 355 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 355 insertions(+) create mode 100644 kafka/protocol/api.py (limited to 'kafka/protocol/api.py') diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py new file mode 100644 index 0000000..cbaf828 --- /dev/null +++ b/kafka/protocol/api.py @@ -0,0 +1,355 @@ +import struct + +from .types import ( + Int8, Int16, Int32, Int64, Bytes, String, Array +) +from ..util import crc32 + + +class Message(object): + MAGIC_BYTE = 0 + __slots__ = ('magic', 'attributes', 'key', 'value') + + def __init__(self, value, key=None, magic=0, attributes=0): + self.magic = magic + self.attributes = attributes + self.key = key + self.value = value + + def encode(self): + message = ( + Int8.encode(self.magic) + + Int8.encode(self.attributes) + + Bytes.encode(self.key) + + Bytes.encode(self.value) + ) + return ( + struct.pack('>I', crc32(message)) + + message + ) + + +class MessageSet(object): + + @staticmethod + def _encode_one(message): + encoded = message.encode() + return (Int64.encode(0) + Int32.encode(len(encoded)) + encoded) + + @staticmethod + def encode(messages): + return b''.join(map(MessageSet._encode_one, messages)) + + +class AbstractRequestResponse(object): + @classmethod + def encode(cls, message): + return Int32.encode(len(message)) + message + + +class AbstractRequest(AbstractRequestResponse): + @classmethod + def encode(cls, request, correlation_id=0, client_id='kafka-python'): + request = (Int16.encode(cls.API_KEY) + + Int16.encode(cls.API_VERSION) + + Int32.encode(correlation_id) + + String.encode(client_id) + + request) + return super(AbstractRequest, cls).encode(request) + + +class ProduceRequest(AbstractRequest): + API_KEY = 0 + API_VERSION = 0 + __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression') + + def __init__(self, topic_partition_messages, + required_acks=-1, timeout=1000, compression=None): + """ + topic_partition_messages is a dict of dicts of lists (of messages) + { + "TopicFoo": { + 0: [ + Message('foo'), + Message('bar') + ], + 1: [ + Message('fizz'), + Message('buzz') + ] + } + } + """ + self.required_acks = required_acks + self.timeout = timeout + self.topic_partition_messages = topic_partition_messages + self.compression = compression + + @staticmethod + def _encode_messages(partition, messages, compression): + message_set = MessageSet.encode(messages) + + if compression: + # compress message_set data and re-encode as single message + # then wrap single compressed message in a new message_set + pass + + return (Int32.encode(partition) + + Int32.encode(len(message_set)) + + message_set) + + def encode(self): + request = ( + Int16.encode(self.required_acks) + + Int32.encode(self.timeout) + + Array.encode([( + String.encode(topic) + + Array.encode([ + self._encode_messages(partition, messages, self.compression) + for partition, messages in partitions.iteritems()]) + ) for topic, partitions in self.topic_partition_messages.iteritems()]) + ) + return super(ProduceRequest, self).encode(request) + + +class FetchRequest(AbstractRequest): + API_KEY = 1 + API_VERSION = 0 + __slots__ = ('replica_id', 'max_wait_time', 'min_bytes', 'topic_partition_offsets') + + def __init__(self, topic_partition_offsets, + max_wait_time=-1, min_bytes=0, replica_id=-1): + """ + 
topic_partition_offsets is a dict of dicts of (offset, max_bytes) tuples + { + "TopicFoo": { + 0: (1234, 1048576), + 1: (1324, 1048576) + } + } + """ + self.topic_partition_offsets = topic_partition_offsets + self.max_wait_time = max_wait_time + self.min_bytes = min_bytes + self.replica_id = replica_id + + def encode(self): + request = ( + Int32.encode(self.replica_id) + + Int32.encode(self.max_wait_time) + + Int32.encode(self.min_bytes) + + Array.encode([( + String.encode(topic) + + Array.encode([( + Int32.encode(partition) + + Int64.encode(offset) + + Int32.encode(max_bytes) + ) for partition, (offset, max_bytes) in partitions.iteritems()]) + ) for topic, partitions in self.topic_partition_offsets.iteritems()])) + return super(FetchRequest, self).encode(request) + + +class OffsetRequest(AbstractRequest): + API_KEY = 2 + API_VERSION = 0 + __slots__ = ('replica_id', 'topic_partition_times') + + def __init__(self, topic_partition_times, replica_id=-1): + """ + topic_partition_times is a dict of dicts of (time, max_offsets) tuples + { + "TopicFoo": { + 0: (-1, 1), + 1: (-1, 1) + } + } + """ + self.topic_partition_times = topic_partition_times + self.replica_id = replica_id + + def encode(self): + request = ( + Int32.encode(self.replica_id) + + Array.encode([( + String.encode(topic) + + Array.encode([( + Int32.encode(partition) + + Int64.encode(time) + + Int32.encode(max_offsets) + ) for partition, (time, max_offsets) in partitions.iteritems()]) + ) for topic, partitions in self.topic_partition_times.iteritems()])) + return super(OffsetRequest, self).encode(request) + + +class MetadataRequest(AbstractRequest): + API_KEY = 3 + API_VERSION = 0 + __slots__ = ('topics') + + def __init__(self, *topics): + self.topics = topics + + def encode(self): + request = Array.encode(map(String.encode, self.topics)) + return super(MetadataRequest, self).encode(request) + + +# Non-user facing control APIs 4-7 + + +class OffsetCommitRequestV0(AbstractRequest): + API_KEY = 8 + API_VERSION = 0 + __slots__ = ('consumer_group_id', 'offsets') + + def __init__(self, consumer_group_id, offsets): + """ + offsets is a dict of dicts of (offset, metadata) tuples + { + "TopicFoo": { + 0: (1234, ""), + 1: (1243, "") + } + } + """ + self.consumer_group_id = consumer_group_id + self.offsets = offsets + + def encode(self): + request = ( + String.encode(self.consumer_group_id) + + Array.encode([( + String.encode(topic) + + Array.encode([( + Int32.encode(partition) + + Int64.encode(offset) + + String.encode(metadata) + ) for partition, (offset, metadata) in partitions.iteritems()]) + ) for topic, partitions in self.offsets.iteritems()])) + return super(OffsetCommitRequestV0, self).encode(request) + + +class OffsetCommitRequestV1(AbstractRequest): + API_KEY = 8 + API_VERSION = 1 + __slots__ = ('consumer_group_id', 'consumer_group_generation_id', + 'consumer_id', 'offsets') + + def __init__(self, consumer_group_id, consumer_group_generation_id, + consumer_id, offsets): + """ + offsets is a dict of dicts of (offset, timestamp, metadata) tuples + { + "TopicFoo": { + 0: (1234, 1448198827, ""), + 1: (1243, 1448198827, "") + } + } + """ + self.consumer_group_id = consumer_group_id + self.consumer_group_generation_id = consumer_group_generation_id + self.consumer_id = consumer_id + self.offsets = offsets + + def encode(self): + request = ( + String.encode(self.consumer_group_id) + + Int32.encode(self.consumer_group_generation_id) + + String.encode(self.consumer_id) + + Array.encode([( + String.encode(topic) + + Array.encode([( + 
Int32.encode(partition) + + Int64.encode(offset) + + Int64.encode(timestamp) + + String.encode(metadata) + ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) + ) for topic, partitions in self.offsets.iteritems()])) + return super(OffsetCommitRequestV1, self).encode(request) + + +class OffsetCommitRequest(AbstractRequest): + API_KEY = 8 + API_VERSION = 2 + __slots__ = ('consumer_group_id', 'consumer_group_generation_id', + 'consumer_id', 'retention_time', 'offsets') + + def __init__(self, consumer_group_id, consumer_group_generation_id, + consumer_id, retention_time, offsets): + """ + offsets is a dict of dicts of (offset, metadata) tuples + { + "TopicFoo": { + 0: (1234, ""), + 1: (1243, "") + } + } + """ + self.consumer_group_id = consumer_group_id + self.consumer_group_generation_id = consumer_group_generation_id + self.consumer_id = consumer_id + self.retention_time = retention_time + self.offsets = offsets + + def encode(self): + request = ( + String.encode(self.consumer_group_id) + + Int32.encode(self.consumer_group_generation_id) + + String.encode(self.consumer_id) + + Int64.encode(self.retention_time) + + Array.encode([( + String.encode(topic) + + Array.encode([( + Int32.encode(partition) + + Int64.encode(offset) + + String.encode(metadata) + ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) + ) for topic, partitions in self.offsets.iteritems()])) + return super(OffsetCommitRequest, self).encode(request) + + +class OffsetFetchRequestV0(AbstractRequest): + API_KEY = 9 + API_VERSION = 0 + __slots__ = ('consumer_group', 'topic_partitions') + + def __init__(self, consumer_group, topic_partitions): + """ + offsets is a dict of lists of partition ints + { + "TopicFoo": [0, 1, 2] + } + """ + self.consumer_group = consumer_group + self.topic_partitions = topic_partitions + + def encode(self): + request = ( + String.encode(self.consumer_group) + + Array.encode([( + String.encode(topic) + + Array.encode([Int32.encode(partition) for partition in partitions]) + ) for topic, partitions in self.topic_partitions.iteritems()]) + ) + return super(OffsetFetchRequest, self).encode(request) + + +class OffsetFetchRequest(OffsetFetchRequestV0): + """Identical to V0, but offsets fetched from kafka storage not zookeeper""" + API_VERSION = 1 + + +class GroupCoordinatorRequest(AbstractRequest): + API_KEY = 10 + API_VERSION = 0 + __slots__ = ('group_id',) + + def __init__(self, group_id): + self.group_id = group_id + + def encode(self): + request = String.encode(self.group_id) + return super(GroupCoordinatorRequest, self).encode(request) + + + -- cgit v1.2.1 From 3f65ff4ab93f2282af442e6bb5e54e3af1d602db Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Nov 2015 05:26:13 +0800 Subject: Move ProduceRequest to kafka.protocol.produce --- kafka/protocol/api.py | 54 --------------------------------------------------- 1 file changed, 54 deletions(-) (limited to 'kafka/protocol/api.py') diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index cbaf828..8ea820b 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -58,60 +58,6 @@ class AbstractRequest(AbstractRequestResponse): return super(AbstractRequest, cls).encode(request) -class ProduceRequest(AbstractRequest): - API_KEY = 0 - API_VERSION = 0 - __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression') - - def __init__(self, topic_partition_messages, - required_acks=-1, timeout=1000, compression=None): - """ - topic_partition_messages is a dict of dicts of lists (of 
messages) - { - "TopicFoo": { - 0: [ - Message('foo'), - Message('bar') - ], - 1: [ - Message('fizz'), - Message('buzz') - ] - } - } - """ - self.required_acks = required_acks - self.timeout = timeout - self.topic_partition_messages = topic_partition_messages - self.compression = compression - - @staticmethod - def _encode_messages(partition, messages, compression): - message_set = MessageSet.encode(messages) - - if compression: - # compress message_set data and re-encode as single message - # then wrap single compressed message in a new message_set - pass - - return (Int32.encode(partition) + - Int32.encode(len(message_set)) + - message_set) - - def encode(self): - request = ( - Int16.encode(self.required_acks) + - Int32.encode(self.timeout) + - Array.encode([( - String.encode(topic) + - Array.encode([ - self._encode_messages(partition, messages, self.compression) - for partition, messages in partitions.iteritems()]) - ) for topic, partitions in self.topic_partition_messages.iteritems()]) - ) - return super(ProduceRequest, self).encode(request) - - class FetchRequest(AbstractRequest): API_KEY = 1 API_VERSION = 0 -- cgit v1.2.1 From a85e09df89a43de5b659a0fa4ed35bec37c60e04 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 28 Nov 2015 19:41:06 +0800 Subject: Rework protocol type definition: AbstractType, Schema, Struct --- kafka/protocol/api.py | 309 ++------------------------------------------------ 1 file changed, 12 insertions(+), 297 deletions(-) (limited to 'kafka/protocol/api.py') diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index 8ea820b..0c23437 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -1,301 +1,16 @@ -import struct +from .struct import Struct +from .types import Int16, Int32, String, Schema -from .types import ( - Int8, Int16, Int32, Int64, Bytes, String, Array -) -from ..util import crc32 +class RequestHeader(Struct): + SCHEMA = Schema( + ('api_key', Int16), + ('api_version', Int16), + ('correlation_id', Int32), + ('client_id', String('utf-8')) + ) -class Message(object): - MAGIC_BYTE = 0 - __slots__ = ('magic', 'attributes', 'key', 'value') - - def __init__(self, value, key=None, magic=0, attributes=0): - self.magic = magic - self.attributes = attributes - self.key = key - self.value = value - - def encode(self): - message = ( - Int8.encode(self.magic) + - Int8.encode(self.attributes) + - Bytes.encode(self.key) + - Bytes.encode(self.value) + def __init__(self, request, correlation_id=0, client_id='kafka-python'): + super(RequestHeader, self).__init__( + request.API_KEY, request.API_VERSION, correlation_id, client_id ) - return ( - struct.pack('>I', crc32(message)) + - message - ) - - -class MessageSet(object): - - @staticmethod - def _encode_one(message): - encoded = message.encode() - return (Int64.encode(0) + Int32.encode(len(encoded)) + encoded) - - @staticmethod - def encode(messages): - return b''.join(map(MessageSet._encode_one, messages)) - - -class AbstractRequestResponse(object): - @classmethod - def encode(cls, message): - return Int32.encode(len(message)) + message - - -class AbstractRequest(AbstractRequestResponse): - @classmethod - def encode(cls, request, correlation_id=0, client_id='kafka-python'): - request = (Int16.encode(cls.API_KEY) + - Int16.encode(cls.API_VERSION) + - Int32.encode(correlation_id) + - String.encode(client_id) + - request) - return super(AbstractRequest, cls).encode(request) - - -class FetchRequest(AbstractRequest): - API_KEY = 1 - API_VERSION = 0 - __slots__ = ('replica_id', 'max_wait_time', 
'min_bytes', 'topic_partition_offsets') - - def __init__(self, topic_partition_offsets, - max_wait_time=-1, min_bytes=0, replica_id=-1): - """ - topic_partition_offsets is a dict of dicts of (offset, max_bytes) tuples - { - "TopicFoo": { - 0: (1234, 1048576), - 1: (1324, 1048576) - } - } - """ - self.topic_partition_offsets = topic_partition_offsets - self.max_wait_time = max_wait_time - self.min_bytes = min_bytes - self.replica_id = replica_id - - def encode(self): - request = ( - Int32.encode(self.replica_id) + - Int32.encode(self.max_wait_time) + - Int32.encode(self.min_bytes) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - Int32.encode(max_bytes) - ) for partition, (offset, max_bytes) in partitions.iteritems()]) - ) for topic, partitions in self.topic_partition_offsets.iteritems()])) - return super(FetchRequest, self).encode(request) - - -class OffsetRequest(AbstractRequest): - API_KEY = 2 - API_VERSION = 0 - __slots__ = ('replica_id', 'topic_partition_times') - - def __init__(self, topic_partition_times, replica_id=-1): - """ - topic_partition_times is a dict of dicts of (time, max_offsets) tuples - { - "TopicFoo": { - 0: (-1, 1), - 1: (-1, 1) - } - } - """ - self.topic_partition_times = topic_partition_times - self.replica_id = replica_id - - def encode(self): - request = ( - Int32.encode(self.replica_id) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(time) + - Int32.encode(max_offsets) - ) for partition, (time, max_offsets) in partitions.iteritems()]) - ) for topic, partitions in self.topic_partition_times.iteritems()])) - return super(OffsetRequest, self).encode(request) - - -class MetadataRequest(AbstractRequest): - API_KEY = 3 - API_VERSION = 0 - __slots__ = ('topics') - - def __init__(self, *topics): - self.topics = topics - - def encode(self): - request = Array.encode(map(String.encode, self.topics)) - return super(MetadataRequest, self).encode(request) - - -# Non-user facing control APIs 4-7 - - -class OffsetCommitRequestV0(AbstractRequest): - API_KEY = 8 - API_VERSION = 0 - __slots__ = ('consumer_group_id', 'offsets') - - def __init__(self, consumer_group_id, offsets): - """ - offsets is a dict of dicts of (offset, metadata) tuples - { - "TopicFoo": { - 0: (1234, ""), - 1: (1243, "") - } - } - """ - self.consumer_group_id = consumer_group_id - self.offsets = offsets - - def encode(self): - request = ( - String.encode(self.consumer_group_id) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - String.encode(metadata) - ) for partition, (offset, metadata) in partitions.iteritems()]) - ) for topic, partitions in self.offsets.iteritems()])) - return super(OffsetCommitRequestV0, self).encode(request) - - -class OffsetCommitRequestV1(AbstractRequest): - API_KEY = 8 - API_VERSION = 1 - __slots__ = ('consumer_group_id', 'consumer_group_generation_id', - 'consumer_id', 'offsets') - - def __init__(self, consumer_group_id, consumer_group_generation_id, - consumer_id, offsets): - """ - offsets is a dict of dicts of (offset, timestamp, metadata) tuples - { - "TopicFoo": { - 0: (1234, 1448198827, ""), - 1: (1243, 1448198827, "") - } - } - """ - self.consumer_group_id = consumer_group_id - self.consumer_group_generation_id = consumer_group_generation_id - self.consumer_id = consumer_id - self.offsets = offsets - - def encode(self): - request = ( - String.encode(self.consumer_group_id) + - 
Int32.encode(self.consumer_group_generation_id) + - String.encode(self.consumer_id) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - Int64.encode(timestamp) + - String.encode(metadata) - ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) - ) for topic, partitions in self.offsets.iteritems()])) - return super(OffsetCommitRequestV1, self).encode(request) - - -class OffsetCommitRequest(AbstractRequest): - API_KEY = 8 - API_VERSION = 2 - __slots__ = ('consumer_group_id', 'consumer_group_generation_id', - 'consumer_id', 'retention_time', 'offsets') - - def __init__(self, consumer_group_id, consumer_group_generation_id, - consumer_id, retention_time, offsets): - """ - offsets is a dict of dicts of (offset, metadata) tuples - { - "TopicFoo": { - 0: (1234, ""), - 1: (1243, "") - } - } - """ - self.consumer_group_id = consumer_group_id - self.consumer_group_generation_id = consumer_group_generation_id - self.consumer_id = consumer_id - self.retention_time = retention_time - self.offsets = offsets - - def encode(self): - request = ( - String.encode(self.consumer_group_id) + - Int32.encode(self.consumer_group_generation_id) + - String.encode(self.consumer_id) + - Int64.encode(self.retention_time) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - String.encode(metadata) - ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) - ) for topic, partitions in self.offsets.iteritems()])) - return super(OffsetCommitRequest, self).encode(request) - - -class OffsetFetchRequestV0(AbstractRequest): - API_KEY = 9 - API_VERSION = 0 - __slots__ = ('consumer_group', 'topic_partitions') - - def __init__(self, consumer_group, topic_partitions): - """ - offsets is a dict of lists of partition ints - { - "TopicFoo": [0, 1, 2] - } - """ - self.consumer_group = consumer_group - self.topic_partitions = topic_partitions - - def encode(self): - request = ( - String.encode(self.consumer_group) + - Array.encode([( - String.encode(topic) + - Array.encode([Int32.encode(partition) for partition in partitions]) - ) for topic, partitions in self.topic_partitions.iteritems()]) - ) - return super(OffsetFetchRequest, self).encode(request) - - -class OffsetFetchRequest(OffsetFetchRequestV0): - """Identical to V0, but offsets fetched from kafka storage not zookeeper""" - API_VERSION = 1 - - -class GroupCoordinatorRequest(AbstractRequest): - API_KEY = 10 - API_VERSION = 0 - __slots__ = ('group_id',) - - def __init__(self, group_id): - self.group_id = group_id - - def encode(self): - request = String.encode(self.group_id) - return super(GroupCoordinatorRequest, self).encode(request) - - - -- cgit v1.2.1
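
The request framing built up by AbstractRequestResponse, AbstractRequest and
MetadataRequest in the first patch is the standard Kafka 0.8 layout: a 32-bit
size prefix, then api_key/api_version/correlation_id/client_id, then the
request body. The sketch below reproduces that framing with nothing but the
struct module; the _int16/_int32/_string/_array helpers are stand-ins for the
Int16/Int32/String/Array types imported from kafka.protocol.types (not shown
in this series), assuming big-endian packing, 16-bit length-prefixed strings
and 32-bit element-count arrays.

    import struct

    def _int16(n):
        return struct.pack('>h', n)

    def _int32(n):
        return struct.pack('>i', n)

    def _string(s):
        data = s.encode('utf-8')
        return _int16(len(data)) + data

    def _array(items):
        # Kafka arrays: 32-bit element count followed by the encoded elements
        return _int32(len(items)) + b''.join(items)

    def encode_metadata_request(topics, correlation_id=0, client_id='kafka-python'):
        header = (_int16(3) +              # MetadataRequest.API_KEY
                  _int16(0) +              # MetadataRequest.API_VERSION
                  _int32(correlation_id) +
                  _string(client_id))
        body = _array([_string(t) for t in topics])
        message = header + body
        return _int32(len(message)) + message   # size prefix from AbstractRequestResponse

    raw = encode_metadata_request(['TopicFoo'], correlation_id=1)
    print(raw.hex())

The outer size prefix added by AbstractRequestResponse.encode is what lets the
broker split the TCP stream back into individual requests.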
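
The Message and MessageSet classes in the first patch (used by ProduceRequest
before it moved to kafka.protocol.produce in the second patch) carry their own
framing: a CRC32 computed over magic + attributes + key + value and prepended
to the message, and an offset/size prefix per message inside the set. A
standalone sketch, using binascii.crc32 in place of the crc32 helper that the
patch imports from kafka.util:

    import binascii
    import struct

    def _bytes(value):
        # Kafka "bytes": 32-bit length prefix, -1 for a null value
        if value is None:
            return struct.pack('>i', -1)
        return struct.pack('>i', len(value)) + value

    def encode_message(value, key=None, magic=0, attributes=0):
        body = (struct.pack('>b', magic) +
                struct.pack('>b', attributes) +
                _bytes(key) +
                _bytes(value))
        crc = binascii.crc32(body) & 0xffffffff
        return struct.pack('>I', crc) + body

    def encode_message_set(encoded_messages):
        # the offset is written as 0 when producing; the broker assigns the real one
        return b''.join(
            struct.pack('>q', 0) + struct.pack('>i', len(m)) + m
            for m in encoded_messages)

    msgs = [encode_message(b'foo'), encode_message(b'bar', key=b'k')]
    print(len(encode_message_set(msgs)))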
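
The last patch drops the hand-rolled byte concatenation in favour of a
declarative Schema attached to a Struct base class. Neither Struct
(kafka/protocol/struct.py) nor Schema is included in this series, so the
following is only a minimal sketch of the idea under that assumption: a Schema
pairs field names with field types, and a Struct encodes its values by walking
the schema. Only the RequestHeader definition itself comes from the patch; the
simplified field types here know how to encode but not decode.

    import struct

    class Int16(object):
        @staticmethod
        def encode(value):
            return struct.pack('>h', value)

    class Int32(object):
        @staticmethod
        def encode(value):
            return struct.pack('>i', value)

    class String(object):
        def __init__(self, encoding='utf-8'):
            self.encoding = encoding

        def encode(self, value):
            data = value.encode(self.encoding)
            return Int16.encode(len(data)) + data

    class Schema(object):
        def __init__(self, *fields):
            self.names = [name for name, _ in fields]
            self.types = [field_type for _, field_type in fields]

        def encode(self, values):
            return b''.join(t.encode(v) for t, v in zip(self.types, values))

    class Struct(object):
        SCHEMA = Schema()

        def __init__(self, *args):
            self.values = args

        def encode(self):
            return self.SCHEMA.encode(self.values)

    class RequestHeader(Struct):
        SCHEMA = Schema(
            ('api_key', Int16),
            ('api_version', Int16),
            ('correlation_id', Int32),
            ('client_id', String('utf-8'))
        )

        def __init__(self, request, correlation_id=0, client_id='kafka-python'):
            super(RequestHeader, self).__init__(
                request.API_KEY, request.API_VERSION, correlation_id, client_id
            )

    class MetadataRequest(object):
        API_KEY = 3
        API_VERSION = 0

    header = RequestHeader(MetadataRequest(), correlation_id=1)
    print(header.encode().hex())

The field layout is identical to what AbstractRequest.encode produced by hand;
the schema simply moves the field list out of imperative code so every request
class can share a single encode path.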