summaryrefslogtreecommitdiff
path: root/kafka/protocol/offset.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/offset.py')
-rw-r--r--kafka/protocol/offset.py39
1 files changed, 36 insertions, 3 deletions
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 588dfec..8353f8c 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from .api import Request, Response
-from .types import Array, Int16, Int32, Int64, Schema, String
+from .types import Array, Int8, Int16, Int32, Int64, Schema, String
class OffsetResetStrategy(object):
@@ -36,6 +36,21 @@ class OffsetResponse_v1(Response):
)
+class OffsetResponse_v2(Response):
+ API_KEY = 2
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('timestamp', Int64),
+ ('offset', Int64)))))
+ )
+
+
class OffsetRequest_v0(Request):
API_KEY = 2
API_VERSION = 0
@@ -70,5 +85,23 @@ class OffsetRequest_v1(Request):
}
-OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1]
-OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1]
+class OffsetRequest_v2(Request):
+ API_KEY = 2
+ API_VERSION = 2
+ RESPONSE_TYPE = OffsetResponse_v2
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('isolation_level', Int8),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('timestamp', Int64)))))
+ )
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2]
+OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2]