diff options
Diffstat (limited to 'kafka/protocol')
-rw-r--r-- | kafka/protocol/admin.py | 24 | ||||
-rw-r--r-- | kafka/protocol/commit.py | 104 | ||||
-rw-r--r-- | kafka/protocol/fetch.py | 12 | ||||
-rw-r--r-- | kafka/protocol/group.py | 48 | ||||
-rw-r--r-- | kafka/protocol/legacy.py | 16 | ||||
-rw-r--r-- | kafka/protocol/metadata.py | 12 | ||||
-rw-r--r-- | kafka/protocol/offset.py | 13 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 14 |
8 files changed, 181 insertions, 62 deletions
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 56dd042..8c74613 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Bytes, Int16, Schema, String -class ListGroupsResponse(Struct): +class ListGroupsResponse_v0(Struct): + API_KEY = 16 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('groups', Array( @@ -11,14 +13,20 @@ class ListGroupsResponse(Struct): ) -class ListGroupsRequest(Struct): +class ListGroupsRequest_v0(Struct): API_KEY = 16 API_VERSION = 0 - RESPONSE_TYPE = ListGroupsResponse + RESPONSE_TYPE = ListGroupsResponse_v0 SCHEMA = Schema() -class DescribeGroupsResponse(Struct): +ListGroupsRequest = [ListGroupsRequest_v0] +ListGroupsResponse = [ListGroupsResponse_v0] + + +class DescribeGroupsResponse_v0(Struct): + API_KEY = 15 + API_VERSION = 0 SCHEMA = Schema( ('groups', Array( ('error_code', Int16), @@ -35,10 +43,14 @@ class DescribeGroupsResponse(Struct): ) -class DescribeGroupsRequest(Struct): +class DescribeGroupsRequest_v0(Struct): API_KEY = 15 API_VERSION = 0 - RESPONSE_TYPE = DescribeGroupsResponse + RESPONSE_TYPE = DescribeGroupsResponse_v0 SCHEMA = Schema( ('groups', Array(String('utf-8'))) ) + + +DescribeGroupsRequest = [DescribeGroupsRequest_v0] +DescribeGroupsResponse = [DescribeGroupsResponse_v0] diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index a32f8d3..90a3b76 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class OffsetCommitResponse(Struct): +class OffsetCommitResponse_v0(Struct): + API_KEY = 8 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -12,15 +14,36 @@ class OffsetCommitResponse(Struct): ) -class OffsetCommitRequest_v2(Struct): +class OffsetCommitResponse_v1(Struct): API_KEY = 8 - API_VERSION = 2 # added retention_time, dropped timestamp - RESPONSE_TYPE = OffsetCommitResponse + API_VERSION = 1 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetCommitResponse_v2(Struct): + API_KEY = 8 + API_VERSION = 2 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + +class OffsetCommitRequest_v0(Struct): + API_KEY = 8 + API_VERSION = 0 # Zookeeper-backed storage + RESPONSE_TYPE = OffsetCommitResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), - ('consumer_group_generation_id', Int32), - ('consumer_id', String('utf-8')), - ('retention_time', Int64), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( @@ -28,14 +51,12 @@ class OffsetCommitRequest_v2(Struct): ('offset', Int64), ('metadata', String('utf-8')))))) ) - DEFAULT_GENERATION_ID = -1 - DEFAULT_RETENTION_TIME = -1 class OffsetCommitRequest_v1(Struct): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage - RESPONSE_TYPE = OffsetCommitResponse + RESPONSE_TYPE = OffsetCommitResponse_v1 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -50,12 +71,15 @@ class OffsetCommitRequest_v1(Struct): ) -class OffsetCommitRequest_v0(Struct): +class OffsetCommitRequest_v2(Struct): API_KEY = 8 - API_VERSION = 0 # Zookeeper-backed storage - RESPONSE_TYPE = OffsetCommitResponse + API_VERSION = 2 # added retention_time, dropped timestamp + RESPONSE_TYPE = OffsetCommitResponse_v2 SCHEMA = Schema( ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('retention_time', Int64), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( @@ -63,9 +87,19 @@ class OffsetCommitRequest_v0(Struct): ('offset', Int64), ('metadata', String('utf-8')))))) ) + DEFAULT_GENERATION_ID = -1 + DEFAULT_RETENTION_TIME = -1 -class OffsetFetchResponse(Struct): +OffsetCommitRequest = [OffsetCommitRequest_v0, OffsetCommitRequest_v1, + OffsetCommitRequest_v2] +OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1, + OffsetCommitResponse_v2] + + +class OffsetFetchResponse_v0(Struct): + API_KEY = 9 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -77,22 +111,24 @@ class OffsetFetchResponse(Struct): ) -class OffsetFetchRequest_v1(Struct): +class OffsetFetchResponse_v1(Struct): API_KEY = 9 - API_VERSION = 1 # kafka-backed storage - RESPONSE_TYPE = OffsetFetchResponse + API_VERSION = 1 SCHEMA = Schema( - ('consumer_group', String('utf-8')), ('topics', Array( ('topic', String('utf-8')), - ('partitions', Array(Int32)))) + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), + ('error_code', Int16))))) ) class OffsetFetchRequest_v0(Struct): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage - RESPONSE_TYPE = OffsetFetchResponse + RESPONSE_TYPE = OffsetFetchResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -101,7 +137,25 @@ class OffsetFetchRequest_v0(Struct): ) -class GroupCoordinatorResponse(Struct): +class OffsetFetchRequest_v1(Struct): + API_KEY = 9 + API_VERSION = 1 # kafka-backed storage + RESPONSE_TYPE = OffsetFetchResponse_v1 + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array(Int32)))) + ) + + +OffsetFetchRequest = [OffsetFetchRequest_v0, OffsetFetchRequest_v1] +OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1] + + +class GroupCoordinatorResponse_v0(Struct): + API_KEY = 10 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('coordinator_id', Int32), @@ -110,10 +164,14 @@ class GroupCoordinatorResponse(Struct): ) -class GroupCoordinatorRequest(Struct): +class GroupCoordinatorRequest_v0(Struct): API_KEY = 10 API_VERSION = 0 - RESPONSE_TYPE = GroupCoordinatorResponse + RESPONSE_TYPE = GroupCoordinatorResponse_v0 SCHEMA = Schema( ('consumer_group', String('utf-8')) ) + + +GroupCoordinatorRequest = [GroupCoordinatorRequest_v0] +GroupCoordinatorResponse = [GroupCoordinatorResponse_v0] diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index e00c9ab..eeda4e7 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -3,7 +3,9 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class FetchResponse(Struct): +class FetchResponse_v0(Struct): + API_KEY = 1 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topics', String('utf-8')), @@ -15,10 +17,10 @@ class FetchResponse(Struct): ) -class FetchRequest(Struct): +class FetchRequest_v0(Struct): API_KEY = 1 API_VERSION = 0 - RESPONSE_TYPE = FetchResponse + RESPONSE_TYPE = FetchResponse_v0 SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -30,3 +32,7 @@ class FetchRequest(Struct): ('offset', Int64), ('max_bytes', Int32))))) ) + + +FetchRequest = [FetchRequest_v0] +FetchResponse = [FetchResponse_v0] diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 72de005..97ae5f7 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Bytes, Int16, Int32, Schema, String -class JoinGroupResponse(Struct): +class JoinGroupResponse_v0(Struct): + API_KEY = 11 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('generation_id', Int32), @@ -15,10 +17,10 @@ class JoinGroupResponse(Struct): ) -class JoinGroupRequest(Struct): +class JoinGroupRequest_v0(Struct): API_KEY = 11 API_VERSION = 0 - RESPONSE_TYPE = JoinGroupResponse + RESPONSE_TYPE = JoinGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('session_timeout', Int32), @@ -31,6 +33,10 @@ class JoinGroupRequest(Struct): UNKNOWN_MEMBER_ID = '' +JoinGroupRequest = [JoinGroupRequest_v0] +JoinGroupResponse = [JoinGroupResponse_v0] + + class ProtocolMetadata(Struct): SCHEMA = Schema( ('version', Int16), @@ -39,17 +45,19 @@ class ProtocolMetadata(Struct): ) -class SyncGroupResponse(Struct): +class SyncGroupResponse_v0(Struct): + API_KEY = 14 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16), ('member_assignment', Bytes) ) -class SyncGroupRequest(Struct): +class SyncGroupRequest_v0(Struct): API_KEY = 14 API_VERSION = 0 - RESPONSE_TYPE = SyncGroupResponse + RESPONSE_TYPE = SyncGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -60,6 +68,10 @@ class SyncGroupRequest(Struct): ) +SyncGroupRequest = [SyncGroupRequest_v0] +SyncGroupResponse = [SyncGroupResponse_v0] + + class MemberAssignment(Struct): SCHEMA = Schema( ('version', Int16), @@ -70,16 +82,18 @@ class MemberAssignment(Struct): ) -class HeartbeatResponse(Struct): +class HeartbeatResponse_v0(Struct): + API_KEY = 12 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16) ) -class HeartbeatRequest(Struct): +class HeartbeatRequest_v0(Struct): API_KEY = 12 API_VERSION = 0 - RESPONSE_TYPE = HeartbeatResponse + RESPONSE_TYPE = HeartbeatResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('generation_id', Int32), @@ -87,17 +101,27 @@ class HeartbeatRequest(Struct): ) -class LeaveGroupResponse(Struct): +HeartbeatRequest = [HeartbeatRequest_v0] +HeartbeatResponse = [HeartbeatResponse_v0] + + +class LeaveGroupResponse_v0(Struct): + API_KEY = 13 + API_VERSION = 0 SCHEMA = Schema( ('error_code', Int16) ) -class LeaveGroupRequest(Struct): +class LeaveGroupRequest_v0(Struct): API_KEY = 13 API_VERSION = 0 - RESPONSE_TYPE = LeaveGroupResponse + RESPONSE_TYPE = LeaveGroupResponse_v0 SCHEMA = Schema( ('group', String('utf-8')), ('member_id', String('utf-8')) ) + + +LeaveGroupRequest = [LeaveGroupRequest_v0] +LeaveGroupResponse = [LeaveGroupResponse_v0] diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index e4745f1..2eddf3b 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -136,7 +136,7 @@ class KafkaProtocol(object): if acks not in (1, 0, -1): raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks) - return kafka.protocol.produce.ProduceRequest( + return kafka.protocol.produce.ProduceRequest[0]( required_acks=acks, timeout=timeout, topics=[( @@ -180,7 +180,7 @@ class KafkaProtocol(object): Return: FetchRequest """ - return kafka.protocol.fetch.FetchRequest( + return kafka.protocol.fetch.FetchRequest[0]( replica_id=-1, max_wait_time=max_wait_time, min_bytes=min_bytes, @@ -212,7 +212,7 @@ class KafkaProtocol(object): @classmethod def encode_offset_request(cls, payloads=()): - return kafka.protocol.offset.OffsetRequest( + return kafka.protocol.offset.OffsetRequest[0]( replica_id=-1, topics=[( topic, @@ -250,7 +250,7 @@ class KafkaProtocol(object): if payloads is not None: topics = payloads - return kafka.protocol.metadata.MetadataRequest(topics) + return kafka.protocol.metadata.MetadataRequest[0](topics) @classmethod def decode_metadata_response(cls, response): @@ -297,7 +297,7 @@ class KafkaProtocol(object): group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequestPayload """ - return kafka.protocol.commit.OffsetCommitRequest_v0( + return kafka.protocol.commit.OffsetCommitRequest[0]( consumer_group=group, topics=[( topic, @@ -337,11 +337,11 @@ class KafkaProtocol(object): from_kafka: bool, default False, set True for Kafka-committed offsets """ if from_kafka: - request_class = kafka.protocol.commit.OffsetFetchRequest_v1 + version = 1 else: - request_class = kafka.protocol.commit.OffsetFetchRequest_v0 + version = 0 - return request_class( + return kafka.protocol.commit.OffsetFetchRequest[version]( consumer_group=group, topics=[( topic, diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 810f1b8..8063dda 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -2,7 +2,9 @@ from .struct import Struct from .types import Array, Int16, Int32, Schema, String -class MetadataResponse(Struct): +class MetadataResponse_v0(Struct): + API_KEY = 3 + API_VERSION = 0 SCHEMA = Schema( ('brokers', Array( ('node_id', Int32), @@ -20,10 +22,14 @@ class MetadataResponse(Struct): ) -class MetadataRequest(Struct): +class MetadataRequest_v0(Struct): API_KEY = 3 API_VERSION = 0 - RESPONSE_TYPE = MetadataResponse + RESPONSE_TYPE = MetadataResponse_v0 SCHEMA = Schema( ('topics', Array(String('utf-8'))) ) + + +MetadataRequest = [MetadataRequest_v0] +MetadataResponse = [MetadataResponse_v0] diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 606f1f1..57bf4ac 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -1,13 +1,16 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String + class OffsetResetStrategy(object): LATEST = -1 EARLIEST = -2 NONE = 0 -class OffsetResponse(Struct): +class OffsetResponse_v0(Struct): + API_KEY = 2 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -18,10 +21,10 @@ class OffsetResponse(Struct): ) -class OffsetRequest(Struct): +class OffsetRequest_v0(Struct): API_KEY = 2 API_VERSION = 0 - RESPONSE_TYPE = OffsetResponse + RESPONSE_TYPE = OffsetResponse_v0 SCHEMA = Schema( ('replica_id', Int32), ('topics', Array( @@ -34,3 +37,7 @@ class OffsetRequest(Struct): DEFAULTS = { 'replica_id': -1 } + + +OffsetRequest = [OffsetRequest_v0] +OffsetResponse = [OffsetResponse_v0] diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index ef2f96e..5753f64 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -1,9 +1,11 @@ from .message import MessageSet from .struct import Struct -from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema +from .types import Int16, Int32, Int64, String, Array, Schema -class ProduceResponse(Struct): +class ProduceResponse_v0(Struct): + API_KEY = 0 + API_VERSION = 0 SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), @@ -14,10 +16,10 @@ class ProduceResponse(Struct): ) -class ProduceRequest(Struct): +class ProduceRequest_v0(Struct): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = ProduceResponse + RESPONSE_TYPE = ProduceResponse_v0 SCHEMA = Schema( ('required_acks', Int16), ('timeout', Int32), @@ -27,3 +29,7 @@ class ProduceRequest(Struct): ('partition', Int32), ('messages', MessageSet))))) ) + + +ProduceRequest = [ProduceRequest_v0] +ProduceResponse = [ProduceResponse_v0] |