Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--  kafka/protocol/legacy.py  440
1 file changed, 440 insertions, 0 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
new file mode 100644
index 0000000..1835521
--- /dev/null
+++ b/kafka/protocol/legacy.py
@@ -0,0 +1,440 @@
+from __future__ import absolute_import
+
+import logging
+import struct
+
+import six
+
+from six.moves import xrange
+
+import kafka.common
+import kafka.protocol.commit
+import kafka.protocol.fetch
+import kafka.protocol.message
+import kafka.protocol.metadata
+import kafka.protocol.offset
+import kafka.protocol.produce
+
+from kafka.codec import (
+    gzip_encode, gzip_decode, snappy_encode, snappy_decode
+)
+from kafka.common import (
+    ProtocolError, ChecksumError,
+    UnsupportedCodecError,
+    ConsumerMetadataResponse
+)
+from kafka.util import (
+    crc32, read_short_string, read_int_string, relative_unpack,
+    write_short_string, write_int_string, group_by_topic_and_partition
+)
+
+
+log = logging.getLogger(__name__)
+
+ATTRIBUTE_CODEC_MASK = 0x03
+CODEC_NONE = 0x00
+CODEC_GZIP = 0x01
+CODEC_SNAPPY = 0x02
+ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
+
+
+class KafkaProtocol(object):
+    """
+    Class to encapsulate all of the protocol encoding/decoding.
+    This class does not have any state associated with it, it is purely
+    for organization.
+    """
+    PRODUCE_KEY = 0
+    FETCH_KEY = 1
+    OFFSET_KEY = 2
+    METADATA_KEY = 3
+    OFFSET_COMMIT_KEY = 8
+    OFFSET_FETCH_KEY = 9
+    CONSUMER_METADATA_KEY = 10
+
+    ###################
+    #   Private API   #
+    ###################
+
+    @classmethod
+    def _encode_message_header(cls, client_id, correlation_id, request_key,
+                               version=0):
+        """
+        Encode the common request envelope
+        """
+        return struct.pack('>hhih%ds' % len(client_id),
+                           request_key,          # ApiKey
+                           version,              # ApiVersion
+                           correlation_id,       # CorrelationId
+                           len(client_id),       # ClientId size
+                           client_id)            # ClientId
+
+    @classmethod
+    def _encode_message_set(cls, messages):
+        """
+        Encode a MessageSet. Unlike other arrays in the protocol,
+        MessageSets are not length-prefixed
+
+        Format
+        ======
+        MessageSet => [Offset MessageSize Message]
+          Offset => int64
+          MessageSize => int32
+        """
+        message_set = []
+        for message in messages:
+            encoded_message = KafkaProtocol._encode_message(message)
+            message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
+                                           len(encoded_message),
+                                           encoded_message))
+        return b''.join(message_set)
+
+    @classmethod
+    def _encode_message(cls, message):
+        """
+        Encode a single message.
+
+        The magic number of a message is a format version number.
+        The only supported magic number right now is zero
+
+        Format
+        ======
+        Message => Crc MagicByte Attributes Key Value
+          Crc => int32
+          MagicByte => int8
+          Attributes => int8
+          Key => bytes
+          Value => bytes
+        """
+        if message.magic == 0:
+            msg = b''.join([
+                struct.pack('>BB', message.magic, message.attributes),
+                write_int_string(message.key),
+                write_int_string(message.value)
+            ])
+            crc = crc32(msg)
+            msg = struct.pack('>i%ds' % len(msg), crc, msg)
+        else:
+            raise ProtocolError("Unexpected magic number: %d" % message.magic)
+        return msg
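Aside: the magic-0 Message layout documented in _encode_message can be reproduced
with the standard library alone. The sketch below is illustrative and not part of
the patch; it assumes zlib.crc32 matches what kafka.util.crc32 computes (a CRC-32
over everything after the Crc field) and that a null Key or Value is encoded as
length -1.

    import struct
    import zlib

    def encode_message_v0(key, value):
        # int32 length prefix for a bytes field; -1 marks a null field
        def int_string(b):
            if b is None:
                return struct.pack('>i', -1)
            return struct.pack('>i', len(b)) + b

        # MagicByte=0, Attributes=0 (no compression), then Key and Value
        body = struct.pack('>BB', 0, 0) + int_string(key) + int_string(value)
        crc = zlib.crc32(body) & 0xffffffff  # mask keeps it unsigned on Python 2
        return struct.pack('>I', crc) + body

    encoded = encode_message_v0(None, b'hello kafka')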
+
+    ##################
+    #   Public API   #
+    ##################
+
+    @classmethod
+    def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
+        """
+        Encode a ProduceRequest struct
+
+        Arguments:
+            payloads: list of ProduceRequestPayload
+            acks: how many broker acknowledgments to wait for:
+                1: written to the log by the leader
+                0: respond immediately, without waiting
+                -1: wait for all in-sync replicas
+            timeout: Maximum time (in ms) the server will wait for replica acks.
+                This is _not_ a socket timeout
+
+        Returns: ProduceRequest
+        """
+        if acks not in (1, 0, -1):
+            raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+
+        return kafka.protocol.produce.ProduceRequest(
+            required_acks=acks,
+            timeout=timeout,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    [(0, 0, kafka.protocol.message.Message(
+                        msg.value, key=msg.key,
+                        magic=msg.magic, attributes=msg.attributes))
+                     for msg in payload.messages])
+                 for partition, payload in topic_payloads.items()])
+                for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+
+    @classmethod
+    def decode_produce_response(cls, response):
+        """
+        Decode ProduceResponse to ProduceResponsePayload
+
+        Arguments:
+            response: ProduceResponse
+
+        Returns: list of ProduceResponsePayload
+        """
+        return [
+            kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+            for topic, partitions in response.topics
+            for partition, error, offset in partitions
+        ]
+
+    @classmethod
+    def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
+        """
+        Encode a FetchRequest struct
+
+        Arguments:
+            payloads: list of FetchRequestPayload
+            max_wait_time (int, optional): ms to block waiting for min_bytes
+                data. Defaults to 100.
+            min_bytes (int, optional): minimum bytes required to return before
+                max_wait_time. Defaults to 4096.
+
+        Returns: FetchRequest
+        """
+        return kafka.protocol.fetch.FetchRequest(
+            replica_id=-1,
+            max_wait_time=max_wait_time,
+            min_bytes=min_bytes,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.offset,
+                    payload.max_bytes)
+                 for partition, payload in topic_payloads.items()])
+                for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+
+    @classmethod
+    def decode_fetch_response(cls, response):
+        """
+        Decode FetchResponse struct to FetchResponsePayloads
+
+        Arguments:
+            response: FetchResponse
+
+        Returns: list of FetchResponsePayload
+        """
+        return [
+            kafka.common.FetchResponsePayload(
+                topic, partition, error, highwater_offset, [
+                    kafka.common.OffsetAndMessage(offset, message)
+                    for offset, _, message in messages])
+            for topic, partitions in response.topics
+            for partition, error, highwater_offset, messages in partitions
+        ]
+
+    @classmethod
+    def encode_offset_request(cls, payloads=()):
+        return kafka.protocol.offset.OffsetRequest(
+            replica_id=-1,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.time,
+                    payload.max_offsets)
+                 for partition, payload in six.iteritems(topic_payloads)])
+                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+    @classmethod
+    def decode_offset_response(cls, response):
+        """
+        Decode OffsetResponse into OffsetResponsePayloads
+
+        Arguments:
+            response: OffsetResponse
+
+        Returns: list of OffsetResponsePayload
+        """
+        return [
+            kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+            for topic, partitions in response.topics
+            for partition, error, offsets in partitions
+        ]
+
+    @classmethod
+    def encode_metadata_request(cls, topics=(), payloads=None):
+        """
+        Encode a MetadataRequest
+
+        Arguments:
+            topics: list of strings
+            payloads: optional; if given, used in place of topics
+        """
+        if payloads is not None:
+            topics = payloads
+
+        return kafka.protocol.metadata.MetadataRequest(topics)
+
+    @classmethod
+    def decode_metadata_response(cls, response):
+        return response
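Aside: every encode_* method above funnels its flat payload list through
kafka.util.group_by_topic_and_partition before building the nested
topics=[(topic, [(partition, ...)])] structure. Judging only from how its result
is consumed here, a minimal stand-in would be the following sketch (the real
helper may differ, for instance by rejecting duplicate topic/partition pairs):

    from collections import defaultdict

    def group_by_topic_and_partition(payloads):
        # {topic: {partition: payload}}, the shape the encoders iterate over
        out = defaultdict(dict)
        for payload in payloads:
            out[payload.topic][payload.partition] = payload
        return out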
+
+    @classmethod
+    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+        """
+        Encode a ConsumerMetadataRequest
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: string (consumer group)
+        """
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
+        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+        msg = b''.join(message)
+        return write_int_string(msg)
+
+    @classmethod
+    def decode_consumer_metadata_response(cls, data):
+        """
+        Decode bytes to a ConsumerMetadataResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+        (host, cur) = read_short_string(data, cur)
+        ((port,), cur) = relative_unpack('>i', data, cur)
+
+        return ConsumerMetadataResponse(error, nodeId, host, port)
+
+    @classmethod
+    def encode_offset_commit_request(cls, group, payloads):
+        """
+        Encode an OffsetCommitRequest struct
+
+        Arguments:
+            group: string, the consumer group you are committing offsets for
+            payloads: list of OffsetCommitRequestPayload
+        """
+        return kafka.protocol.commit.OffsetCommitRequest_v0(
+            consumer_group=group,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.offset,
+                    payload.metadata)
+                 for partition, payload in six.iteritems(topic_payloads)])
+                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+    @classmethod
+    def decode_offset_commit_response(cls, response):
+        """
+        Decode OffsetCommitResponse to an OffsetCommitResponsePayload
+
+        Arguments:
+            response: OffsetCommitResponse
+
+        Returns: list of OffsetCommitResponsePayload
+        """
+        return [
+            kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+            for topic, partitions in response.topics
+            for partition, error in partitions
+        ]
+
+    @classmethod
+    def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
+        """
+        Encode an OffsetFetchRequest struct. The request is encoded using
+        version 0 if from_kafka is false, indicating a request for Zookeeper
+        offsets. It is encoded using version 1 otherwise, indicating a request
+        for Kafka offsets.
+
+        Arguments:
+            group: string, the consumer group you are fetching offsets for
+            payloads: list of OffsetFetchRequestPayload
+            from_kafka: bool, default False, set True for Kafka-committed offsets
+        """
+        if from_kafka:
+            request_class = kafka.protocol.commit.OffsetFetchRequest_v1
+        else:
+            request_class = kafka.protocol.commit.OffsetFetchRequest_v0
+
+        return request_class(
+            consumer_group=group,
+            topics=[(
+                topic,
+                list(topic_payloads.keys()))
+                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+    @classmethod
+    def decode_offset_fetch_response(cls, response):
+        """
+        Decode OffsetFetchResponse to OffsetFetchResponsePayloads
+
+        Arguments:
+            response: OffsetFetchResponse
+
+        Returns: list of OffsetFetchResponsePayload
+        """
+        return [
+            kafka.common.OffsetFetchResponsePayload(
+                topic, partition, offset, metadata, error
+            )
+            for topic, partitions in response.topics
+            for partition, offset, metadata, error in partitions
+        ]
+
+
+def create_message(payload, key=None):
+    """
+    Construct a Message
+
+    Arguments:
+        payload: bytes, the payload to send to Kafka
+        key: bytes, a key used for partition routing (optional)
+
+    """
+    return kafka.common.Message(0, 0, key, payload)
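Aside: decode_consumer_metadata_response is the only response above still decoded
by hand, using relative_unpack and read_short_string from kafka.util. The same
wire layout (int32 correlation id, int16 error code, int32 coordinator id,
int16-length-prefixed host string, int32 port) can be walked with plain struct;
a sketch, assuming a complete and well-formed buffer:

    import struct

    def decode_consumer_metadata(data):
        # >ihi: correlation_id (int32), error (int16), coordinator id (int32)
        correlation_id, error, node_id = struct.unpack_from('>ihi', data, 0)
        (host_len,) = struct.unpack_from('>h', data, 10)
        host = data[12:12 + host_len].decode('utf-8')
        (port,) = struct.unpack_from('>i', data, 12 + host_len)
        return error, node_id, host, port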
+
+
+def create_gzip_message(payloads, key=None, compresslevel=None):
+    """
+    Construct a Gzipped Message containing multiple Messages
+
+    The given payloads will be encoded, compressed, and sent as a single atomic
+    message to Kafka.
+
+    Arguments:
+        payloads: list of (payload, key) tuples to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+
+    """
+    message_set = KafkaProtocol._encode_message_set(
+        [create_message(payload, pl_key) for payload, pl_key in payloads])
+
+    gzipped = gzip_encode(message_set, compresslevel=compresslevel)
+    codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
+
+    return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+
+
+def create_snappy_message(payloads, key=None):
+    """
+    Construct a Snappy Message containing multiple Messages
+
+    The given payloads will be encoded, compressed, and sent as a single atomic
+    message to Kafka.
+
+    Arguments:
+        payloads: list of (payload, key) tuples to be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+
+    """
+    message_set = KafkaProtocol._encode_message_set(
+        [create_message(payload, pl_key) for payload, pl_key in payloads])
+
+    snapped = snappy_encode(message_set)
+    codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
+
+    return kafka.common.Message(0, 0x00 | codec, key, snapped)
+
+
+def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
+    """Create a message set using the given codec.
+
+    If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
+    return a list containing a single codec-encoded message.
+    """
+    if codec == CODEC_NONE:
+        return [create_message(m, k) for m, k in messages]
+    elif codec == CODEC_GZIP:
+        return [create_gzip_message(messages, key, compresslevel)]
+    elif codec == CODEC_SNAPPY:
+        return [create_snappy_message(messages, key)]
+    else:
+        raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
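Aside: typical use of the module-level helpers, for reference. create_message_set
takes (value, key) tuples and either emits one plain Message per tuple or wraps
them all in a single compressed wrapper Message. A usage sketch, assuming the
module is importable as kafka.protocol.legacy (the path this patch creates) and
that gzip support is available in kafka.codec:

    from kafka.protocol.legacy import (
        CODEC_GZIP, CODEC_NONE, create_message_set
    )

    pairs = [(b'value-1', None), (b'value-2', b'routing-key')]

    # One plain Message per (value, key) pair
    plain = create_message_set(pairs, codec=CODEC_NONE)

    # A single wrapper Message whose value is the gzipped message set
    wrapped = create_message_set(pairs, codec=CODEC_GZIP, key=b'routing-key')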