summaryrefslogtreecommitdiff
path: root/kafka/protocol
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
commit90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch)
treeb22cef6b10fd167fb22b8318e1294f6137427f3b /kafka/protocol
parent452e7c2190b83f320f58e7f650302696dde458ed (diff)
downloadkafka-python-protocol_versions.tar.gz
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'kafka/protocol')
-rw-r--r--kafka/protocol/admin.py24
-rw-r--r--kafka/protocol/commit.py104
-rw-r--r--kafka/protocol/fetch.py12
-rw-r--r--kafka/protocol/group.py48
-rw-r--r--kafka/protocol/legacy.py16
-rw-r--r--kafka/protocol/metadata.py12
-rw-r--r--kafka/protocol/offset.py13
-rw-r--r--kafka/protocol/produce.py14
8 files changed, 181 insertions, 62 deletions
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index 56dd042..8c74613 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -2,7 +2,9 @@ from .struct import Struct
from .types import Array, Bytes, Int16, Schema, String
-class ListGroupsResponse(Struct):
+class ListGroupsResponse_v0(Struct):
+ API_KEY = 16
+ API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('groups', Array(
@@ -11,14 +13,20 @@ class ListGroupsResponse(Struct):
)
-class ListGroupsRequest(Struct):
+class ListGroupsRequest_v0(Struct):
API_KEY = 16
API_VERSION = 0
- RESPONSE_TYPE = ListGroupsResponse
+ RESPONSE_TYPE = ListGroupsResponse_v0
SCHEMA = Schema()
-class DescribeGroupsResponse(Struct):
+ListGroupsRequest = [ListGroupsRequest_v0]
+ListGroupsResponse = [ListGroupsResponse_v0]
+
+
+class DescribeGroupsResponse_v0(Struct):
+ API_KEY = 15
+ API_VERSION = 0
SCHEMA = Schema(
('groups', Array(
('error_code', Int16),
@@ -35,10 +43,14 @@ class DescribeGroupsResponse(Struct):
)
-class DescribeGroupsRequest(Struct):
+class DescribeGroupsRequest_v0(Struct):
API_KEY = 15
API_VERSION = 0
- RESPONSE_TYPE = DescribeGroupsResponse
+ RESPONSE_TYPE = DescribeGroupsResponse_v0
SCHEMA = Schema(
('groups', Array(String('utf-8')))
)
+
+
+DescribeGroupsRequest = [DescribeGroupsRequest_v0]
+DescribeGroupsResponse = [DescribeGroupsResponse_v0]
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py
index a32f8d3..90a3b76 100644
--- a/kafka/protocol/commit.py
+++ b/kafka/protocol/commit.py
@@ -2,7 +2,9 @@ from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
-class OffsetCommitResponse(Struct):
+class OffsetCommitResponse_v0(Struct):
+ API_KEY = 8
+ API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
@@ -12,15 +14,36 @@ class OffsetCommitResponse(Struct):
)
-class OffsetCommitRequest_v2(Struct):
+class OffsetCommitResponse_v1(Struct):
API_KEY = 8
- API_VERSION = 2 # added retention_time, dropped timestamp
- RESPONSE_TYPE = OffsetCommitResponse
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16)))))
+ )
+
+
+class OffsetCommitResponse_v2(Struct):
+ API_KEY = 8
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16)))))
+ )
+
+
+class OffsetCommitRequest_v0(Struct):
+ API_KEY = 8
+ API_VERSION = 0 # Zookeeper-backed storage
+ RESPONSE_TYPE = OffsetCommitResponse_v0
SCHEMA = Schema(
('consumer_group', String('utf-8')),
- ('consumer_group_generation_id', Int32),
- ('consumer_id', String('utf-8')),
- ('retention_time', Int64),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
@@ -28,14 +51,12 @@ class OffsetCommitRequest_v2(Struct):
('offset', Int64),
('metadata', String('utf-8'))))))
)
- DEFAULT_GENERATION_ID = -1
- DEFAULT_RETENTION_TIME = -1
class OffsetCommitRequest_v1(Struct):
API_KEY = 8
API_VERSION = 1 # Kafka-backed storage
- RESPONSE_TYPE = OffsetCommitResponse
+ RESPONSE_TYPE = OffsetCommitResponse_v1
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
@@ -50,12 +71,15 @@ class OffsetCommitRequest_v1(Struct):
)
-class OffsetCommitRequest_v0(Struct):
+class OffsetCommitRequest_v2(Struct):
API_KEY = 8
- API_VERSION = 0 # Zookeeper-backed storage
- RESPONSE_TYPE = OffsetCommitResponse
+ API_VERSION = 2 # added retention_time, dropped timestamp
+ RESPONSE_TYPE = OffsetCommitResponse_v2
SCHEMA = Schema(
('consumer_group', String('utf-8')),
+ ('consumer_group_generation_id', Int32),
+ ('consumer_id', String('utf-8')),
+ ('retention_time', Int64),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
@@ -63,9 +87,19 @@ class OffsetCommitRequest_v0(Struct):
('offset', Int64),
('metadata', String('utf-8'))))))
)
+ DEFAULT_GENERATION_ID = -1
+ DEFAULT_RETENTION_TIME = -1
-class OffsetFetchResponse(Struct):
+OffsetCommitRequest = [OffsetCommitRequest_v0, OffsetCommitRequest_v1,
+ OffsetCommitRequest_v2]
+OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1,
+ OffsetCommitResponse_v2]
+
+
+class OffsetFetchResponse_v0(Struct):
+ API_KEY = 9
+ API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
@@ -77,22 +111,24 @@ class OffsetFetchResponse(Struct):
)
-class OffsetFetchRequest_v1(Struct):
+class OffsetFetchResponse_v1(Struct):
API_KEY = 9
- API_VERSION = 1 # kafka-backed storage
- RESPONSE_TYPE = OffsetFetchResponse
+ API_VERSION = 1
SCHEMA = Schema(
- ('consumer_group', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
- ('partitions', Array(Int32))))
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('metadata', String('utf-8')),
+ ('error_code', Int16)))))
)
class OffsetFetchRequest_v0(Struct):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
- RESPONSE_TYPE = OffsetFetchResponse
+ RESPONSE_TYPE = OffsetFetchResponse_v0
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('topics', Array(
@@ -101,7 +137,25 @@ class OffsetFetchRequest_v0(Struct):
)
-class GroupCoordinatorResponse(Struct):
+class OffsetFetchRequest_v1(Struct):
+ API_KEY = 9
+ API_VERSION = 1 # kafka-backed storage
+ RESPONSE_TYPE = OffsetFetchResponse_v1
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8')),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32))))
+ )
+
+
+OffsetFetchRequest = [OffsetFetchRequest_v0, OffsetFetchRequest_v1]
+OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1]
+
+
+class GroupCoordinatorResponse_v0(Struct):
+ API_KEY = 10
+ API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('coordinator_id', Int32),
@@ -110,10 +164,14 @@ class GroupCoordinatorResponse(Struct):
)
-class GroupCoordinatorRequest(Struct):
+class GroupCoordinatorRequest_v0(Struct):
API_KEY = 10
API_VERSION = 0
- RESPONSE_TYPE = GroupCoordinatorResponse
+ RESPONSE_TYPE = GroupCoordinatorResponse_v0
SCHEMA = Schema(
('consumer_group', String('utf-8'))
)
+
+
+GroupCoordinatorRequest = [GroupCoordinatorRequest_v0]
+GroupCoordinatorResponse = [GroupCoordinatorResponse_v0]
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index e00c9ab..eeda4e7 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -3,7 +3,9 @@ from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
-class FetchResponse(Struct):
+class FetchResponse_v0(Struct):
+ API_KEY = 1
+ API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topics', String('utf-8')),
@@ -15,10 +17,10 @@ class FetchResponse(Struct):
)
-class FetchRequest(Struct):
+class FetchRequest_v0(Struct):
API_KEY = 1
API_VERSION = 0
- RESPONSE_TYPE = FetchResponse
+ RESPONSE_TYPE = FetchResponse_v0
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
@@ -30,3 +32,7 @@ class FetchRequest(Struct):
('offset', Int64),
('max_bytes', Int32)))))
)
+
+
+FetchRequest = [FetchRequest_v0]
+FetchResponse = [FetchResponse_v0]
diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py
index 72de005..97ae5f7 100644
--- a/kafka/protocol/group.py
+++ b/kafka/protocol/group.py
@@ -2,7 +2,9 @@ from .struct import Struct
from .types import Array, Bytes, Int16, Int32, Schema, String
-class JoinGroupResponse(Struct):
+class JoinGroupResponse_v0(Struct):
+ API_KEY = 11
+ API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('generation_id', Int32),
@@ -15,10 +17,10 @@ class JoinGroupResponse(Struct):
)
-class JoinGroupRequest(Struct):
+class JoinGroupRequest_v0(Struct):
API_KEY = 11
API_VERSION = 0
- RESPONSE_TYPE = JoinGroupResponse
+ RESPONSE_TYPE = JoinGroupResponse_v0
SCHEMA = Schema(
('group', String('utf-8')),
('session_timeout', Int32),
@@ -31,6 +33,10 @@ class JoinGroupRequest(Struct):
UNKNOWN_MEMBER_ID = ''
+JoinGroupRequest = [JoinGroupRequest_v0]
+JoinGroupResponse = [JoinGroupResponse_v0]
+
+
class ProtocolMetadata(Struct):
SCHEMA = Schema(
('version', Int16),
@@ -39,17 +45,19 @@ class ProtocolMetadata(Struct):
)
-class SyncGroupResponse(Struct):
+class SyncGroupResponse_v0(Struct):
+ API_KEY = 14
+ API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('member_assignment', Bytes)
)
-class SyncGroupRequest(Struct):
+class SyncGroupRequest_v0(Struct):
API_KEY = 14
API_VERSION = 0
- RESPONSE_TYPE = SyncGroupResponse
+ RESPONSE_TYPE = SyncGroupResponse_v0
SCHEMA = Schema(
('group', String('utf-8')),
('generation_id', Int32),
@@ -60,6 +68,10 @@ class SyncGroupRequest(Struct):
)
+SyncGroupRequest = [SyncGroupRequest_v0]
+SyncGroupResponse = [SyncGroupResponse_v0]
+
+
class MemberAssignment(Struct):
SCHEMA = Schema(
('version', Int16),
@@ -70,16 +82,18 @@ class MemberAssignment(Struct):
)
-class HeartbeatResponse(Struct):
+class HeartbeatResponse_v0(Struct):
+ API_KEY = 12
+ API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16)
)
-class HeartbeatRequest(Struct):
+class HeartbeatRequest_v0(Struct):
API_KEY = 12
API_VERSION = 0
- RESPONSE_TYPE = HeartbeatResponse
+ RESPONSE_TYPE = HeartbeatResponse_v0
SCHEMA = Schema(
('group', String('utf-8')),
('generation_id', Int32),
@@ -87,17 +101,27 @@ class HeartbeatRequest(Struct):
)
-class LeaveGroupResponse(Struct):
+HeartbeatRequest = [HeartbeatRequest_v0]
+HeartbeatResponse = [HeartbeatResponse_v0]
+
+
+class LeaveGroupResponse_v0(Struct):
+ API_KEY = 13
+ API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16)
)
-class LeaveGroupRequest(Struct):
+class LeaveGroupRequest_v0(Struct):
API_KEY = 13
API_VERSION = 0
- RESPONSE_TYPE = LeaveGroupResponse
+ RESPONSE_TYPE = LeaveGroupResponse_v0
SCHEMA = Schema(
('group', String('utf-8')),
('member_id', String('utf-8'))
)
+
+
+LeaveGroupRequest = [LeaveGroupRequest_v0]
+LeaveGroupResponse = [LeaveGroupResponse_v0]
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index e4745f1..2eddf3b 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -136,7 +136,7 @@ class KafkaProtocol(object):
if acks not in (1, 0, -1):
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
- return kafka.protocol.produce.ProduceRequest(
+ return kafka.protocol.produce.ProduceRequest[0](
required_acks=acks,
timeout=timeout,
topics=[(
@@ -180,7 +180,7 @@ class KafkaProtocol(object):
Return: FetchRequest
"""
- return kafka.protocol.fetch.FetchRequest(
+ return kafka.protocol.fetch.FetchRequest[0](
replica_id=-1,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
@@ -212,7 +212,7 @@ class KafkaProtocol(object):
@classmethod
def encode_offset_request(cls, payloads=()):
- return kafka.protocol.offset.OffsetRequest(
+ return kafka.protocol.offset.OffsetRequest[0](
replica_id=-1,
topics=[(
topic,
@@ -250,7 +250,7 @@ class KafkaProtocol(object):
if payloads is not None:
topics = payloads
- return kafka.protocol.metadata.MetadataRequest(topics)
+ return kafka.protocol.metadata.MetadataRequest[0](topics)
@classmethod
def decode_metadata_response(cls, response):
@@ -297,7 +297,7 @@ class KafkaProtocol(object):
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequestPayload
"""
- return kafka.protocol.commit.OffsetCommitRequest_v0(
+ return kafka.protocol.commit.OffsetCommitRequest[0](
consumer_group=group,
topics=[(
topic,
@@ -337,11 +337,11 @@ class KafkaProtocol(object):
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
if from_kafka:
- request_class = kafka.protocol.commit.OffsetFetchRequest_v1
+ version = 1
else:
- request_class = kafka.protocol.commit.OffsetFetchRequest_v0
+ version = 0
- return request_class(
+ return kafka.protocol.commit.OffsetFetchRequest[version](
consumer_group=group,
topics=[(
topic,
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
index 810f1b8..8063dda 100644
--- a/kafka/protocol/metadata.py
+++ b/kafka/protocol/metadata.py
@@ -2,7 +2,9 @@ from .struct import Struct
from .types import Array, Int16, Int32, Schema, String
-class MetadataResponse(Struct):
+class MetadataResponse_v0(Struct):
+ API_KEY = 3
+ API_VERSION = 0
SCHEMA = Schema(
('brokers', Array(
('node_id', Int32),
@@ -20,10 +22,14 @@ class MetadataResponse(Struct):
)
-class MetadataRequest(Struct):
+class MetadataRequest_v0(Struct):
API_KEY = 3
API_VERSION = 0
- RESPONSE_TYPE = MetadataResponse
+ RESPONSE_TYPE = MetadataResponse_v0
SCHEMA = Schema(
('topics', Array(String('utf-8')))
)
+
+
+MetadataRequest = [MetadataRequest_v0]
+MetadataResponse = [MetadataResponse_v0]
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 606f1f1..57bf4ac 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -1,13 +1,16 @@
from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
+
class OffsetResetStrategy(object):
LATEST = -1
EARLIEST = -2
NONE = 0
-class OffsetResponse(Struct):
+class OffsetResponse_v0(Struct):
+ API_KEY = 2
+ API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
@@ -18,10 +21,10 @@ class OffsetResponse(Struct):
)
-class OffsetRequest(Struct):
+class OffsetRequest_v0(Struct):
API_KEY = 2
API_VERSION = 0
- RESPONSE_TYPE = OffsetResponse
+ RESPONSE_TYPE = OffsetResponse_v0
SCHEMA = Schema(
('replica_id', Int32),
('topics', Array(
@@ -34,3 +37,7 @@ class OffsetRequest(Struct):
DEFAULTS = {
'replica_id': -1
}
+
+
+OffsetRequest = [OffsetRequest_v0]
+OffsetResponse = [OffsetResponse_v0]
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index ef2f96e..5753f64 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -1,9 +1,11 @@
from .message import MessageSet
from .struct import Struct
-from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema
+from .types import Int16, Int32, Int64, String, Array, Schema
-class ProduceResponse(Struct):
+class ProduceResponse_v0(Struct):
+ API_KEY = 0
+ API_VERSION = 0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
@@ -14,10 +16,10 @@ class ProduceResponse(Struct):
)
-class ProduceRequest(Struct):
+class ProduceRequest_v0(Struct):
API_KEY = 0
API_VERSION = 0
- RESPONSE_TYPE = ProduceResponse
+ RESPONSE_TYPE = ProduceResponse_v0
SCHEMA = Schema(
('required_acks', Int16),
('timeout', Int32),
@@ -27,3 +29,7 @@ class ProduceRequest(Struct):
('partition', Int32),
('messages', MessageSet)))))
)
+
+
+ProduceRequest = [ProduceRequest_v0]
+ProduceResponse = [ProduceResponse_v0]