diff options
Diffstat (limited to 'kafka/protocol/offset.py')
-rw-r--r-- | kafka/protocol/offset.py | 39 |
1 files changed, 36 insertions, 3 deletions
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 588dfec..8353f8c 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .types import Array, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String class OffsetResetStrategy(object): @@ -36,6 +36,21 @@ class OffsetResponse_v1(Response): ) +class OffsetResponse_v2(Response): + API_KEY = 2 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('timestamp', Int64), + ('offset', Int64))))) + ) + + class OffsetRequest_v0(Request): API_KEY = 2 API_VERSION = 0 @@ -70,5 +85,23 @@ class OffsetRequest_v1(Request): } -OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1] -OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1] +class OffsetRequest_v2(Request): + API_KEY = 2 + API_VERSION = 2 + RESPONSE_TYPE = OffsetResponse_v2 + SCHEMA = Schema( + ('replica_id', Int32), + ('isolation_level', Int8), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('timestamp', Int64))))) + ) + DEFAULTS = { + 'replica_id': -1 + } + + +OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2] +OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2] |