summaryrefslogtreecommitdiff
path: root/kafka/protocol/commit.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
commit90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch)
treeb22cef6b10fd167fb22b8318e1294f6137427f3b /kafka/protocol/commit.py
parent452e7c2190b83f320f58e7f650302696dde458ed (diff)
downloadkafka-python-protocol_versions.tar.gz
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'kafka/protocol/commit.py')
-rw-r--r--kafka/protocol/commit.py104
1 files changed, 81 insertions, 23 deletions
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py
index a32f8d3..90a3b76 100644
--- a/kafka/protocol/commit.py
+++ b/kafka/protocol/commit.py
@@ -2,7 +2,9 @@ from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
-class OffsetCommitResponse(Struct):
+class OffsetCommitResponse_v0(Struct):
+ API_KEY = 8
+ API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
@@ -12,15 +14,36 @@ class OffsetCommitResponse(Struct):
)
-class OffsetCommitRequest_v2(Struct):
+class OffsetCommitResponse_v1(Struct):
API_KEY = 8
- API_VERSION = 2 # added retention_time, dropped timestamp
- RESPONSE_TYPE = OffsetCommitResponse
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16)))))
+ )
+
+
+class OffsetCommitResponse_v2(Struct):
+ API_KEY = 8
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16)))))
+ )
+
+
+class OffsetCommitRequest_v0(Struct):
+ API_KEY = 8
+ API_VERSION = 0 # Zookeeper-backed storage
+ RESPONSE_TYPE = OffsetCommitResponse_v0
SCHEMA = Schema(
('consumer_group', String('utf-8')),
- ('consumer_group_generation_id', Int32),
- ('consumer_id', String('utf-8')),
- ('retention_time', Int64),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
@@ -28,14 +51,12 @@ class OffsetCommitRequest_v2(Struct):
('offset', Int64),
('metadata', String('utf-8'))))))
)
- DEFAULT_GENERATION_ID = -1
- DEFAULT_RETENTION_TIME = -1
class OffsetCommitRequest_v1(Struct):
API_KEY = 8
API_VERSION = 1 # Kafka-backed storage
- RESPONSE_TYPE = OffsetCommitResponse
+ RESPONSE_TYPE = OffsetCommitResponse_v1
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
@@ -50,12 +71,15 @@ class OffsetCommitRequest_v1(Struct):
)
-class OffsetCommitRequest_v0(Struct):
+class OffsetCommitRequest_v2(Struct):
API_KEY = 8
- API_VERSION = 0 # Zookeeper-backed storage
- RESPONSE_TYPE = OffsetCommitResponse
+ API_VERSION = 2 # added retention_time, dropped timestamp
+ RESPONSE_TYPE = OffsetCommitResponse_v2
SCHEMA = Schema(
('consumer_group', String('utf-8')),
+ ('consumer_group_generation_id', Int32),
+ ('consumer_id', String('utf-8')),
+ ('retention_time', Int64),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
@@ -63,9 +87,19 @@ class OffsetCommitRequest_v0(Struct):
('offset', Int64),
('metadata', String('utf-8'))))))
)
+ DEFAULT_GENERATION_ID = -1
+ DEFAULT_RETENTION_TIME = -1
-class OffsetFetchResponse(Struct):
+OffsetCommitRequest = [OffsetCommitRequest_v0, OffsetCommitRequest_v1,
+ OffsetCommitRequest_v2]
+OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1,
+ OffsetCommitResponse_v2]
+
+
+class OffsetFetchResponse_v0(Struct):
+ API_KEY = 9
+ API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
@@ -77,22 +111,24 @@ class OffsetFetchResponse(Struct):
)
-class OffsetFetchRequest_v1(Struct):
+class OffsetFetchResponse_v1(Struct):
API_KEY = 9
- API_VERSION = 1 # kafka-backed storage
- RESPONSE_TYPE = OffsetFetchResponse
+ API_VERSION = 1
SCHEMA = Schema(
- ('consumer_group', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
- ('partitions', Array(Int32))))
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('metadata', String('utf-8')),
+ ('error_code', Int16)))))
)
class OffsetFetchRequest_v0(Struct):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
- RESPONSE_TYPE = OffsetFetchResponse
+ RESPONSE_TYPE = OffsetFetchResponse_v0
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
@@ -101,7 +137,25 @@ class OffsetFetchRequest_v0(Struct):
)
-class GroupCoordinatorResponse(Struct):
+class OffsetFetchRequest_v1(Struct):
+ API_KEY = 9
+ API_VERSION = 1 # kafka-backed storage
+ RESPONSE_TYPE = OffsetFetchResponse_v1
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8')),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32))))
+ )
+
+
+OffsetFetchRequest = [OffsetFetchRequest_v0, OffsetFetchRequest_v1]
+OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1]
+
+
+class GroupCoordinatorResponse_v0(Struct):
+ API_KEY = 10
+ API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('coordinator_id', Int32),
@@ -110,10 +164,14 @@ class GroupCoordinatorResponse(Struct):
)
-class GroupCoordinatorRequest(Struct):
+class GroupCoordinatorRequest_v0(Struct):
API_KEY = 10
API_VERSION = 0
- RESPONSE_TYPE = GroupCoordinatorResponse
+ RESPONSE_TYPE = GroupCoordinatorResponse_v0
SCHEMA = Schema(
('consumer_group', String('utf-8'))
)
+
+
+GroupCoordinatorRequest = [GroupCoordinatorRequest_v0]
+GroupCoordinatorResponse = [GroupCoordinatorResponse_v0]