diff options
author | charsyam <charsyam@naver.com> | 2017-03-03 07:15:01 +0900 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-03-02 14:15:01 -0800 |
commit | 3a630f2f886d9182bc6fe593d3659b0f3986fb4b (patch) | |
tree | 6b2f539a1227065cc7dabd1e06acc74c10e49cf3 /kafka/protocol/legacy.py | |
parent | a22ea165649b3510d770243f6f3809d598cb4f81 (diff) | |
download | kafka-python-3a630f2f886d9182bc6fe593d3659b0f3986fb4b.tar.gz |
Add send_list_offset_request for searching offset by timestamp (#1001)
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r-- | kafka/protocol/legacy.py | 29 |
1 files changed, 29 insertions, 0 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 6d9329d..c855d05 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -249,6 +249,35 @@ class KafkaProtocol(object): ] @classmethod + def encode_list_offset_request(cls, payloads=()): + return kafka.protocol.offset.OffsetRequest[1]( + replica_id=-1, + topics=[( + topic, + [( + partition, + payload.time) + for partition, payload in six.iteritems(topic_payloads)]) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) + + @classmethod + def decode_list_offset_response(cls, response): + """ + Decode OffsetResponse_v2 into ListOffsetResponsePayloads + + Arguments: + response: OffsetResponse_v2 + + Returns: list of ListOffsetResponsePayloads + """ + return [ + kafka.structs.ListOffsetResponsePayload(topic, partition, error, timestamp, offset) + for topic, partitions in response.topics + for partition, error, timestamp, offset in partitions + ] + + + @classmethod def encode_metadata_request(cls, topics=(), payloads=None): """ Encode a MetadataRequest |