diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-10 10:57:27 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-10 11:25:15 -0800 |
commit | 7470cade6bb8629d17541e136527369f9d2ec387 (patch) | |
tree | ba424c4d0cc27ffb5ec77196a0e121f3075bc992 /kafka/protocol/legacy.py | |
parent | c3d2fda3c368771cb93a09bb2f1edaa7a3cf9c2b (diff) | |
download | kafka-python-7470cade6bb8629d17541e136527369f9d2ec387.tar.gz |
Convert OffsetCommit and OffsetFetch protocol encode/decode
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r-- | kafka/protocol/legacy.py | 128 |
1 files changed, 46 insertions, 82 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index c5babf7..feabed3 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -19,7 +19,6 @@ from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.common import ( - OffsetCommitResponse, OffsetFetchResponse, ProtocolError, ChecksumError, UnsupportedCodecError, ConsumerMetadataResponse @@ -258,8 +257,8 @@ class KafkaProtocol(object): partition, payload.time, payload.max_offsets) - for partition, payload in six.iteritems(topic_payloads)]) - for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) @classmethod def decode_offset_response(cls, response): @@ -327,115 +326,80 @@ class KafkaProtocol(object): return ConsumerMetadataResponse(error, nodeId, host, port) @classmethod - def encode_offset_commit_request(cls, client_id, correlation_id, - group, payloads): + def encode_offset_commit_request(cls, group, payloads): """ - Encode some OffsetCommitRequest structs + Encode an OffsetCommitRequest struct Arguments: - client_id: string - correlation_id: int group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + payloads: list of OffsetCommitRequestPayload """ - grouped_payloads = group_by_topic_and_partition(payloads) - - message = [] - message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_COMMIT_KEY)) - message.append(write_short_string(group)) - message.append(struct.pack('>i', len(grouped_payloads))) - - for topic, topic_payloads in grouped_payloads.items(): - message.append(write_short_string(topic)) - message.append(struct.pack('>i', len(topic_payloads))) - - for partition, payload in topic_payloads.items(): - message.append(struct.pack('>iq', partition, payload.offset)) - message.append(write_short_string(payload.metadata)) + return kafka.protocol.commit.OffsetCommitRequest_v0( + consumer_group=group, + topics=[( + topic, + [( + partition, + payload.offset, + payload.metadata) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) - msg = b''.join(message) - return struct.pack('>i%ds' % len(msg), len(msg), msg) @classmethod - def decode_offset_commit_response(cls, data): + def decode_offset_commit_response(cls, response): """ - Decode bytes to an OffsetCommitResponse + Decode OffsetCommitResponse to an OffsetCommitResponsePayload Arguments: - data: bytes to decode + response: OffsetCommitResponse """ - ((correlation_id,), cur) = relative_unpack('>i', data, 0) - ((num_topics,), cur) = relative_unpack('>i', data, cur) - - for _ in xrange(num_topics): - (topic, cur) = read_short_string(data, cur) - ((num_partitions,), cur) = relative_unpack('>i', data, cur) - - for _ in xrange(num_partitions): - ((partition, error), cur) = relative_unpack('>ih', data, cur) - yield OffsetCommitResponse(topic, partition, error) + return [ + kafka.common.OffsetCommitResponsePayload(topic, partition, error) + for topic, partitions in response.topics + for partition, error in partitions + ] @classmethod - def encode_offset_fetch_request(cls, client_id, correlation_id, - group, payloads, from_kafka=False): + def encode_offset_fetch_request(cls, group, payloads, from_kafka=False): """ - Encode some OffsetFetchRequest structs. The request is encoded using + Encode an OffsetFetchRequest struct. The request is encoded using version 0 if from_kafka is false, indicating a request for Zookeeper offsets. It is encoded using version 1 otherwise, indicating a request for Kafka offsets. Arguments: - client_id: string - correlation_id: int group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + payloads: list of OffsetFetchRequestPayload from_kafka: bool, default False, set True for Kafka-committed offsets """ - grouped_payloads = group_by_topic_and_partition(payloads) - - message = [] - reqver = 1 if from_kafka else 0 - message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.OFFSET_FETCH_KEY, - version=reqver)) - - message.append(write_short_string(group)) - message.append(struct.pack('>i', len(grouped_payloads))) - - for topic, topic_payloads in grouped_payloads.items(): - message.append(write_short_string(topic)) - message.append(struct.pack('>i', len(topic_payloads))) - - for partition, payload in topic_payloads.items(): - message.append(struct.pack('>i', partition)) + if from_kafka: + request_class = kafka.protocol.commit.OffsetFetchRequest_v1 + else: + request_class = kafka.protocol.commit.OffsetFetchRequest_v0 - msg = b''.join(message) - return struct.pack('>i%ds' % len(msg), len(msg), msg) + return request_class( + consumer_group=group, + topics=[( + topic, + list(topic_payloads.keys())) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) @classmethod - def decode_offset_fetch_response(cls, data): + def decode_offset_fetch_response(cls, response): """ - Decode bytes to an OffsetFetchResponse + Decode OffsetFetchResponse to OffsetFetchResponsePayloads Arguments: - data: bytes to decode + response: OffsetFetchResponse """ - - ((correlation_id,), cur) = relative_unpack('>i', data, 0) - ((num_topics,), cur) = relative_unpack('>i', data, cur) - - for _ in range(num_topics): - (topic, cur) = read_short_string(data, cur) - ((num_partitions,), cur) = relative_unpack('>i', data, cur) - - for _ in range(num_partitions): - ((partition, offset), cur) = relative_unpack('>iq', data, cur) - (metadata, cur) = read_short_string(data, cur) - ((error,), cur) = relative_unpack('>h', data, cur) - - yield OffsetFetchResponse(topic, partition, offset, - metadata, error) + return [ + kafka.common.OffsetFetchResponsePayload( + topic, partition, offset, metadata, error + ) + for topic, partitions in response.topics + for partition, offset, metadata, error in partitions + ] def create_message(payload, key=None): |