Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--  kafka/protocol/legacy.py  352
1 file changed, 106 insertions(+), 246 deletions(-)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index db9f3e0..c5babf7 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -7,16 +7,21 @@ import six
from six.moves import xrange
+import kafka.common
+import kafka.protocol.commit
+import kafka.protocol.fetch
+import kafka.protocol.message
+import kafka.protocol.metadata
+import kafka.protocol.offset
+import kafka.protocol.produce
+
from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
- Message, OffsetAndMessage, TopicAndPartition,
- BrokerMetadata, TopicMetadata, PartitionMetadata,
- MetadataResponse, ProduceResponse, FetchResponse,
- OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
- ProtocolError, BufferUnderflowError, ChecksumError,
- ConsumerFetchSizeTooSmall, UnsupportedCodecError,
+ OffsetCommitResponse, OffsetFetchResponse,
+ ProtocolError, ChecksumError,
+ UnsupportedCodecError,
ConsumerMetadataResponse
)
from kafka.util import (
@@ -115,41 +120,6 @@ class KafkaProtocol(object):
return msg
@classmethod
- def _decode_message_set_iter(cls, data):
- """
- Iteratively decode a MessageSet
-
- Reads repeated elements of (offset, message), calling decode_message
- to decode a single message. Since compressed messages contain further
- MessageSets, these two methods have been decoupled so that they may
- recurse easily.
- """
- cur = 0
- read_message = False
- while cur < len(data):
- try:
- ((offset, ), cur) = relative_unpack('>q', data, cur)
- (msg, cur) = read_int_string(data, cur)
- for (offset, message) in KafkaProtocol._decode_message(msg, offset):
- read_message = True
- yield OffsetAndMessage(offset, message)
- except BufferUnderflowError:
- # NOTE: Not sure this is correct error handling:
- # Is it possible to get a BUE if the message set is somewhere
- # in the middle of the fetch response? If so, we probably have
- # an issue that's not fetch size too small.
- # Aren't we ignoring errors if we fail to unpack data by
- # raising StopIteration()?
- # If _decode_message() raises a ChecksumError, couldn't that
- # also be due to the fetch size being too small?
- if read_message is False:
- # If we get a partial read of a message, but haven't
- # yielded anything there's a problem
- raise ConsumerFetchSizeTooSmall()
- else:
- raise StopIteration()
-
- @classmethod
def _decode_message(cls, data, offset):
"""
Decode a single Message
@@ -169,7 +139,7 @@ class KafkaProtocol(object):
codec = att & ATTRIBUTE_CODEC_MASK
if codec == CODEC_NONE:
- yield (offset, Message(magic, att, key, value))
+ yield (offset, kafka.common.Message(magic, att, key, value))
elif codec == CODEC_GZIP:
gz = gzip_decode(value)
@@ -186,253 +156,143 @@ class KafkaProtocol(object):
##################
@classmethod
- def encode_produce_request(cls, client_id, correlation_id,
- payloads=None, acks=1, timeout=1000):
+ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
"""
- Encode some ProduceRequest structs
+ Encode a ProduceRequest struct
Arguments:
- client_id: string
- correlation_id: int
- payloads: list of ProduceRequest
+ payloads: list of ProduceRequestPayload
acks: How "acky" you want the request to be
- 0: immediate response
1: written to disk by the leader
- 2+: waits for this many number of replicas to sync
+ 0: immediate response
-1: waits for all replicas to be in sync
- timeout: Maximum time the server will wait for acks from replicas.
+ timeout: Maximum time (in ms) the server will wait for replica acks.
This is _not_ a socket timeout
- """
- payloads = [] if payloads is None else payloads
- grouped_payloads = group_by_topic_and_partition(payloads)
-
- message = []
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.PRODUCE_KEY))
-
- message.append(struct.pack('>hii', acks, timeout,
- len(grouped_payloads)))
-
- for topic, topic_payloads in grouped_payloads.items():
- message.append(struct.pack('>h%dsi' % len(topic), len(topic), topic,
- len(topic_payloads)))
-
- for partition, payload in topic_payloads.items():
- msg_set = KafkaProtocol._encode_message_set(payload.messages)
- message.append(struct.pack('>ii%ds' % len(msg_set), partition,
- len(msg_set), msg_set))
-
- msg = b''.join(message)
- return struct.pack('>i%ds' % len(msg), len(msg), msg)
+ Returns: ProduceRequest
+ """
+ if acks not in (1, 0, -1):
+ raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+
+ return kafka.protocol.produce.ProduceRequest(
+ required_acks=acks,
+ timeout=timeout,
+ topics=[(
+ topic,
+ [(
+ partition,
+ [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
+ magic=msg.magic,
+ attributes=msg.attributes))
+ for msg in payload.messages])
+ for partition, payload in topic_payloads.items()])
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
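Note: a minimal usage sketch of the new encoder, assuming ProduceRequestPayload is the kafka.common namedtuple whose topic/partition/messages fields the encoder reads:

    from kafka.common import ProduceRequestPayload
    from kafka.protocol.legacy import KafkaProtocol, create_message

    payload = ProduceRequestPayload(
        topic=b'my-topic',
        partition=0,
        messages=[create_message(b'hello'), create_message(b'world')])

    # Returns a kafka.protocol.produce.ProduceRequest struct; framing and
    # sending it over the wire is left to the client layer.
    request = KafkaProtocol.encode_produce_request([payload], acks=1, timeout=1000)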
@classmethod
- def decode_produce_response(cls, data):
+ def decode_produce_response(cls, response):
"""
- Decode bytes to a ProduceResponse
+ Decode ProduceResponse to ProduceResponsePayload
Arguments:
- data: bytes to decode
+ response: ProduceResponse
+ Return: list of ProduceResponsePayload
"""
- ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
-
- for _ in range(num_topics):
- ((strlen,), cur) = relative_unpack('>h', data, cur)
- topic = data[cur:cur + strlen]
- cur += strlen
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
- for _ in range(num_partitions):
- ((partition, error, offset), cur) = relative_unpack('>ihq',
- data, cur)
-
- yield ProduceResponse(topic, partition, error, offset)
+ return [
+ kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+ for topic, partitions in response.topics
+ for partition, error, offset in partitions
+ ]
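The decoder flattens the nested response struct into one payload per partition. A hedged consuming sketch (produce_response stands in for a decoded ProduceResponse obtained from the client; field names follow the constructor call above):

    for p in KafkaProtocol.decode_produce_response(produce_response):
        if p.error:
            print('produce failed: topic=%s partition=%d error=%d'
                  % (p.topic, p.partition, p.error))
        else:
            print('appended at offset %d' % p.offset)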
@classmethod
- def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
- max_wait_time=100, min_bytes=4096):
+ def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
"""
- Encodes some FetchRequest structs
+ Encodes a FetchRequest struct
Arguments:
- client_id: string
- correlation_id: int
- payloads: list of FetchRequest
- max_wait_time: int, how long to block waiting on min_bytes of data
- min_bytes: int, the minimum number of bytes to accumulate before
- returning the response
- """
-
- payloads = [] if payloads is None else payloads
- grouped_payloads = group_by_topic_and_partition(payloads)
-
- message = []
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.FETCH_KEY))
-
- # -1 is the replica id
- message.append(struct.pack('>iiii', -1, max_wait_time, min_bytes,
- len(grouped_payloads)))
-
- for topic, topic_payloads in grouped_payloads.items():
- message.append(write_short_string(topic))
- message.append(struct.pack('>i', len(topic_payloads)))
- for partition, payload in topic_payloads.items():
- message.append(struct.pack('>iqi', partition, payload.offset,
- payload.max_bytes))
-
- msg = b''.join(message)
- return struct.pack('>i%ds' % len(msg), len(msg), msg)
+ payloads: list of FetchRequestPayload
+ max_wait_time (int, optional): ms to block waiting for min_bytes
+ data. Defaults to 100.
+ min_bytes (int, optional): minimum bytes required to return before
+ max_wait_time. Defaults to 4096.
+
+ Return: FetchRequest
+ """
+ return kafka.protocol.fetch.FetchRequest(
+ replica_id=-1,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.offset,
+ payload.max_bytes)
+ for partition, payload in topic_payloads.items()])
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
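Usage sketch, assuming FetchRequestPayload carries the topic/partition/offset/max_bytes fields the comprehension above reads:

    from kafka.common import FetchRequestPayload
    from kafka.protocol.legacy import KafkaProtocol

    payload = FetchRequestPayload(topic=b'my-topic', partition=0,
                                  offset=1234, max_bytes=64 * 1024)
    request = KafkaProtocol.encode_fetch_request([payload],
                                                 max_wait_time=100,
                                                 min_bytes=4096)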
@classmethod
- def decode_fetch_response(cls, data):
+ def decode_fetch_response(cls, response):
"""
- Decode bytes to a FetchResponse
+ Decode FetchResponse struct to FetchResponsePayloads
Arguments:
- data: bytes to decode
+ response: FetchResponse
"""
- ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
-
- for _ in range(num_topics):
- (topic, cur) = read_short_string(data, cur)
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
- for j in range(num_partitions):
- ((partition, error, highwater_mark_offset), cur) = \
- relative_unpack('>ihq', data, cur)
-
- (message_set, cur) = read_int_string(data, cur)
-
- yield FetchResponse(
- topic, partition, error,
- highwater_mark_offset,
- KafkaProtocol._decode_message_set_iter(message_set))
+ return [
+ kafka.common.FetchResponsePayload(
+ topic, partition, error, highwater_offset, [
+ kafka.common.OffsetAndMessage(offset, message)
+ for offset, _, message in messages])
+ for topic, partitions in response.topics
+ for partition, error, highwater_offset, messages in partitions
+ ]
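Unlike the removed _decode_message_set_iter generator, the message set is now fully materialized as a list of OffsetAndMessage tuples. A consuming sketch (fetch_response is a hypothetical decoded FetchResponse; payload field names assumed to mirror the constructor arguments):

    for payload in KafkaProtocol.decode_fetch_response(fetch_response):
        for offset_and_message in payload.messages:
            print(payload.topic, payload.partition,
                  offset_and_message.offset, offset_and_message.message.value)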
@classmethod
- def encode_offset_request(cls, client_id, correlation_id, payloads=None):
- payloads = [] if payloads is None else payloads
- grouped_payloads = group_by_topic_and_partition(payloads)
-
- message = []
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.OFFSET_KEY))
-
- # -1 is the replica id
- message.append(struct.pack('>ii', -1, len(grouped_payloads)))
-
- for topic, topic_payloads in grouped_payloads.items():
- message.append(write_short_string(topic))
- message.append(struct.pack('>i', len(topic_payloads)))
-
- for partition, payload in topic_payloads.items():
- message.append(struct.pack('>iqi', partition, payload.time,
- payload.max_offsets))
-
- msg = b''.join(message)
- return struct.pack('>i%ds' % len(msg), len(msg), msg)
+ def encode_offset_request(cls, payloads=()):
+ return kafka.protocol.offset.OffsetRequest(
+ replica_id=-1,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.time,
+ payload.max_offsets)
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
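Sketch, assuming OffsetRequestPayload exposes the topic/partition/time/max_offsets fields read above; per Kafka's OffsetRequest semantics, time=-1 asks for the latest offset and time=-2 for the earliest:

    from kafka.common import OffsetRequestPayload
    from kafka.protocol.legacy import KafkaProtocol

    payload = OffsetRequestPayload(topic=b'my-topic', partition=0,
                                   time=-1, max_offsets=1)
    request = KafkaProtocol.encode_offset_request([payload])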
@classmethod
- def decode_offset_response(cls, data):
+ def decode_offset_response(cls, response):
"""
- Decode bytes to an OffsetResponse
+ Decode OffsetResponse into OffsetResponsePayloads
Arguments:
- data: bytes to decode
- """
- ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
+ response: OffsetResponse
- for _ in range(num_topics):
- (topic, cur) = read_short_string(data, cur)
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
- for _ in range(num_partitions):
- ((partition, error, num_offsets,), cur) = \
- relative_unpack('>ihi', data, cur)
-
- offsets = []
- for k in range(num_offsets):
- ((offset,), cur) = relative_unpack('>q', data, cur)
- offsets.append(offset)
-
- yield OffsetResponse(topic, partition, error, tuple(offsets))
+ Returns: list of OffsetResponsePayloads
+ """
+ return [
+ kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+ for topic, partitions in response.topics
+ for partition, error, offsets in partitions
+ ]
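Consuming sketch (offset_response is a hypothetical decoded OffsetResponse; field names follow the constructor call above):

    for p in KafkaProtocol.decode_offset_response(offset_response):
        # p.offsets is a tuple of up to max_offsets values
        print(p.topic, p.partition, p.error, p.offsets)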
@classmethod
- def encode_metadata_request(cls, client_id, correlation_id, topics=None,
- payloads=None):
+ def encode_metadata_request(cls, topics=(), payloads=None):
"""
Encode a MetadataRequest
Arguments:
- client_id: string
- correlation_id: int
topics: list of strings
"""
- if payloads is None:
- topics = [] if topics is None else topics
- else:
+ if payloads is not None:
topics = payloads
- message = []
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.METADATA_KEY))
-
- message.append(struct.pack('>i', len(topics)))
-
- for topic in topics:
- message.append(struct.pack('>h%ds' % len(topic), len(topic), topic))
-
- msg = b''.join(message)
- return write_int_string(msg)
+ return kafka.protocol.metadata.MetadataRequest(topics)
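Sketch; per standard MetadataRequest semantics, an empty topic list requests metadata for all topics:

    request = KafkaProtocol.encode_metadata_request([b'topic-a', b'topic-b'])
    all_topics_request = KafkaProtocol.encode_metadata_request()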
@classmethod
- def decode_metadata_response(cls, data):
- """
- Decode bytes to a MetadataResponse
-
- Arguments:
- data: bytes to decode
- """
- ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
-
- # Broker info
- brokers = []
- for _ in range(numbrokers):
- ((nodeId, ), cur) = relative_unpack('>i', data, cur)
- (host, cur) = read_short_string(data, cur)
- ((port,), cur) = relative_unpack('>i', data, cur)
- brokers.append(BrokerMetadata(nodeId, host, port))
-
- # Topic info
- ((num_topics,), cur) = relative_unpack('>i', data, cur)
- topic_metadata = []
-
- for _ in range(num_topics):
- ((topic_error,), cur) = relative_unpack('>h', data, cur)
- (topic_name, cur) = read_short_string(data, cur)
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
- partition_metadata = []
-
- for _ in range(num_partitions):
- ((partition_error_code, partition, leader, numReplicas), cur) = \
- relative_unpack('>hiii', data, cur)
-
- (replicas, cur) = relative_unpack(
- '>%di' % numReplicas, data, cur)
-
- ((num_isr,), cur) = relative_unpack('>i', data, cur)
- (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
-
- partition_metadata.append(
- PartitionMetadata(topic_name, partition, leader,
- replicas, isr, partition_error_code)
- )
-
- topic_metadata.append(
- TopicMetadata(topic_name, topic_error, partition_metadata)
- )
-
- return MetadataResponse(brokers, topic_metadata)
+ def decode_metadata_response(cls, response):
+ return response
@classmethod
def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
@@ -587,7 +447,7 @@ def create_message(payload, key=None):
key: bytes, a key used for partition routing (optional)
"""
- return Message(0, 0, key, payload)
+ return kafka.common.Message(0, 0, key, payload)
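create_message wraps raw bytes in the legacy Message namedtuple with magic=0 and attributes=0, e.g.:

    msg = create_message(b'payload bytes', key=b'routing-key')
    assert (msg.magic, msg.attributes) == (0, 0)
    assert (msg.key, msg.value) == (b'routing-key', b'payload bytes')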
def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -608,7 +468,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
- return Message(0, 0x00 | codec, key, gzipped)
+ return kafka.common.Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None):
@@ -629,7 +489,7 @@ def create_snappy_message(payloads, key=None):
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
- return Message(0, 0x00 | codec, key, snapped)
+ return kafka.common.Message(0, 0x00 | codec, key, snapped)
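For reference, a consumer-side sketch of reading back the codec bits set here, mirroring _decode_message above. `message` is a hypothetical kafka.common.Message; the constants are resolved through this module's namespace, which the file's unqualified usage implies:

    from kafka.codec import gzip_decode, snappy_decode
    from kafka.protocol.legacy import (
        ATTRIBUTE_CODEC_MASK, CODEC_GZIP, CODEC_SNAPPY
    )

    # The codec is stored in the low bits of the attributes byte;
    # a compressed Message's value is itself an encoded MessageSet.
    codec = message.attributes & ATTRIBUTE_CODEC_MASK
    if codec == CODEC_GZIP:
        message_set_bytes = gzip_decode(message.value)
    elif codec == CODEC_SNAPPY:
        message_set_bytes = snappy_decode(message.value)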
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):