summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-10 10:57:27 -0800
committerDana Powers <dana.powers@rd.io>2015-12-10 11:25:15 -0800
commit7470cade6bb8629d17541e136527369f9d2ec387 (patch)
treeba424c4d0cc27ffb5ec77196a0e121f3075bc992 /kafka/protocol/legacy.py
parentc3d2fda3c368771cb93a09bb2f1edaa7a3cf9c2b (diff)
downloadkafka-python-7470cade6bb8629d17541e136527369f9d2ec387.tar.gz
Convert OffsetCommit and OffsetFetch protocol encode/decode
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py128
1 file changed, 46 insertions, 82 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index c5babf7..feabed3 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -19,7 +19,6 @@ from kafka.codec import (
gzip_encode, gzip_decode, snappy_encode, snappy_decode
)
from kafka.common import (
- OffsetCommitResponse, OffsetFetchResponse,
ProtocolError, ChecksumError,
UnsupportedCodecError,
ConsumerMetadataResponse
@@ -258,8 +257,8 @@ class KafkaProtocol(object):
partition,
payload.time,
payload.max_offsets)
- for partition, payload in six.iteritems(topic_payloads)])
- for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
def decode_offset_response(cls, response):
@@ -327,115 +326,80 @@ class KafkaProtocol(object):
return ConsumerMetadataResponse(error, nodeId, host, port)
@classmethod
- def encode_offset_commit_request(cls, client_id, correlation_id,
- group, payloads):
+ def encode_offset_commit_request(cls, group, payloads):
"""
- Encode some OffsetCommitRequest structs
+ Encode an OffsetCommitRequest struct
Arguments:
- client_id: string
- correlation_id: int
group: string, the consumer group you are committing offsets for
- payloads: list of OffsetCommitRequest
+ payloads: list of OffsetCommitRequestPayload
"""
- grouped_payloads = group_by_topic_and_partition(payloads)
-
- message = []
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.OFFSET_COMMIT_KEY))
- message.append(write_short_string(group))
- message.append(struct.pack('>i', len(grouped_payloads)))
-
- for topic, topic_payloads in grouped_payloads.items():
- message.append(write_short_string(topic))
- message.append(struct.pack('>i', len(topic_payloads)))
-
- for partition, payload in topic_payloads.items():
- message.append(struct.pack('>iq', partition, payload.offset))
- message.append(write_short_string(payload.metadata))
+ return kafka.protocol.commit.OffsetCommitRequest_v0(
+ consumer_group=group,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.offset,
+ payload.metadata)
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
- msg = b''.join(message)
- return struct.pack('>i%ds' % len(msg), len(msg), msg)
@classmethod
- def decode_offset_commit_response(cls, data):
+ def decode_offset_commit_response(cls, response):
"""
- Decode bytes to an OffsetCommitResponse
+    Decode OffsetCommitResponse to OffsetCommitResponsePayloads
Arguments:
- data: bytes to decode
+ response: OffsetCommitResponse
"""
- ((correlation_id,), cur) = relative_unpack('>i', data, 0)
- ((num_topics,), cur) = relative_unpack('>i', data, cur)
-
- for _ in xrange(num_topics):
- (topic, cur) = read_short_string(data, cur)
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
- for _ in xrange(num_partitions):
- ((partition, error), cur) = relative_unpack('>ih', data, cur)
- yield OffsetCommitResponse(topic, partition, error)
+ return [
+ kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+ for topic, partitions in response.topics
+ for partition, error in partitions
+ ]
@classmethod
- def encode_offset_fetch_request(cls, client_id, correlation_id,
- group, payloads, from_kafka=False):
+ def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
"""
- Encode some OffsetFetchRequest structs. The request is encoded using
+ Encode an OffsetFetchRequest struct. The request is encoded using
version 0 if from_kafka is false, indicating a request for Zookeeper
offsets. It is encoded using version 1 otherwise, indicating a request
for Kafka offsets.
Arguments:
- client_id: string
- correlation_id: int
group: string, the consumer group you are fetching offsets for
- payloads: list of OffsetFetchRequest
+ payloads: list of OffsetFetchRequestPayload
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
- grouped_payloads = group_by_topic_and_partition(payloads)
-
- message = []
- reqver = 1 if from_kafka else 0
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.OFFSET_FETCH_KEY,
- version=reqver))
-
- message.append(write_short_string(group))
- message.append(struct.pack('>i', len(grouped_payloads)))
-
- for topic, topic_payloads in grouped_payloads.items():
- message.append(write_short_string(topic))
- message.append(struct.pack('>i', len(topic_payloads)))
-
- for partition, payload in topic_payloads.items():
- message.append(struct.pack('>i', partition))
+ if from_kafka:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v1
+ else:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v0
- msg = b''.join(message)
- return struct.pack('>i%ds' % len(msg), len(msg), msg)
+ return request_class(
+ consumer_group=group,
+ topics=[(
+ topic,
+ list(topic_payloads.keys()))
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
@classmethod
- def decode_offset_fetch_response(cls, data):
+ def decode_offset_fetch_response(cls, response):
"""
- Decode bytes to an OffsetFetchResponse
+ Decode OffsetFetchResponse to OffsetFetchResponsePayloads
Arguments:
- data: bytes to decode
+ response: OffsetFetchResponse
"""
-
- ((correlation_id,), cur) = relative_unpack('>i', data, 0)
- ((num_topics,), cur) = relative_unpack('>i', data, cur)
-
- for _ in range(num_topics):
- (topic, cur) = read_short_string(data, cur)
- ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-
- for _ in range(num_partitions):
- ((partition, offset), cur) = relative_unpack('>iq', data, cur)
- (metadata, cur) = read_short_string(data, cur)
- ((error,), cur) = relative_unpack('>h', data, cur)
-
- yield OffsetFetchResponse(topic, partition, offset,
- metadata, error)
+ return [
+ kafka.common.OffsetFetchResponsePayload(
+ topic, partition, offset, metadata, error
+ )
+ for topic, partitions in response.topics
+ for partition, offset, metadata, error in partitions
+ ]
def create_message(payload, key=None):