diff options
Diffstat (limited to 'kafka/protocol/api.py')
-rw-r--r-- | kafka/protocol/api.py | 309 |
1 files changed, 12 insertions, 297 deletions
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index 8ea820b..0c23437 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -1,301 +1,16 @@ -import struct +from .struct import Struct +from .types import Int16, Int32, String, Schema -from .types import ( - Int8, Int16, Int32, Int64, Bytes, String, Array -) -from ..util import crc32 +class RequestHeader(Struct): + SCHEMA = Schema( + ('api_key', Int16), + ('api_version', Int16), + ('correlation_id', Int32), + ('client_id', String('utf-8')) + ) -class Message(object): - MAGIC_BYTE = 0 - __slots__ = ('magic', 'attributes', 'key', 'value') - - def __init__(self, value, key=None, magic=0, attributes=0): - self.magic = magic - self.attributes = attributes - self.key = key - self.value = value - - def encode(self): - message = ( - Int8.encode(self.magic) + - Int8.encode(self.attributes) + - Bytes.encode(self.key) + - Bytes.encode(self.value) + def __init__(self, request, correlation_id=0, client_id='kafka-python'): + super(RequestHeader, self).__init__( + request.API_KEY, request.API_VERSION, correlation_id, client_id ) - return ( - struct.pack('>I', crc32(message)) + - message - ) - - -class MessageSet(object): - - @staticmethod - def _encode_one(message): - encoded = message.encode() - return (Int64.encode(0) + Int32.encode(len(encoded)) + encoded) - - @staticmethod - def encode(messages): - return b''.join(map(MessageSet._encode_one, messages)) - - -class AbstractRequestResponse(object): - @classmethod - def encode(cls, message): - return Int32.encode(len(message)) + message - - -class AbstractRequest(AbstractRequestResponse): - @classmethod - def encode(cls, request, correlation_id=0, client_id='kafka-python'): - request = (Int16.encode(cls.API_KEY) + - Int16.encode(cls.API_VERSION) + - Int32.encode(correlation_id) + - String.encode(client_id) + - request) - return super(AbstractRequest, cls).encode(request) - - -class FetchRequest(AbstractRequest): - API_KEY = 1 - API_VERSION = 0 - __slots__ = ('replica_id', 'max_wait_time', 'min_bytes', 'topic_partition_offsets') - - def __init__(self, topic_partition_offsets, - max_wait_time=-1, min_bytes=0, replica_id=-1): - """ - topic_partition_offsets is a dict of dicts of (offset, max_bytes) tuples - { - "TopicFoo": { - 0: (1234, 1048576), - 1: (1324, 1048576) - } - } - """ - self.topic_partition_offsets = topic_partition_offsets - self.max_wait_time = max_wait_time - self.min_bytes = min_bytes - self.replica_id = replica_id - - def encode(self): - request = ( - Int32.encode(self.replica_id) + - Int32.encode(self.max_wait_time) + - Int32.encode(self.min_bytes) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - Int32.encode(max_bytes) - ) for partition, (offset, max_bytes) in partitions.iteritems()]) - ) for topic, partitions in self.topic_partition_offsets.iteritems()])) - return super(FetchRequest, self).encode(request) - - -class OffsetRequest(AbstractRequest): - API_KEY = 2 - API_VERSION = 0 - __slots__ = ('replica_id', 'topic_partition_times') - - def __init__(self, topic_partition_times, replica_id=-1): - """ - topic_partition_times is a dict of dicts of (time, max_offsets) tuples - { - "TopicFoo": { - 0: (-1, 1), - 1: (-1, 1) - } - } - """ - self.topic_partition_times = topic_partition_times - self.replica_id = replica_id - - def encode(self): - request = ( - Int32.encode(self.replica_id) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(time) + - Int32.encode(max_offsets) - ) for partition, (time, max_offsets) in partitions.iteritems()]) - ) for topic, partitions in self.topic_partition_times.iteritems()])) - return super(OffsetRequest, self).encode(request) - - -class MetadataRequest(AbstractRequest): - API_KEY = 3 - API_VERSION = 0 - __slots__ = ('topics') - - def __init__(self, *topics): - self.topics = topics - - def encode(self): - request = Array.encode(map(String.encode, self.topics)) - return super(MetadataRequest, self).encode(request) - - -# Non-user facing control APIs 4-7 - - -class OffsetCommitRequestV0(AbstractRequest): - API_KEY = 8 - API_VERSION = 0 - __slots__ = ('consumer_group_id', 'offsets') - - def __init__(self, consumer_group_id, offsets): - """ - offsets is a dict of dicts of (offset, metadata) tuples - { - "TopicFoo": { - 0: (1234, ""), - 1: (1243, "") - } - } - """ - self.consumer_group_id = consumer_group_id - self.offsets = offsets - - def encode(self): - request = ( - String.encode(self.consumer_group_id) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - String.encode(metadata) - ) for partition, (offset, metadata) in partitions.iteritems()]) - ) for topic, partitions in self.offsets.iteritems()])) - return super(OffsetCommitRequestV0, self).encode(request) - - -class OffsetCommitRequestV1(AbstractRequest): - API_KEY = 8 - API_VERSION = 1 - __slots__ = ('consumer_group_id', 'consumer_group_generation_id', - 'consumer_id', 'offsets') - - def __init__(self, consumer_group_id, consumer_group_generation_id, - consumer_id, offsets): - """ - offsets is a dict of dicts of (offset, timestamp, metadata) tuples - { - "TopicFoo": { - 0: (1234, 1448198827, ""), - 1: (1243, 1448198827, "") - } - } - """ - self.consumer_group_id = consumer_group_id - self.consumer_group_generation_id = consumer_group_generation_id - self.consumer_id = consumer_id - self.offsets = offsets - - def encode(self): - request = ( - String.encode(self.consumer_group_id) + - Int32.encode(self.consumer_group_generation_id) + - String.encode(self.consumer_id) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - Int64.encode(timestamp) + - String.encode(metadata) - ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) - ) for topic, partitions in self.offsets.iteritems()])) - return super(OffsetCommitRequestV1, self).encode(request) - - -class OffsetCommitRequest(AbstractRequest): - API_KEY = 8 - API_VERSION = 2 - __slots__ = ('consumer_group_id', 'consumer_group_generation_id', - 'consumer_id', 'retention_time', 'offsets') - - def __init__(self, consumer_group_id, consumer_group_generation_id, - consumer_id, retention_time, offsets): - """ - offsets is a dict of dicts of (offset, metadata) tuples - { - "TopicFoo": { - 0: (1234, ""), - 1: (1243, "") - } - } - """ - self.consumer_group_id = consumer_group_id - self.consumer_group_generation_id = consumer_group_generation_id - self.consumer_id = consumer_id - self.retention_time = retention_time - self.offsets = offsets - - def encode(self): - request = ( - String.encode(self.consumer_group_id) + - Int32.encode(self.consumer_group_generation_id) + - String.encode(self.consumer_id) + - Int64.encode(self.retention_time) + - Array.encode([( - String.encode(topic) + - Array.encode([( - Int32.encode(partition) + - Int64.encode(offset) + - String.encode(metadata) - ) for partition, (offset, timestamp, metadata) in partitions.iteritems()]) - ) for topic, partitions in self.offsets.iteritems()])) - return super(OffsetCommitRequest, self).encode(request) - - -class OffsetFetchRequestV0(AbstractRequest): - API_KEY = 9 - API_VERSION = 0 - __slots__ = ('consumer_group', 'topic_partitions') - - def __init__(self, consumer_group, topic_partitions): - """ - offsets is a dict of lists of partition ints - { - "TopicFoo": [0, 1, 2] - } - """ - self.consumer_group = consumer_group - self.topic_partitions = topic_partitions - - def encode(self): - request = ( - String.encode(self.consumer_group) + - Array.encode([( - String.encode(topic) + - Array.encode([Int32.encode(partition) for partition in partitions]) - ) for topic, partitions in self.topic_partitions.iteritems()]) - ) - return super(OffsetFetchRequest, self).encode(request) - - -class OffsetFetchRequest(OffsetFetchRequestV0): - """Identical to V0, but offsets fetched from kafka storage not zookeeper""" - API_VERSION = 1 - - -class GroupCoordinatorRequest(AbstractRequest): - API_KEY = 10 - API_VERSION = 0 - __slots__ = ('group_id',) - - def __init__(self, group_id): - self.group_id = group_id - - def encode(self): - request = String.encode(self.group_id) - return super(GroupCoordinatorRequest, self).encode(request) - - - |