summaryrefslogtreecommitdiff
path: root/kafka/protocol/produce.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/produce.py')
-rw-r--r--kafka/protocol/produce.py37
1 files changed, 35 insertions, 2 deletions
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index 9b03354..da1f308 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -47,6 +47,12 @@ class ProduceResponse_v2(Response):
)
+class ProduceResponse_v3(Response):
+ API_KEY = 0
+ API_VERSION = 3
+ SCHEMA = ProduceResponse_v2.SCHEMA
+
+
class ProduceRequest_v0(Request):
API_KEY = 0
API_VERSION = 0
@@ -91,5 +97,32 @@ class ProduceRequest_v2(Request):
return True
-ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
-ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]
+class ProduceRequest_v3(Request):
+ API_KEY = 0
+ API_VERSION = 3
+ RESPONSE_TYPE = ProduceResponse_v3
+ SCHEMA = Schema(
+ ('transactional_id', String('utf-8')),
+ ('required_acks', Int16),
+ ('timeout', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('messages', MessageSet)))))
+ )
+
+ def expect_response(self):
+ if self.required_acks == 0: # pylint: disable=no-member
+ return False
+ return True
+
+
+ProduceRequest = [
+ ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
+ ProduceRequest_v3
+]
+ProduceResponse = [
+ ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
+ ProduceResponse_v2
+]