diff options
Diffstat (limited to 'kafka/protocol/produce.py')
-rw-r--r-- | kafka/protocol/produce.py | 37 |
1 files changed, 35 insertions, 2 deletions
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 9b03354..da1f308 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -47,6 +47,12 @@ class ProduceResponse_v2(Response): ) +class ProduceResponse_v3(Response): + API_KEY = 0 + API_VERSION = 3 + SCHEMA = ProduceResponse_v2.SCHEMA + + class ProduceRequest_v0(Request): API_KEY = 0 API_VERSION = 0 @@ -91,5 +97,32 @@ class ProduceRequest_v2(Request): return True -ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2] -ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2] +class ProduceRequest_v3(Request): + API_KEY = 0 + API_VERSION = 3 + RESPONSE_TYPE = ProduceResponse_v3 + SCHEMA = Schema( + ('transactional_id', String('utf-8')), + ('required_acks', Int16), + ('timeout', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('messages', MessageSet))))) + ) + + def expect_response(self): + if self.required_acks == 0: # pylint: disable=no-member + return False + return True + + +ProduceRequest = [ + ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2, + ProduceRequest_v3 +] +ProduceResponse = [ + ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2, + ProduceResponse_v2 +] |