summaryrefslogtreecommitdiff
path: root/kafka/protocol/commit.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/commit.py')
-rw-r--r--kafka/protocol/commit.py42
1 files changed, 24 insertions, 18 deletions
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py
index 5ba0227..2955de1 100644
--- a/kafka/protocol/commit.py
+++ b/kafka/protocol/commit.py
@@ -2,9 +2,20 @@ from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
+class OffsetCommitResponse(Struct):
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16)))))
+ )
+
+
class OffsetCommitRequest_v2(Struct):
API_KEY = 8
API_VERSION = 2 # added retention_time, dropped timestamp
+ RESPONSE_TYPE = OffsetCommitResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
@@ -22,6 +33,7 @@ class OffsetCommitRequest_v2(Struct):
class OffsetCommitRequest_v1(Struct):
API_KEY = 8
API_VERSION = 1 # Kafka-backed storage
+ RESPONSE_TYPE = OffsetCommitResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
@@ -39,6 +51,7 @@ class OffsetCommitRequest_v1(Struct):
class OffsetCommitRequest_v0(Struct):
API_KEY = 8
API_VERSION = 0 # Zookeeper-backed storage
+ RESPONSE_TYPE = OffsetCommitResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
@@ -50,12 +63,14 @@ class OffsetCommitRequest_v0(Struct):
)
-class OffsetCommitResponse(Struct):
+class OffsetFetchResponse(Struct):
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
+ ('offset', Int64),
+ ('metadata', String('utf-8')),
('error_code', Int16)))))
)
@@ -63,6 +78,7 @@ class OffsetCommitResponse(Struct):
class OffsetFetchRequest_v1(Struct):
API_KEY = 9
API_VERSION = 1 # kafka-backed storage
+ RESPONSE_TYPE = OffsetFetchResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
@@ -74,6 +90,7 @@ class OffsetFetchRequest_v1(Struct):
class OffsetFetchRequest_v0(Struct):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
+ RESPONSE_TYPE = OffsetFetchResponse
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
@@ -82,30 +99,19 @@ class OffsetFetchRequest_v0(Struct):
)
-class OffsetFetchResponse(Struct):
+class GroupCoordinatorResponse(Struct):
SCHEMA = Schema(
- ('topics', Array(
- ('topic', String('utf-8')),
- ('partitions', Array(
- ('partition', Int32),
- ('offset', Int64),
- ('metadata', String('utf-8')),
- ('error_code', Int16)))))
+ ('error_code', Int16),
+ ('coordinator_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32)
)
class GroupCoordinatorRequest(Struct):
API_KEY = 10
API_VERSION = 0
+ RESPONSE_TYPE = GroupCoordinatorResponse
SCHEMA = Schema(
('consumer_group', String('utf-8'))
)
-
-
-class GroupCoordinatorResponse(Struct):
- SCHEMA = Schema(
- ('error_code', Int16),
- ('coordinator_id', Int32),
- ('host', String('utf-8')),
- ('port', Int32)
- )