diff options
Diffstat (limited to 'kafka/protocol/commit.py')
-rw-r--r-- | kafka/protocol/commit.py | 42 |
1 files changed, 24 insertions, 18 deletions
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 5ba0227..2955de1 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -2,9 +2,20 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String +class OffsetCommitResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + class OffsetCommitRequest_v2(Struct): API_KEY = 8 API_VERSION = 2 # added retention_time, dropped timestamp + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -22,6 +33,7 @@ class OffsetCommitRequest_v2(Struct): class OffsetCommitRequest_v1(Struct): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -39,6 +51,7 @@ class OffsetCommitRequest_v1(Struct): class OffsetCommitRequest_v0(Struct): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -50,12 +63,14 @@ class OffsetCommitRequest_v0(Struct): ) -class OffsetCommitResponse(Struct): +class OffsetFetchResponse(Struct): SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), ('error_code', Int16))))) ) @@ -63,6 +78,7 @@ class OffsetCommitResponse(Struct): class OffsetFetchRequest_v1(Struct): API_KEY = 9 API_VERSION = 1 # kafka-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -74,6 +90,7 @@ class OffsetFetchRequest_v1(Struct): class OffsetFetchRequest_v0(Struct): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -82,30 +99,19 @@ class OffsetFetchRequest_v0(Struct): ) -class OffsetFetchResponse(Struct): +class GroupCoordinatorResponse(Struct): SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8')), - ('error_code', Int16))))) + ('error_code', Int16), + ('coordinator_id', Int32), + ('host', String('utf-8')), + ('port', Int32) ) class GroupCoordinatorRequest(Struct): API_KEY = 10 API_VERSION = 0 + RESPONSE_TYPE = GroupCoordinatorResponse SCHEMA = Schema( ('consumer_group', String('utf-8')) ) - - -class GroupCoordinatorResponse(Struct): - SCHEMA = Schema( - ('error_code', Int16), - ('coordinator_id', Int32), - ('host', String('utf-8')), - ('port', Int32) - ) |