diff options
| author | Dana Powers <dana.powers@rd.io> | 2015-11-23 04:24:28 +0800 | 
|---|---|---|
| committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:39 -0800 | 
| commit | a0be374ce36f00ebb11a1e211ecee715999d9e8b (patch) | |
| tree | 1731c1a01f765a618448426200e2fb95445b0e39 /kafka/protocol/api.py | |
| parent | f6edeafac3f42f5407629dcfb1ddd4357dbf5445 (diff) | |
| download | kafka-python-a0be374ce36f00ebb11a1e211ecee715999d9e8b.tar.gz | |
Basic 0.8 Request protocol classes, with encoding only
Diffstat (limited to 'kafka/protocol/api.py')
| -rw-r--r-- | kafka/protocol/api.py | 355 | 
1 files changed, 355 insertions, 0 deletions
| diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py new file mode 100644 index 0000000..cbaf828 --- /dev/null +++ b/kafka/protocol/api.py @@ -0,0 +1,355 @@ +import struct + +from .types import ( +    Int8, Int16, Int32, Int64, Bytes, String, Array +) +from ..util import crc32 + + +class Message(object): +    MAGIC_BYTE = 0 +    __slots__ = ('magic', 'attributes', 'key', 'value') + +    def __init__(self, value, key=None, magic=0, attributes=0): +        self.magic = magic +        self.attributes = attributes +        self.key = key +        self.value = value + +    def encode(self): +        message = ( +            Int8.encode(self.magic) + +            Int8.encode(self.attributes) + +            Bytes.encode(self.key) + +            Bytes.encode(self.value) +        ) +        return ( +            struct.pack('>I', crc32(message)) + +            message +        ) + + +class MessageSet(object): + +    @staticmethod +    def _encode_one(message): +        encoded = message.encode() +        return (Int64.encode(0) + Int32.encode(len(encoded)) + encoded) + +    @staticmethod +    def encode(messages): +        return b''.join(map(MessageSet._encode_one, messages)) + + +class AbstractRequestResponse(object): +    @classmethod +    def encode(cls, message): +        return Int32.encode(len(message)) + message + + +class AbstractRequest(AbstractRequestResponse): +    @classmethod +    def encode(cls, request, correlation_id=0, client_id='kafka-python'): +        request = (Int16.encode(cls.API_KEY) + +                   Int16.encode(cls.API_VERSION) + +                   Int32.encode(correlation_id) + +                   String.encode(client_id) + +                   request) +        return super(AbstractRequest, cls).encode(request) + + +class ProduceRequest(AbstractRequest): +    API_KEY = 0 +    API_VERSION = 0 +    __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression') + +    def __init__(self, topic_partition_messages, +                 required_acks=-1, timeout=1000, compression=None): +        """ +        topic_partition_messages is a dict of dicts of lists (of messages) +        { +          "TopicFoo": { +            0: [ +              Message('foo'), +              Message('bar') +            ], +            1: [ +              Message('fizz'), +              Message('buzz') +            ] +          } +        } +        """ +        self.required_acks = required_acks +        self.timeout = timeout +        self.topic_partition_messages = topic_partition_messages +        self.compression = compression + +    @staticmethod +    def _encode_messages(partition, messages, compression): +        message_set = MessageSet.encode(messages) + +        if compression: +            # compress message_set data and re-encode as single message +            # then wrap single compressed message in a new message_set +            pass + +        return (Int32.encode(partition) + +                Int32.encode(len(message_set)) + +                message_set) + +    def encode(self): +        request = ( +            Int16.encode(self.required_acks) + +            Int32.encode(self.timeout) + +            Array.encode([( +                String.encode(topic) + +                Array.encode([ +                    self._encode_messages(partition, messages, self.compression) +                    for partition, messages in partitions.iteritems()]) +            ) for topic, partitions in self.topic_partition_messages.iteritems()]) +        ) +        return super(ProduceRequest, self).encode(request) + + +class FetchRequest(AbstractRequest): +    API_KEY = 1 +    API_VERSION = 0 +    __slots__ = ('replica_id', 'max_wait_time', 'min_bytes', 'topic_partition_offsets') + +    def __init__(self, topic_partition_offsets, +                 max_wait_time=-1, min_bytes=0, replica_id=-1): +        """ +        topic_partition_offsets is a dict of dicts of (offset, max_bytes) tuples +        { +          "TopicFoo": { +            0: (1234, 1048576), +            1: (1324, 1048576) +          } +        } +        """ +        self.topic_partition_offsets = topic_partition_offsets +        self.max_wait_time = max_wait_time +        self.min_bytes = min_bytes +        self.replica_id = replica_id + +    def encode(self): +        request = ( +            Int32.encode(self.replica_id) + +            Int32.encode(self.max_wait_time) + +            Int32.encode(self.min_bytes) + +            Array.encode([( +                String.encode(topic) + +                Array.encode([( +                    Int32.encode(partition) + +                    Int64.encode(offset) + +                    Int32.encode(max_bytes) +                ) for partition, (offset, max_bytes) in partitions.iteritems()]) +            ) for topic, partitions in self.topic_partition_offsets.iteritems()])) +        return super(FetchRequest, self).encode(request) + + +class OffsetRequest(AbstractRequest): +    API_KEY = 2 +    API_VERSION = 0 +    __slots__ = ('replica_id', 'topic_partition_times') + +    def __init__(self, topic_partition_times, replica_id=-1): +        """ +        topic_partition_times is a dict of dicts of (time, max_offsets) tuples +        { +          "TopicFoo": { +            0: (-1, 1), +            1: (-1, 1) +          } +        } +        """ +        self.topic_partition_times = topic_partition_times +        self.replica_id = replica_id + +    def encode(self): +        request = ( +            Int32.encode(self.replica_id) + +            Array.encode([( +                String.encode(topic) + +                Array.encode([( +                    Int32.encode(partition) + +                    Int64.encode(time) + +                    Int32.encode(max_offsets) +                ) for partition, (time, max_offsets) in partitions.iteritems()]) +            ) for topic, partitions in self.topic_partition_times.iteritems()])) +        return super(OffsetRequest, self).encode(request) + + +class MetadataRequest(AbstractRequest): +    API_KEY = 3 +    API_VERSION = 0 +    __slots__ = ('topics') + +    def __init__(self, *topics): +        self.topics = topics + +    def encode(self): +        request = Array.encode(map(String.encode, self.topics)) +        return super(MetadataRequest, self).encode(request) + + +# Non-user facing control APIs 4-7 + + +class OffsetCommitRequestV0(AbstractRequest): +    API_KEY = 8 +    API_VERSION = 0 +    __slots__ = ('consumer_group_id', 'offsets') + +    def __init__(self, consumer_group_id, offsets): +        """ +        offsets is a dict of dicts of (offset, metadata) tuples +        { +          "TopicFoo": { +            0: (1234, ""), +            1: (1243, "") +          } +        } +        """ +        self.consumer_group_id = consumer_group_id +        self.offsets = offsets + +    def encode(self): +        request = ( +            String.encode(self.consumer_group_id) + +            Array.encode([( +                String.encode(topic) + +                Array.encode([( +                    Int32.encode(partition) + +                    Int64.encode(offset) + +                    String.encode(metadata) +                ) for partition, (offset, metadata) in partitions.iteritems()]) +            ) for topic, partitions in self.offsets.iteritems()])) +        return super(OffsetCommitRequestV0, self).encode(request) + + +class OffsetCommitRequestV1(AbstractRequest): +    API_KEY = 8 +    API_VERSION = 1 +    __slots__ = ('consumer_group_id', 'consumer_group_generation_id', +                 'consumer_id', 'offsets') + +    def __init__(self, consumer_group_id, consumer_group_generation_id, +                 consumer_id, offsets): +        """ +        offsets is a dict of dicts of (offset, timestamp, metadata) tuples +        { +          "TopicFoo": { +            0: (1234, 1448198827, ""), +            1: (1243, 1448198827, "") +          } +        } +        """ +        self.consumer_group_id = consumer_group_id +        self.consumer_group_generation_id = consumer_group_generation_id +        self.consumer_id = consumer_id +        self.offsets = offsets + +    def encode(self): +        request = ( +            String.encode(self.consumer_group_id) + +            Int32.encode(self.consumer_group_generation_id) + +            String.encode(self.consumer_id) + +            Array.encode([( +                String.encode(topic) + +                Array.encode([( +                    Int32.encode(partition) + +                    Int64.encode(offset) + +                    Int64.encode(timestamp) + +                    String.encode(metadata) +                ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) +            ) for topic, partitions in self.offsets.iteritems()])) +        return super(OffsetCommitRequestV1, self).encode(request) + + +class OffsetCommitRequest(AbstractRequest): +    API_KEY = 8 +    API_VERSION = 2 +    __slots__ = ('consumer_group_id', 'consumer_group_generation_id', +                 'consumer_id', 'retention_time', 'offsets') + +    def __init__(self, consumer_group_id, consumer_group_generation_id, +                 consumer_id, retention_time, offsets): +        """ +        offsets is a dict of dicts of (offset, metadata) tuples +        { +          "TopicFoo": { +            0: (1234, ""), +            1: (1243, "") +          } +        } +        """ +        self.consumer_group_id = consumer_group_id +        self.consumer_group_generation_id = consumer_group_generation_id +        self.consumer_id = consumer_id +        self.retention_time = retention_time +        self.offsets = offsets + +    def encode(self): +        request = ( +            String.encode(self.consumer_group_id) + +            Int32.encode(self.consumer_group_generation_id) + +            String.encode(self.consumer_id) + +            Int64.encode(self.retention_time) + +            Array.encode([( +                String.encode(topic) + +                Array.encode([( +                    Int32.encode(partition) + +                    Int64.encode(offset) + +                    String.encode(metadata) +                ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) +            ) for topic, partitions in self.offsets.iteritems()])) +        return super(OffsetCommitRequest, self).encode(request) + + +class OffsetFetchRequestV0(AbstractRequest): +    API_KEY = 9 +    API_VERSION = 0 +    __slots__ = ('consumer_group', 'topic_partitions') + +    def __init__(self, consumer_group, topic_partitions): +        """ +        offsets is a dict of lists of partition ints +        { +          "TopicFoo": [0, 1, 2] +        } +        """ +        self.consumer_group = consumer_group +        self.topic_partitions = topic_partitions + +    def encode(self): +        request = ( +            String.encode(self.consumer_group) + +            Array.encode([( +                String.encode(topic) + +                Array.encode([Int32.encode(partition) for partition in partitions]) +            ) for topic, partitions in self.topic_partitions.iteritems()]) +        ) +        return super(OffsetFetchRequest, self).encode(request) + + +class OffsetFetchRequest(OffsetFetchRequestV0): +    """Identical to V0, but offsets fetched from kafka storage not zookeeper""" +    API_VERSION = 1 + + +class GroupCoordinatorRequest(AbstractRequest): +    API_KEY = 10 +    API_VERSION = 0 +    __slots__ = ('group_id',) + +    def __init__(self, group_id): +        self.group_id = group_id + +    def encode(self): +        request = String.encode(self.group_id) +        return super(GroupCoordinatorRequest, self).encode(request) + + + | 
