From a85e09df89a43de5b659a0fa4ed35bec37c60e04 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 28 Nov 2015 19:41:06 +0800 Subject: Rework protocol type definition: AbstractType, Schema, Struct --- kafka/protocol/commit.py | 111 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 kafka/protocol/commit.py (limited to 'kafka/protocol/commit.py') diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py new file mode 100644 index 0000000..5ba0227 --- /dev/null +++ b/kafka/protocol/commit.py @@ -0,0 +1,111 @@ +from .struct import Struct +from .types import Array, Int16, Int32, Int64, Schema, String + + +class OffsetCommitRequest_v2(Struct): + API_KEY = 8 + API_VERSION = 2 # added retention_time, dropped timestamp + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('retention_time', Int64), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')))))) + ) + + +class OffsetCommitRequest_v1(Struct): + API_KEY = 8 + API_VERSION = 1 # Kafka-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('timestamp', Int64), + ('metadata', String('utf-8')))))) + ) + + +class OffsetCommitRequest_v0(Struct): + API_KEY = 8 + API_VERSION = 0 # Zookeeper-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')))))) + ) + + +class OffsetCommitResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetFetchRequest_v1(Struct): + API_KEY = 9 + API_VERSION = 1 # kafka-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))) + ) + + +class OffsetFetchRequest_v0(Struct): + API_KEY = 9 + API_VERSION = 0 # zookeeper-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))) + ) + + +class OffsetFetchResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), + ('error_code', Int16))))) + ) + + +class GroupCoordinatorRequest(Struct): + API_KEY = 10 + API_VERSION = 0 + SCHEMA = Schema( + ('consumer_group', String('utf-8')) + ) + + +class GroupCoordinatorResponse(Struct): + SCHEMA = Schema( + ('error_code', Int16), + ('coordinator_id', Int32), + ('host', String('utf-8')), + ('port', Int32) + ) -- cgit v1.2.1