summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-06-19 10:11:53 -0700
committerDana Powers <dana.powers@gmail.com>2017-06-19 10:12:07 -0700
commit3a67f641683102425d08a9572cad4f358f251fa8 (patch)
tree7af6b9badd7ea438c12d7b3af1b4664c0ab0d932
parentbbbac3dc3678df069ef72ecfea62d435bc519a07 (diff)
downloadkafka-python-3a67f641683102425d08a9572cad4f358f251fa8.tar.gz
Add new FetchRequest v4/v5, MetadataRequest v3/v4
-rw-r--r--kafka/protocol/fetch.py94
-rw-r--r--kafka/protocol/metadata.py63
2 files changed, 150 insertions, 7 deletions
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index b441e63..6a61300 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from .api import Request, Response
from .message import MessageSet
-from .types import Array, Int16, Int32, Int64, Schema, String
+from .types import Array, Int8, Int16, Int32, Int64, Schema, String
class FetchResponse_v0(Response):
@@ -46,6 +46,45 @@ class FetchResponse_v3(Response):
SCHEMA = FetchResponse_v2.SCHEMA
+class FetchResponse_v4(Response):
+ API_KEY = 1
+ API_VERSION = 4
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('last_stable_offset', Int64),
+ ('aborted_transactions', Array(
+ ('producer_id', Int64),
+ ('first_offset', Int64))),
+ ('message_set', MessageSet)))))
+ )
+
+
+class FetchResponse_v5(Response):
+ API_KEY = 1
+ API_VERSION = 5
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('last_stable_offset', Int64),
+ ('log_start_offset', Int64),
+ ('aborted_transactions', Array(
+ ('producer_id', Int64),
+ ('first_offset', Int64))),
+ ('message_set', MessageSet)))))
+ )
+
+
class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
@@ -95,7 +134,52 @@ class FetchRequest_v3(Request):
)
-FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
- FetchRequest_v3]
-FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
- FetchResponse_v3]
+class FetchRequest_v4(Request):
+ """Adds isolation_level field"""
+ API_KEY = 1
+ API_VERSION = 4
+ RESPONSE_TYPE = FetchResponse_v4
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('max_bytes', Int32),
+ ('isolation_level', Int8),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('max_bytes', Int32)))))
+ )
+
+
+class FetchRequest_v5(Request):
+ """This may only be used in broker-broker api calls"""
+ API_KEY = 1
+ API_VERSION = 5
+ RESPONSE_TYPE = FetchResponse_v5
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('max_bytes', Int32),
+ ('isolation_level', Int8),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('fetch_offset', Int64),
+ ('log_start_offset', Int64),
+ ('max_bytes', Int32)))))
+ )
+
+
+FetchRequest = [
+ FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
+ FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
+]
+FetchResponse = [
+ FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
+ FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
+]
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
index 907ec25..4db9acf 100644
--- a/kafka/protocol/metadata.py
+++ b/kafka/protocol/metadata.py
@@ -71,6 +71,37 @@ class MetadataResponse_v2(Response):
)
+class MetadataResponse_v3(Response):
+ API_KEY = 3
+ API_VERSION = 3
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('brokers', Array(
+ ('node_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32),
+ ('rack', String('utf-8')))),
+ ('cluster_id', String('utf-8')), # <-- Added cluster_id field in v2
+ ('controller_id', Int32),
+ ('topics', Array(
+ ('error_code', Int16),
+ ('topic', String('utf-8')),
+ ('is_internal', Boolean),
+ ('partitions', Array(
+ ('error_code', Int16),
+ ('partition', Int32),
+ ('leader', Int32),
+ ('replicas', Array(Int32)),
+ ('isr', Array(Int32))))))
+ )
+
+
+class MetadataResponse_v4(Response):
+ API_KEY = 3
+ API_VERSION = 4
+ SCHEMA = MetadataResponse_v3.SCHEMA
+
+
class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
@@ -95,8 +126,36 @@ class MetadataRequest_v2(Request):
API_VERSION = 2
RESPONSE_TYPE = MetadataResponse_v2
SCHEMA = MetadataRequest_v1.SCHEMA
+ ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
+ NO_TOPICS = None # Empty array (len 0) for topics returns no topics
+
+
+class MetadataRequest_v3(Request):
+ API_KEY = 3
+ API_VERSION = 3
+ RESPONSE_TYPE = MetadataResponse_v3
+ SCHEMA = MetadataRequest_v1.SCHEMA
+ ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
+ NO_TOPICS = None # Empty array (len 0) for topics returns no topics
+
+
+class MetadataRequest_v4(Request):
+ API_KEY = 3
+ API_VERSION = 4
+ RESPONSE_TYPE = MetadataResponse_v4
+ SCHEMA = Schema(
+ ('topics', Array(String('utf-8'))),
+ ('allow_auto_topic_creation', Boolean)
+ )
+ ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
+ NO_TOPICS = None # Empty array (len 0) for topics returns no topics
-MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2]
+MetadataRequest = [
+ MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
+ MetadataRequest_v3, MetadataRequest_v4
+]
MetadataResponse = [
- MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2]
+ MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
+ MetadataResponse_v3, MetadataResponse_v4
+]