diff options
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r-- | kafka/protocol/legacy.py | 29 |
1 files changed, 29 insertions, 0 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 6d9329d..c855d05 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -249,6 +249,35 @@ class KafkaProtocol(object): ] @classmethod + def encode_list_offset_request(cls, payloads=()): + return kafka.protocol.offset.OffsetRequest[1]( + replica_id=-1, + topics=[( + topic, + [( + partition, + payload.time) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) + + @classmethod + def decode_list_offset_response(cls, response): + """ + Decode OffsetResponse_v2 into ListOffsetResponsePayloads + + Arguments: + response: OffsetResponse_v2 + + Returns: list of ListOffsetResponsePayloads + """ + return [ + kafka.structs.ListOffsetResponsePayload(topic, partition, error, timestamp, offset) + for topic, partitions in response.topics + for partition, error, timestamp, offset in partitions + ] + + + @classmethod def encode_metadata_request(cls, topics=(), payloads=None): """ Encode a MetadataRequest |