diff options
Diffstat (limited to 'kafka/protocol')
-rw-r--r-- | kafka/protocol/fetch.py | 26 | ||||
-rw-r--r-- | kafka/protocol/legacy.py | 6 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 25 |
3 files changed, 48 insertions, 9 deletions
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index eeda4e7..6aba972 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -17,6 +17,21 @@ class FetchResponse_v0(Struct): ) +class FetchResponse_v1(Struct): + API_KEY = 1 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('message_set', MessageSet))))) + ) + + class FetchRequest_v0(Struct): API_KEY = 1 API_VERSION = 0 @@ -34,5 +49,12 @@ class FetchRequest_v0(Struct): ) -FetchRequest = [FetchRequest_v0] -FetchResponse = [FetchResponse_v0] +class FetchRequest_v1(Struct): + API_KEY = 1 + API_VERSION = 1 + RESPONSE_TYPE = FetchResponse_v1 + SCHEMA = FetchRequest_v0.SCHEMA + + +FetchRequest = [FetchRequest_v0, FetchRequest_v1] +FetchResponse = [FetchResponse_v0, FetchResponse_v1] diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 2eddf3b..08d2d01 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -336,11 +336,7 @@ class KafkaProtocol(object): payloads: list of OffsetFetchRequestPayload from_kafka: bool, default False, set True for Kafka-committed offsets """ - if from_kafka: - version = 1 - else: - version = 0 - + version = 1 if from_kafka else 0 return kafka.protocol.commit.OffsetFetchRequest[version]( consumer_group=group, topics=[( diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 5753f64..e0b8622 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -16,6 +16,20 @@ class ProduceResponse_v0(Struct): ) +class ProduceResponse_v1(Struct): + API_KEY = 0 + API_VERSION = 1 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('offset', Int64))))), + ('throttle_time_ms', Int32) + ) + + class ProduceRequest_v0(Struct): API_KEY = 0 API_VERSION = 0 @@ -31,5 +45,12 @@ class ProduceRequest_v0(Struct): ) -ProduceRequest = [ProduceRequest_v0] -ProduceResponse = [ProduceResponse_v0] +class ProduceRequest_v1(Struct): + API_KEY = 0 + API_VERSION = 1 + RESPONSE_TYPE = ProduceResponse_v1 + SCHEMA = ProduceRequest_v0.SCHEMA + + +ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1] +ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1] |