summaryrefslogtreecommitdiff
path: root/kafka/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol')
-rw-r--r--kafka/protocol/metadata.py40
-rw-r--r--kafka/protocol/types.py16
2 files changed, 52 insertions, 4 deletions
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
index 8063dda..2711abb 100644
--- a/kafka/protocol/metadata.py
+++ b/kafka/protocol/metadata.py
@@ -1,5 +1,5 @@
from .struct import Struct
-from .types import Array, Int16, Int32, Schema, String
+from .types import Array, Boolean, Int16, Int32, Schema, String
class MetadataResponse_v0(Struct):
@@ -22,14 +22,46 @@ class MetadataResponse_v0(Struct):
)
+class MetadataResponse_v1(Struct):
+ API_KEY = 3
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('brokers', Array(
+ ('node_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32),
+ ('rack', String('utf-8')))),
+ ('controller_id', Int32),
+ ('topics', Array(
+ ('error_code', Int16),
+ ('topic', String('utf-8')),
+ ('is_internal', Boolean),
+ ('partitions', Array(
+ ('error_code', Int16),
+ ('partition', Int32),
+ ('leader', Int32),
+ ('replicas', Array(Int32)),
+ ('isr', Array(Int32))))))
+ )
+
+
class MetadataRequest_v0(Struct):
API_KEY = 3
API_VERSION = 0
RESPONSE_TYPE = MetadataResponse_v0
SCHEMA = Schema(
- ('topics', Array(String('utf-8')))
+ ('topics', Array(String('utf-8'))) # Empty Array (len 0) for all topics
+ )
+
+
+class MetadataRequest_v1(Struct):
+ API_KEY = 3
+ API_VERSION = 1
+ RESPONSE_TYPE = MetadataResponse_v1
+ SCHEMA = Schema(
+ ('topics', Array(String('utf-8'))) # Null Array (len -1) for all topics
)
-MetadataRequest = [MetadataRequest_v0]
-MetadataResponse = [MetadataResponse_v0]
+MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1]
+MetadataResponse = [MetadataResponse_v0, MetadataResponse_v1]
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
index 18aaca1..da10326 100644
--- a/kafka/protocol/types.py
+++ b/kafka/protocol/types.py
@@ -99,6 +99,16 @@ class Bytes(AbstractType):
return value
+class Boolean(AbstractType):
+ @classmethod
+ def encode(cls, value):
+ return _pack('>?', value)
+
+ @classmethod
+ def decode(cls, data):
+ return _unpack('>?', data.read(1))
+
+
class Schema(AbstractType):
def __init__(self, *fields):
if fields:
@@ -145,6 +155,8 @@ class Array(AbstractType):
raise ValueError('Array instantiated with no array_of type')
def encode(self, items):
+ if items is None:
+ return Int32.encode(-1)
return b''.join(
[Int32.encode(len(items))] +
[self.array_of.encode(item) for item in items]
@@ -152,7 +164,11 @@ class Array(AbstractType):
def decode(self, data):
length = Int32.decode(data)
+ if length == -1:
+ return None
return [self.array_of.decode(data) for _ in range(length)]
def repr(self, list_of_items):
+ if list_of_items is None:
+ return 'NULL'
return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'