diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
commit | 90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch) | |
tree | b22cef6b10fd167fb22b8318e1294f6137427f3b /kafka/protocol/commit.py | |
parent | 452e7c2190b83f320f58e7f650302696dde458ed (diff) | |
download | kafka-python-90c729438a2e3f1b194e58231e41bd16bd7b7172.tar.gz |
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'kafka/protocol/commit.py')
-rw-r--r-- | kafka/protocol/commit.py | 104 |
1 files changed, 81 insertions, 23 deletions
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index a32f8d3..90a3b76 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class OffsetCommitResponse(Struct): +class OffsetCommitResponse_v0(Struct): + API_KEY = 8 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -12,15 +14,36 @@ class OffsetCommitResponse(Struct): ) -class OffsetCommitRequest_v2(Struct): +class OffsetCommitResponse_v1(Struct): API_KEY = 8 - API_VERSION = 2 # added retention_time, dropped timestamp - RESPONSE_TYPE = OffsetCommitResponse + API_VERSION = 1 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetCommitResponse_v2(Struct): + API_KEY = 8 + API_VERSION = 2 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetCommitRequest_v0(Struct): + API_KEY = 8 + API_VERSION = 0 # Zookeeper-backed storage + RESPONSE_TYPE = OffsetCommitResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), - ('consumer_group_generation_id', Int32), - ('consumer_id', String('utf-8')), - ('retention_time', Int64), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( @@ -28,14 +51,12 @@ class OffsetCommitRequest_v2(Struct): ('offset', Int64), ('metadata', String('utf-8')))))) ) - DEFAULT_GENERATION_ID = -1 - DEFAULT_RETENTION_TIME = -1 class OffsetCommitRequest_v1(Struct): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage - RESPONSE_TYPE = OffsetCommitResponse + RESPONSE_TYPE = OffsetCommitResponse_v1 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -50,12 +71,15 @@ class OffsetCommitRequest_v1(Struct): ) -class OffsetCommitRequest_v0(Struct): +class OffsetCommitRequest_v2(Struct): API_KEY = 8 - API_VERSION = 0 # Zookeeper-backed storage - RESPONSE_TYPE = OffsetCommitResponse + API_VERSION = 2 # added retention_time, dropped timestamp + RESPONSE_TYPE = OffsetCommitResponse_v2 SCHEMA = Schema( ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('retention_time', Int64), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( @@ -63,9 +87,19 @@ class OffsetCommitRequest_v0(Struct): ('offset', Int64), ('metadata', String('utf-8')))))) ) + DEFAULT_GENERATION_ID = -1 + DEFAULT_RETENTION_TIME = -1 -class OffsetFetchResponse(Struct): +OffsetCommitRequest = [OffsetCommitRequest_v0, OffsetCommitRequest_v1, + OffsetCommitRequest_v2] +OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1, + OffsetCommitResponse_v2] + + +class OffsetFetchResponse_v0(Struct): + API_KEY = 9 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -77,22 +111,24 @@ class OffsetFetchResponse(Struct): ) -class OffsetFetchRequest_v1(Struct): +class OffsetFetchResponse_v1(Struct): API_KEY = 9 - API_VERSION = 1 # kafka-backed storage - RESPONSE_TYPE = OffsetFetchResponse + API_VERSION = 1 SCHEMA = Schema( - ('consumer_group', String('utf-8')), ('topics', Array( ('topic', String('utf-8')), - ('partitions', Array(Int32)))) + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), + ('error_code', Int16))))) ) class OffsetFetchRequest_v0(Struct): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage - RESPONSE_TYPE = OffsetFetchResponse + RESPONSE_TYPE = OffsetFetchResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -101,7 +137,25 @@ class OffsetFetchRequest_v0(Struct): ) -class GroupCoordinatorResponse(Struct): +class OffsetFetchRequest_v1(Struct): + API_KEY = 9 + API_VERSION = 1 # kafka-backed storage + RESPONSE_TYPE = OffsetFetchResponse_v1 + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))) + ) + + +OffsetFetchRequest = [OffsetFetchRequest_v0, OffsetFetchRequest_v1] +OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1] + + +class GroupCoordinatorResponse_v0(Struct): + API_KEY = 10 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('coordinator_id', Int32), @@ -110,10 +164,14 @@ class GroupCoordinatorResponse(Struct): ) -class GroupCoordinatorRequest(Struct): +class GroupCoordinatorRequest_v0(Struct): API_KEY = 10 API_VERSION = 0 - RESPONSE_TYPE = GroupCoordinatorResponse + RESPONSE_TYPE = GroupCoordinatorResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')) ) + + +GroupCoordinatorRequest = [GroupCoordinatorRequest_v0] +GroupCoordinatorResponse = [GroupCoordinatorResponse_v0] |