summaryrefslogtreecommitdiff
path: root/kafka/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol')
-rw-r--r--kafka/protocol/fetch.py26
-rw-r--r--kafka/protocol/legacy.py6
-rw-r--r--kafka/protocol/produce.py25
3 files changed, 48 insertions, 9 deletions
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index eeda4e7..6aba972 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -17,6 +17,21 @@ class FetchResponse_v0(Struct):
)
+class FetchResponse_v1(Struct):
+ API_KEY = 1
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('message_set', MessageSet)))))
+ )
+
+
class FetchRequest_v0(Struct):
API_KEY = 1
API_VERSION = 0
@@ -34,5 +49,12 @@ class FetchRequest_v0(Struct):
)
-FetchRequest = [FetchRequest_v0]
-FetchResponse = [FetchResponse_v0]
+class FetchRequest_v1(Struct):
+ API_KEY = 1
+ API_VERSION = 1
+ RESPONSE_TYPE = FetchResponse_v1
+ SCHEMA = FetchRequest_v0.SCHEMA
+
+
+FetchRequest = [FetchRequest_v0, FetchRequest_v1]
+FetchResponse = [FetchResponse_v0, FetchResponse_v1]
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 2eddf3b..08d2d01 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -336,11 +336,7 @@ class KafkaProtocol(object):
payloads: list of OffsetFetchRequestPayload
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
- if from_kafka:
- version = 1
- else:
- version = 0
-
+ version = 1 if from_kafka else 0
return kafka.protocol.commit.OffsetFetchRequest[version](
consumer_group=group,
topics=[(
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index 5753f64..e0b8622 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -16,6 +16,20 @@ class ProduceResponse_v0(Struct):
)
+class ProduceResponse_v1(Struct):
+ API_KEY = 0
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('offset', Int64))))),
+ ('throttle_time_ms', Int32)
+ )
+
+
class ProduceRequest_v0(Struct):
API_KEY = 0
API_VERSION = 0
@@ -31,5 +45,12 @@ class ProduceRequest_v0(Struct):
)
-ProduceRequest = [ProduceRequest_v0]
-ProduceResponse = [ProduceResponse_v0]
+class ProduceRequest_v1(Struct):
+ API_KEY = 0
+ API_VERSION = 1
+ RESPONSE_TYPE = ProduceResponse_v1
+ SCHEMA = ProduceRequest_v0.SCHEMA
+
+
+ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
+ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]