summaryrefslogtreecommitdiff
path: root/kafka/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol')
-rw-r--r--kafka/protocol/admin.py30
-rw-r--r--kafka/protocol/api.py49
-rw-r--r--kafka/protocol/commit.py30
-rw-r--r--kafka/protocol/fetch.py18
-rw-r--r--kafka/protocol/group.py21
-rw-r--r--kafka/protocol/metadata.py14
-rw-r--r--kafka/protocol/offset.py10
-rw-r--r--kafka/protocol/produce.py29
8 files changed, 133 insertions, 68 deletions
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index 89ea739..c5142b3 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -1,10 +1,10 @@
from __future__ import absolute_import
-from .struct import Struct
+from .api import Request, Response
from .types import Array, Boolean, Bytes, Int16, Int32, Schema, String
-class ApiVersionResponse_v0(Struct):
+class ApiVersionResponse_v0(Response):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
@@ -16,7 +16,7 @@ class ApiVersionResponse_v0(Struct):
)
-class ApiVersionRequest_v0(Struct):
+class ApiVersionRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
RESPONSE_TYPE = ApiVersionResponse_v0
@@ -27,7 +27,7 @@ ApiVersionRequest = [ApiVersionRequest_v0]
ApiVersionResponse = [ApiVersionResponse_v0]
-class CreateTopicsResponse_v0(Struct):
+class CreateTopicsResponse_v0(Response):
API_KEY = 19
API_VERSION = 0
SCHEMA = Schema(
@@ -37,7 +37,7 @@ class CreateTopicsResponse_v0(Struct):
)
-class CreateTopicsResponse_v1(Struct):
+class CreateTopicsResponse_v1(Response):
API_KEY = 19
API_VERSION = 1
SCHEMA = Schema(
@@ -48,7 +48,7 @@ class CreateTopicsResponse_v1(Struct):
)
-class CreateTopicsRequest_v0(Struct):
+class CreateTopicsRequest_v0(Request):
API_KEY = 19
API_VERSION = 0
RESPONSE_TYPE = CreateTopicsResponse_v0
@@ -67,7 +67,7 @@ class CreateTopicsRequest_v0(Struct):
)
-class CreateTopicsRequest_v1(Struct):
+class CreateTopicsRequest_v1(Request):
API_KEY = 19
API_VERSION = 1
RESPONSE_TYPE = CreateTopicsResponse_v1
@@ -91,7 +91,7 @@ CreateTopicsRequest = [CreateTopicsRequest_v0, CreateTopicsRequest_v1]
CreateTopicsResponse = [CreateTopicsResponse_v0, CreateTopicsRequest_v1]
-class DeleteTopicsResponse_v0(Struct):
+class DeleteTopicsResponse_v0(Response):
API_KEY = 20
API_VERSION = 0
SCHEMA = Schema(
@@ -101,7 +101,7 @@ class DeleteTopicsResponse_v0(Struct):
)
-class DeleteTopicsRequest_v0(Struct):
+class DeleteTopicsRequest_v0(Request):
API_KEY = 20
API_VERSION = 0
RESPONSE_TYPE = DeleteTopicsResponse_v0
@@ -115,7 +115,7 @@ DeleteTopicsRequest = [DeleteTopicsRequest_v0]
DeleteTopicsResponse = [DeleteTopicsResponse_v0]
-class ListGroupsResponse_v0(Struct):
+class ListGroupsResponse_v0(Response):
API_KEY = 16
API_VERSION = 0
SCHEMA = Schema(
@@ -126,7 +126,7 @@ class ListGroupsResponse_v0(Struct):
)
-class ListGroupsRequest_v0(Struct):
+class ListGroupsRequest_v0(Request):
API_KEY = 16
API_VERSION = 0
RESPONSE_TYPE = ListGroupsResponse_v0
@@ -137,7 +137,7 @@ ListGroupsRequest = [ListGroupsRequest_v0]
ListGroupsResponse = [ListGroupsResponse_v0]
-class DescribeGroupsResponse_v0(Struct):
+class DescribeGroupsResponse_v0(Response):
API_KEY = 15
API_VERSION = 0
SCHEMA = Schema(
@@ -156,7 +156,7 @@ class DescribeGroupsResponse_v0(Struct):
)
-class DescribeGroupsRequest_v0(Struct):
+class DescribeGroupsRequest_v0(Request):
API_KEY = 15
API_VERSION = 0
RESPONSE_TYPE = DescribeGroupsResponse_v0
@@ -169,7 +169,7 @@ DescribeGroupsRequest = [DescribeGroupsRequest_v0]
DescribeGroupsResponse = [DescribeGroupsResponse_v0]
-class SaslHandShakeResponse_v0(Struct):
+class SaslHandShakeResponse_v0(Response):
API_KEY = 17
API_VERSION = 0
SCHEMA = Schema(
@@ -178,7 +178,7 @@ class SaslHandShakeResponse_v0(Struct):
)
-class SaslHandShakeRequest_v0(Struct):
+class SaslHandShakeRequest_v0(Request):
API_KEY = 17
API_VERSION = 0
RESPONSE_TYPE = SaslHandShakeResponse_v0
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py
index 7779aac..ec24a39 100644
--- a/kafka/protocol/api.py
+++ b/kafka/protocol/api.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+import abc
+
from .struct import Struct
from .types import Int16, Int32, String, Schema
@@ -16,3 +18,50 @@ class RequestHeader(Struct):
super(RequestHeader, self).__init__(
request.API_KEY, request.API_VERSION, correlation_id, client_id
)
+
+
+class Request(Struct):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractproperty
+ def API_KEY(self):
+ """Integer identifier for api request"""
+ pass
+
+ @abc.abstractproperty
+ def API_VERSION(self):
+ """Integer of api request version"""
+ pass
+
+ @abc.abstractproperty
+ def SCHEMA(self):
+ """An instance of Schema() representing the request structure"""
+ pass
+
+ @abc.abstractproperty
+ def RESPONSE_TYPE(self):
+ """The Response class associated with the api request"""
+ pass
+
+ def expect_response(self):
+ """Override this method if an api request does not always generate a response"""
+ return True
+
+
+class Response(Struct):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractproperty
+ def API_KEY(self):
+ """Integer identifier for api request/response"""
+ pass
+
+ @abc.abstractproperty
+ def API_VERSION(self):
+ """Integer of api request/response version"""
+ pass
+
+ @abc.abstractproperty
+ def SCHEMA(self):
+ """An instance of Schema() representing the response structure"""
+ pass
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py
index 5645372..bcffe67 100644
--- a/kafka/protocol/commit.py
+++ b/kafka/protocol/commit.py
@@ -1,10 +1,10 @@
from __future__ import absolute_import
-from .struct import Struct
+from .api import Request, Response
from .types import Array, Int16, Int32, Int64, Schema, String
-class OffsetCommitResponse_v0(Struct):
+class OffsetCommitResponse_v0(Response):
API_KEY = 8
API_VERSION = 0
SCHEMA = Schema(
@@ -16,19 +16,19 @@ class OffsetCommitResponse_v0(Struct):
)
-class OffsetCommitResponse_v1(Struct):
+class OffsetCommitResponse_v1(Response):
API_KEY = 8
API_VERSION = 1
SCHEMA = OffsetCommitResponse_v0.SCHEMA
-class OffsetCommitResponse_v2(Struct):
+class OffsetCommitResponse_v2(Response):
API_KEY = 8
API_VERSION = 2
SCHEMA = OffsetCommitResponse_v1.SCHEMA
-class OffsetCommitRequest_v0(Struct):
+class OffsetCommitRequest_v0(Request):
API_KEY = 8
API_VERSION = 0 # Zookeeper-backed storage
RESPONSE_TYPE = OffsetCommitResponse_v0
@@ -43,7 +43,7 @@ class OffsetCommitRequest_v0(Struct):
)
-class OffsetCommitRequest_v1(Struct):
+class OffsetCommitRequest_v1(Request):
API_KEY = 8
API_VERSION = 1 # Kafka-backed storage
RESPONSE_TYPE = OffsetCommitResponse_v1
@@ -61,7 +61,7 @@ class OffsetCommitRequest_v1(Struct):
)
-class OffsetCommitRequest_v2(Struct):
+class OffsetCommitRequest_v2(Request):
API_KEY = 8
API_VERSION = 2 # added retention_time, dropped timestamp
RESPONSE_TYPE = OffsetCommitResponse_v2
@@ -87,7 +87,7 @@ OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1,
OffsetCommitResponse_v2]
-class OffsetFetchResponse_v0(Struct):
+class OffsetFetchResponse_v0(Response):
API_KEY = 9
API_VERSION = 0
SCHEMA = Schema(
@@ -101,13 +101,13 @@ class OffsetFetchResponse_v0(Struct):
)
-class OffsetFetchResponse_v1(Struct):
+class OffsetFetchResponse_v1(Response):
API_KEY = 9
API_VERSION = 1
SCHEMA = OffsetFetchResponse_v0.SCHEMA
-class OffsetFetchResponse_v2(Struct):
+class OffsetFetchResponse_v2(Response):
# Added in KIP-88
API_KEY = 9
API_VERSION = 2
@@ -123,7 +123,7 @@ class OffsetFetchResponse_v2(Struct):
)
-class OffsetFetchRequest_v0(Struct):
+class OffsetFetchRequest_v0(Request):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
RESPONSE_TYPE = OffsetFetchResponse_v0
@@ -135,14 +135,14 @@ class OffsetFetchRequest_v0(Struct):
)
-class OffsetFetchRequest_v1(Struct):
+class OffsetFetchRequest_v1(Request):
API_KEY = 9
API_VERSION = 1 # kafka-backed storage
RESPONSE_TYPE = OffsetFetchResponse_v1
SCHEMA = OffsetFetchRequest_v0.SCHEMA
-class OffsetFetchRequest_v2(Struct):
+class OffsetFetchRequest_v2(Request):
# KIP-88: Allows passing null topics to return offsets for all partitions
# that the consumer group has a stored offset for, even if no consumer in
# the group is currently consuming that partition.
@@ -158,7 +158,7 @@ OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1,
OffsetFetchResponse_v2]
-class GroupCoordinatorResponse_v0(Struct):
+class GroupCoordinatorResponse_v0(Response):
API_KEY = 10
API_VERSION = 0
SCHEMA = Schema(
@@ -169,7 +169,7 @@ class GroupCoordinatorResponse_v0(Struct):
)
-class GroupCoordinatorRequest_v0(Struct):
+class GroupCoordinatorRequest_v0(Request):
API_KEY = 10
API_VERSION = 0
RESPONSE_TYPE = GroupCoordinatorResponse_v0
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 6a9ad5b..b441e63 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -1,11 +1,11 @@
from __future__ import absolute_import
+from .api import Request, Response
from .message import MessageSet
-from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
-class FetchResponse_v0(Struct):
+class FetchResponse_v0(Response):
API_KEY = 1
API_VERSION = 0
SCHEMA = Schema(
@@ -19,7 +19,7 @@ class FetchResponse_v0(Struct):
)
-class FetchResponse_v1(Struct):
+class FetchResponse_v1(Response):
API_KEY = 1
API_VERSION = 1
SCHEMA = Schema(
@@ -34,19 +34,19 @@ class FetchResponse_v1(Struct):
)
-class FetchResponse_v2(Struct):
+class FetchResponse_v2(Response):
API_KEY = 1
API_VERSION = 2
SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
-class FetchResponse_v3(Struct):
+class FetchResponse_v3(Response):
API_KEY = 1
API_VERSION = 3
SCHEMA = FetchResponse_v2.SCHEMA
-class FetchRequest_v0(Struct):
+class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
RESPONSE_TYPE = FetchResponse_v0
@@ -63,21 +63,21 @@ class FetchRequest_v0(Struct):
)
-class FetchRequest_v1(Struct):
+class FetchRequest_v1(Request):
API_KEY = 1
API_VERSION = 1
RESPONSE_TYPE = FetchResponse_v1
SCHEMA = FetchRequest_v0.SCHEMA
-class FetchRequest_v2(Struct):
+class FetchRequest_v2(Request):
API_KEY = 1
API_VERSION = 2
RESPONSE_TYPE = FetchResponse_v2
SCHEMA = FetchRequest_v1.SCHEMA
-class FetchRequest_v3(Struct):
+class FetchRequest_v3(Request):
API_KEY = 1
API_VERSION = 3
RESPONSE_TYPE = FetchResponse_v3
diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py
index 0e0b70e..5cab754 100644
--- a/kafka/protocol/group.py
+++ b/kafka/protocol/group.py
@@ -1,10 +1,11 @@
from __future__ import absolute_import
+from .api import Request, Response
from .struct import Struct
from .types import Array, Bytes, Int16, Int32, Schema, String
-class JoinGroupResponse_v0(Struct):
+class JoinGroupResponse_v0(Response):
API_KEY = 11
API_VERSION = 0
SCHEMA = Schema(
@@ -19,13 +20,13 @@ class JoinGroupResponse_v0(Struct):
)
-class JoinGroupResponse_v1(Struct):
+class JoinGroupResponse_v1(Response):
API_KEY = 11
API_VERSION = 1
SCHEMA = JoinGroupResponse_v0.SCHEMA
-class JoinGroupRequest_v0(Struct):
+class JoinGroupRequest_v0(Request):
API_KEY = 11
API_VERSION = 0
RESPONSE_TYPE = JoinGroupResponse_v0
@@ -41,7 +42,7 @@ class JoinGroupRequest_v0(Struct):
UNKNOWN_MEMBER_ID = ''
-class JoinGroupRequest_v1(Struct):
+class JoinGroupRequest_v1(Request):
API_KEY = 11
API_VERSION = 1
RESPONSE_TYPE = JoinGroupResponse_v1
@@ -70,7 +71,7 @@ class ProtocolMetadata(Struct):
)
-class SyncGroupResponse_v0(Struct):
+class SyncGroupResponse_v0(Response):
API_KEY = 14
API_VERSION = 0
SCHEMA = Schema(
@@ -79,7 +80,7 @@ class SyncGroupResponse_v0(Struct):
)
-class SyncGroupRequest_v0(Struct):
+class SyncGroupRequest_v0(Request):
API_KEY = 14
API_VERSION = 0
RESPONSE_TYPE = SyncGroupResponse_v0
@@ -107,7 +108,7 @@ class MemberAssignment(Struct):
)
-class HeartbeatResponse_v0(Struct):
+class HeartbeatResponse_v0(Response):
API_KEY = 12
API_VERSION = 0
SCHEMA = Schema(
@@ -115,7 +116,7 @@ class HeartbeatResponse_v0(Struct):
)
-class HeartbeatRequest_v0(Struct):
+class HeartbeatRequest_v0(Request):
API_KEY = 12
API_VERSION = 0
RESPONSE_TYPE = HeartbeatResponse_v0
@@ -130,7 +131,7 @@ HeartbeatRequest = [HeartbeatRequest_v0]
HeartbeatResponse = [HeartbeatResponse_v0]
-class LeaveGroupResponse_v0(Struct):
+class LeaveGroupResponse_v0(Response):
API_KEY = 13
API_VERSION = 0
SCHEMA = Schema(
@@ -138,7 +139,7 @@ class LeaveGroupResponse_v0(Struct):
)
-class LeaveGroupRequest_v0(Struct):
+class LeaveGroupRequest_v0(Request):
API_KEY = 13
API_VERSION = 0
RESPONSE_TYPE = LeaveGroupResponse_v0
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
index e017c59..907ec25 100644
--- a/kafka/protocol/metadata.py
+++ b/kafka/protocol/metadata.py
@@ -1,10 +1,10 @@
from __future__ import absolute_import
-from .struct import Struct
+from .api import Request, Response
from .types import Array, Boolean, Int16, Int32, Schema, String
-class MetadataResponse_v0(Struct):
+class MetadataResponse_v0(Response):
API_KEY = 3
API_VERSION = 0
SCHEMA = Schema(
@@ -24,7 +24,7 @@ class MetadataResponse_v0(Struct):
)
-class MetadataResponse_v1(Struct):
+class MetadataResponse_v1(Response):
API_KEY = 3
API_VERSION = 1
SCHEMA = Schema(
@@ -47,7 +47,7 @@ class MetadataResponse_v1(Struct):
)
-class MetadataResponse_v2(Struct):
+class MetadataResponse_v2(Response):
API_KEY = 3
API_VERSION = 2
SCHEMA = Schema(
@@ -71,7 +71,7 @@ class MetadataResponse_v2(Struct):
)
-class MetadataRequest_v0(Struct):
+class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
RESPONSE_TYPE = MetadataResponse_v0
@@ -81,7 +81,7 @@ class MetadataRequest_v0(Struct):
ALL_TOPICS = None # Empty Array (len 0) for topics returns all topics
-class MetadataRequest_v1(Struct):
+class MetadataRequest_v1(Request):
API_KEY = 3
API_VERSION = 1
RESPONSE_TYPE = MetadataResponse_v1
@@ -90,7 +90,7 @@ class MetadataRequest_v1(Struct):
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
-class MetadataRequest_v2(Struct):
+class MetadataRequest_v2(Request):
API_KEY = 3
API_VERSION = 2
RESPONSE_TYPE = MetadataResponse_v2
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 5182d63..588dfec 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -1,6 +1,6 @@
from __future__ import absolute_import
-from .struct import Struct
+from .api import Request, Response
from .types import Array, Int16, Int32, Int64, Schema, String
@@ -10,7 +10,7 @@ class OffsetResetStrategy(object):
NONE = 0
-class OffsetResponse_v0(Struct):
+class OffsetResponse_v0(Response):
API_KEY = 2
API_VERSION = 0
SCHEMA = Schema(
@@ -22,7 +22,7 @@ class OffsetResponse_v0(Struct):
('offsets', Array(Int64))))))
)
-class OffsetResponse_v1(Struct):
+class OffsetResponse_v1(Response):
API_KEY = 2
API_VERSION = 1
SCHEMA = Schema(
@@ -36,7 +36,7 @@ class OffsetResponse_v1(Struct):
)
-class OffsetRequest_v0(Struct):
+class OffsetRequest_v0(Request):
API_KEY = 2
API_VERSION = 0
RESPONSE_TYPE = OffsetResponse_v0
@@ -53,7 +53,7 @@ class OffsetRequest_v0(Struct):
'replica_id': -1
}
-class OffsetRequest_v1(Struct):
+class OffsetRequest_v1(Request):
API_KEY = 2
API_VERSION = 1
RESPONSE_TYPE = OffsetResponse_v1
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index c1a519e..9b03354 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -1,11 +1,11 @@
from __future__ import absolute_import
+from .api import Request, Response
from .message import MessageSet
-from .struct import Struct
from .types import Int16, Int32, Int64, String, Array, Schema
-class ProduceResponse_v0(Struct):
+class ProduceResponse_v0(Response):
API_KEY = 0
API_VERSION = 0
SCHEMA = Schema(
@@ -18,7 +18,7 @@ class ProduceResponse_v0(Struct):
)
-class ProduceResponse_v1(Struct):
+class ProduceResponse_v1(Response):
API_KEY = 0
API_VERSION = 1
SCHEMA = Schema(
@@ -32,7 +32,7 @@ class ProduceResponse_v1(Struct):
)
-class ProduceResponse_v2(Struct):
+class ProduceResponse_v2(Response):
API_KEY = 0
API_VERSION = 2
SCHEMA = Schema(
@@ -47,7 +47,7 @@ class ProduceResponse_v2(Struct):
)
-class ProduceRequest_v0(Struct):
+class ProduceRequest_v0(Request):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = ProduceResponse_v0
@@ -61,20 +61,35 @@ class ProduceRequest_v0(Struct):
('messages', MessageSet)))))
)
+ def expect_response(self):
+ if self.required_acks == 0: # pylint: disable=no-member
+ return False
+ return True
-class ProduceRequest_v1(Struct):
+
+class ProduceRequest_v1(Request):
API_KEY = 0
API_VERSION = 1
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA
+ def expect_response(self):
+ if self.required_acks == 0: # pylint: disable=no-member
+ return False
+ return True
+
-class ProduceRequest_v2(Struct):
+class ProduceRequest_v2(Request):
API_KEY = 0
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
SCHEMA = ProduceRequest_v1.SCHEMA
+ def expect_response(self):
+ if self.required_acks == 0: # pylint: disable=no-member
+ return False
+ return True
+
ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]