From a85e09df89a43de5b659a0fa4ed35bec37c60e04 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 28 Nov 2015 19:41:06 +0800 Subject: Rework protocol type definition: AbstractType, Schema, Struct --- kafka/protocol/commit.py | 111 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 kafka/protocol/commit.py (limited to 'kafka/protocol/commit.py') diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py new file mode 100644 index 0000000..5ba0227 --- /dev/null +++ b/kafka/protocol/commit.py @@ -0,0 +1,111 @@ +from .struct import Struct +from .types import Array, Int16, Int32, Int64, Schema, String + + +class OffsetCommitRequest_v2(Struct): + API_KEY = 8 + API_VERSION = 2 # added retention_time, dropped timestamp + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('retention_time', Int64), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')))))) + ) + + +class OffsetCommitRequest_v1(Struct): + API_KEY = 8 + API_VERSION = 1 # Kafka-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('timestamp', Int64), + ('metadata', String('utf-8')))))) + ) + + +class OffsetCommitRequest_v0(Struct): + API_KEY = 8 + API_VERSION = 0 # Zookeeper-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')))))) + ) + + +class OffsetCommitResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetFetchRequest_v1(Struct): + API_KEY = 9 + API_VERSION = 1 # kafka-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))) + ) + + +class OffsetFetchRequest_v0(Struct): + API_KEY = 9 + API_VERSION = 0 # zookeeper-backed storage + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))) + ) + + +class OffsetFetchResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), + ('error_code', Int16))))) + ) + + +class GroupCoordinatorRequest(Struct): + API_KEY = 10 + API_VERSION = 0 + SCHEMA = Schema( + ('consumer_group', String('utf-8')) + ) + + +class GroupCoordinatorResponse(Struct): + SCHEMA = Schema( + ('error_code', Int16), + ('coordinator_id', Int32), + ('host', String('utf-8')), + ('port', Int32) + ) -- cgit v1.2.1 From 058567912e8d82c1da5e5ead9e30be532573a173 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 29 Nov 2015 10:00:50 +0800 Subject: Add simple BrokerConnection class; add request.RESPONSE_TYPE class vars --- kafka/protocol/commit.py | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) (limited to 'kafka/protocol/commit.py') diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 5ba0227..2955de1 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -2,9 +2,20 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String +class OffsetCommitResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + class OffsetCommitRequest_v2(Struct): API_KEY = 8 API_VERSION = 2 # added retention_time, dropped timestamp + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -22,6 +33,7 @@ class OffsetCommitRequest_v2(Struct): class OffsetCommitRequest_v1(Struct): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -39,6 +51,7 @@ class OffsetCommitRequest_v1(Struct): class OffsetCommitRequest_v0(Struct): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -50,12 +63,14 @@ class OffsetCommitRequest_v0(Struct): ) -class OffsetCommitResponse(Struct): +class OffsetFetchResponse(Struct): SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), ('error_code', Int16))))) ) @@ -63,6 +78,7 @@ class OffsetCommitResponse(Struct): class OffsetFetchRequest_v1(Struct): API_KEY = 9 API_VERSION = 1 # kafka-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -74,6 +90,7 @@ class OffsetFetchRequest_v1(Struct): class OffsetFetchRequest_v0(Struct): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -82,30 +99,19 @@ class OffsetFetchRequest_v0(Struct): ) -class OffsetFetchResponse(Struct): +class GroupCoordinatorResponse(Struct): SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8')), - ('error_code', Int16))))) + ('error_code', Int16), + ('coordinator_id', Int32), + ('host', String('utf-8')), + ('port', Int32) ) class GroupCoordinatorRequest(Struct): API_KEY = 10 API_VERSION = 0 + RESPONSE_TYPE = GroupCoordinatorResponse SCHEMA = Schema( ('consumer_group', String('utf-8')) ) - - -class GroupCoordinatorResponse(Struct): - SCHEMA = Schema( - ('error_code', Int16), - ('coordinator_id', Int32), - ('host', String('utf-8')), - ('port', Int32) - ) -- cgit v1.2.1 From 264fc8bb7ccd314f6361e9bd223b1faa0354afbc Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 28 Dec 2015 13:39:10 -0800 Subject: Add DEFAULT_GENERATION_ID and DEFAULT_RETENTION_TIME to OffsetCommitRequest_v2 --- kafka/protocol/commit.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'kafka/protocol/commit.py') diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 2955de1..a32f8d3 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -28,6 +28,8 @@ class OffsetCommitRequest_v2(Struct): ('offset', Int64), ('metadata', String('utf-8')))))) ) + DEFAULT_GENERATION_ID = -1 + DEFAULT_RETENTION_TIME = -1 class OffsetCommitRequest_v1(Struct): -- cgit v1.2.1