summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py440
1 files changed, 440 insertions, 0 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
new file mode 100644
index 0000000..1835521
--- /dev/null
+++ b/kafka/protocol/legacy.py
@@ -0,0 +1,440 @@
+from __future__ import absolute_import
+
+import logging
+import struct
+
+import six
+
+from six.moves import xrange
+
+import kafka.common
+import kafka.protocol.commit
+import kafka.protocol.fetch
+import kafka.protocol.message
+import kafka.protocol.metadata
+import kafka.protocol.offset
+import kafka.protocol.produce
+
+from kafka.codec import (
+ gzip_encode, gzip_decode, snappy_encode, snappy_decode
+)
+from kafka.common import (
+ ProtocolError, ChecksumError,
+ UnsupportedCodecError,
+ ConsumerMetadataResponse
+)
+from kafka.util import (
+ crc32, read_short_string, read_int_string, relative_unpack,
+ write_short_string, write_int_string, group_by_topic_and_partition
+)
+
+
+log = logging.getLogger(__name__)
+
+ATTRIBUTE_CODEC_MASK = 0x03
+CODEC_NONE = 0x00
+CODEC_GZIP = 0x01
+CODEC_SNAPPY = 0x02
+ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
+
+
+class KafkaProtocol(object):
+ """
+ Class to encapsulate all of the protocol encoding/decoding.
+ This class does not have any state associated with it, it is purely
+ for organization.
+ """
+ PRODUCE_KEY = 0
+ FETCH_KEY = 1
+ OFFSET_KEY = 2
+ METADATA_KEY = 3
+ OFFSET_COMMIT_KEY = 8
+ OFFSET_FETCH_KEY = 9
+ CONSUMER_METADATA_KEY = 10
+
+ ###################
+ # Private API #
+ ###################
+
+ @classmethod
+ def _encode_message_header(cls, client_id, correlation_id, request_key,
+ version=0):
+ """
+ Encode the common request envelope
+ """
+ return struct.pack('>hhih%ds' % len(client_id),
+ request_key, # ApiKey
+ version, # ApiVersion
+ correlation_id, # CorrelationId
+ len(client_id), # ClientId size
+ client_id) # ClientId
+
+ @classmethod
+ def _encode_message_set(cls, messages):
+ """
+ Encode a MessageSet. Unlike other arrays in the protocol,
+ MessageSets are not length-prefixed
+
+ Format
+ ======
+ MessageSet => [Offset MessageSize Message]
+ Offset => int64
+ MessageSize => int32
+ """
+ message_set = []
+ for message in messages:
+ encoded_message = KafkaProtocol._encode_message(message)
+ message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
+ len(encoded_message),
+ encoded_message))
+ return b''.join(message_set)
+
+ @classmethod
+ def _encode_message(cls, message):
+ """
+ Encode a single message.
+
+ The magic number of a message is a format version number.
+ The only supported magic number right now is zero
+
+ Format
+ ======
+ Message => Crc MagicByte Attributes Key Value
+ Crc => int32
+ MagicByte => int8
+ Attributes => int8
+ Key => bytes
+ Value => bytes
+ """
+ if message.magic == 0:
+ msg = b''.join([
+ struct.pack('>BB', message.magic, message.attributes),
+ write_int_string(message.key),
+ write_int_string(message.value)
+ ])
+ crc = crc32(msg)
+ msg = struct.pack('>i%ds' % len(msg), crc, msg)
+ else:
+ raise ProtocolError("Unexpected magic number: %d" % message.magic)
+ return msg
+
+ ##################
+ # Public API #
+ ##################
+
+ @classmethod
+ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
+ """
+ Encode a ProduceRequest struct
+
+ Arguments:
+ payloads: list of ProduceRequestPayload
+ acks: How "acky" you want the request to be
+ 1: written to disk by the leader
+ 0: immediate response
+ -1: waits for all replicas to be in sync
+ timeout: Maximum time (in ms) the server will wait for replica acks.
+ This is _not_ a socket timeout
+
+ Returns: ProduceRequest
+ """
+ if acks not in (1, 0, -1):
+ raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+
+ return kafka.protocol.produce.ProduceRequest(
+ required_acks=acks,
+ timeout=timeout,
+ topics=[(
+ topic,
+ [(
+ partition,
+ [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
+ magic=msg.magic,
+ attributes=msg.attributes))
+ for msg in payload.messages])
+ for partition, payload in topic_payloads.items()])
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+
+ @classmethod
+ def decode_produce_response(cls, response):
+ """
+ Decode ProduceResponse to ProduceResponsePayload
+
+ Arguments:
+ response: ProduceResponse
+
+ Return: list of ProduceResponsePayload
+ """
+ return [
+ kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+ for topic, partitions in response.topics
+ for partition, error, offset in partitions
+ ]
+
+ @classmethod
+ def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
+ """
+ Encodes a FetchRequest struct
+
+ Arguments:
+ payloads: list of FetchRequestPayload
+ max_wait_time (int, optional): ms to block waiting for min_bytes
+ data. Defaults to 100.
+ min_bytes (int, optional): minimum bytes required to return before
+ max_wait_time. Defaults to 4096.
+
+ Return: FetchRequest
+ """
+ return kafka.protocol.fetch.FetchRequest(
+ replica_id=-1,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.offset,
+ payload.max_bytes)
+ for partition, payload in topic_payloads.items()])
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+
+ @classmethod
+ def decode_fetch_response(cls, response):
+ """
+ Decode FetchResponse struct to FetchResponsePayloads
+
+ Arguments:
+ response: FetchResponse
+ """
+ return [
+ kafka.common.FetchResponsePayload(
+ topic, partition, error, highwater_offset, [
+ kafka.common.OffsetAndMessage(offset, message)
+ for offset, _, message in messages])
+ for topic, partitions in response.topics
+ for partition, error, highwater_offset, messages in partitions
+ ]
+
+ @classmethod
+ def encode_offset_request(cls, payloads=()):
+ return kafka.protocol.offset.OffsetRequest(
+ replica_id=-1,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.time,
+ payload.max_offsets)
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+ @classmethod
+ def decode_offset_response(cls, response):
+ """
+ Decode OffsetResponse into OffsetResponsePayloads
+
+ Arguments:
+ response: OffsetResponse
+
+ Returns: list of OffsetResponsePayloads
+ """
+ return [
+ kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+ for topic, partitions in response.topics
+ for partition, error, offsets in partitions
+ ]
+
+ @classmethod
+ def encode_metadata_request(cls, topics=(), payloads=None):
+ """
+ Encode a MetadataRequest
+
+ Arguments:
+ topics: list of strings
+ """
+ if payloads is not None:
+ topics = payloads
+
+ return kafka.protocol.metadata.MetadataRequest(topics)
+
+ @classmethod
+ def decode_metadata_response(cls, response):
+ return response
+
+ @classmethod
+ def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+ """
+ Encode a ConsumerMetadataRequest
+
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: string (consumer group)
+ """
+ message = []
+ message.append(cls._encode_message_header(client_id, correlation_id,
+ KafkaProtocol.CONSUMER_METADATA_KEY))
+ message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+ msg = b''.join(message)
+ return write_int_string(msg)
+
+ @classmethod
+ def decode_consumer_metadata_response(cls, data):
+ """
+ Decode bytes to a ConsumerMetadataResponse
+
+ Arguments:
+ data: bytes to decode
+ """
+ ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+ (host, cur) = read_short_string(data, cur)
+ ((port,), cur) = relative_unpack('>i', data, cur)
+
+ return ConsumerMetadataResponse(error, nodeId, host, port)
+
+ @classmethod
+ def encode_offset_commit_request(cls, group, payloads):
+ """
+ Encode an OffsetCommitRequest struct
+
+ Arguments:
+ group: string, the consumer group you are committing offsets for
+ payloads: list of OffsetCommitRequestPayload
+ """
+ return kafka.protocol.commit.OffsetCommitRequest_v0(
+ consumer_group=group,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.offset,
+ payload.metadata)
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+
+ @classmethod
+ def decode_offset_commit_response(cls, response):
+ """
+ Decode OffsetCommitResponse to an OffsetCommitResponsePayload
+
+ Arguments:
+ response: OffsetCommitResponse
+ """
+ return [
+ kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+ for topic, partitions in response.topics
+ for partition, error in partitions
+ ]
+
+ @classmethod
+ def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
+ """
+ Encode an OffsetFetchRequest struct. The request is encoded using
+ version 0 if from_kafka is false, indicating a request for Zookeeper
+ offsets. It is encoded using version 1 otherwise, indicating a request
+ for Kafka offsets.
+
+ Arguments:
+ group: string, the consumer group you are fetching offsets for
+ payloads: list of OffsetFetchRequestPayload
+ from_kafka: bool, default False, set True for Kafka-committed offsets
+ """
+ if from_kafka:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v1
+ else:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v0
+
+ return request_class(
+ consumer_group=group,
+ topics=[(
+ topic,
+ list(topic_payloads.keys()))
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+ @classmethod
+ def decode_offset_fetch_response(cls, response):
+ """
+ Decode OffsetFetchResponse to OffsetFetchResponsePayloads
+
+ Arguments:
+ response: OffsetFetchResponse
+ """
+ return [
+ kafka.common.OffsetFetchResponsePayload(
+ topic, partition, offset, metadata, error
+ )
+ for topic, partitions in response.topics
+ for partition, offset, metadata, error in partitions
+ ]
+
+
+def create_message(payload, key=None):
+ """
+ Construct a Message
+
+ Arguments:
+ payload: bytes, the payload to send to Kafka
+ key: bytes, a key used for partition routing (optional)
+
+ """
+ return kafka.common.Message(0, 0, key, payload)
+
+
+def create_gzip_message(payloads, key=None, compresslevel=None):
+ """
+ Construct a Gzipped Message containing multiple Messages
+
+ The given payloads will be encoded, compressed, and sent as a single atomic
+ message to Kafka.
+
+ Arguments:
+ payloads: list(bytes), a list of payload to send be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
+ """
+ message_set = KafkaProtocol._encode_message_set(
+ [create_message(payload, pl_key) for payload, pl_key in payloads])
+
+ gzipped = gzip_encode(message_set, compresslevel=compresslevel)
+ codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
+
+ return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+
+
+def create_snappy_message(payloads, key=None):
+ """
+ Construct a Snappy Message containing multiple Messages
+
+ The given payloads will be encoded, compressed, and sent as a single atomic
+ message to Kafka.
+
+ Arguments:
+ payloads: list(bytes), a list of payload to send be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
+ """
+ message_set = KafkaProtocol._encode_message_set(
+ [create_message(payload, pl_key) for payload, pl_key in payloads])
+
+ snapped = snappy_encode(message_set)
+ codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
+
+ return kafka.common.Message(0, 0x00 | codec, key, snapped)
+
+
+def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
+ """Create a message set using the given codec.
+
+ If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
+ return a list containing a single codec-encoded message.
+ """
+ if codec == CODEC_NONE:
+ return [create_message(m, k) for m, k in messages]
+ elif codec == CODEC_GZIP:
+ return [create_gzip_message(messages, key, compresslevel)]
+ elif codec == CODEC_SNAPPY:
+ return [create_snappy_message(messages, key)]
+ else:
+ raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)