diff options
Diffstat (limited to 'kafka/protocol')
| -rw-r--r-- | kafka/protocol/metadata.py | 40 | ||||
| -rw-r--r-- | kafka/protocol/types.py | 16 |
2 files changed, 52 insertions, 4 deletions
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 8063dda..2711abb 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -1,5 +1,5 @@ from .struct import Struct -from .types import Array, Int16, Int32, Schema, String +from .types import Array, Boolean, Int16, Int32, Schema, String class MetadataResponse_v0(Struct): @@ -22,14 +22,46 @@ class MetadataResponse_v0(Struct): ) +class MetadataResponse_v1(Struct): + API_KEY = 3 + API_VERSION = 1 + SCHEMA = Schema( + ('brokers', Array( + ('node_id', Int32), + ('host', String('utf-8')), + ('port', Int32), + ('rack', String('utf-8')))), + ('controller_id', Int32), + ('topics', Array( + ('error_code', Int16), + ('topic', String('utf-8')), + ('is_internal', Boolean), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader', Int32), + ('replicas', Array(Int32)), + ('isr', Array(Int32)))))) + ) + + class MetadataRequest_v0(Struct): API_KEY = 3 API_VERSION = 0 RESPONSE_TYPE = MetadataResponse_v0 SCHEMA = Schema( - ('topics', Array(String('utf-8'))) + ('topics', Array(String('utf-8'))) # Empty Array (len 0) for all topics + ) + + +class MetadataRequest_v1(Struct): + API_KEY = 3 + API_VERSION = 1 + RESPONSE_TYPE = MetadataResponse_v1 + SCHEMA = Schema( + ('topics', Array(String('utf-8'))) # Null Array (len -1) for all topics ) -MetadataRequest = [MetadataRequest_v0] -MetadataResponse = [MetadataResponse_v0] +MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1] +MetadataResponse = [MetadataResponse_v0, MetadataResponse_v1] diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py index 18aaca1..da10326 100644 --- a/kafka/protocol/types.py +++ b/kafka/protocol/types.py @@ -99,6 +99,16 @@ class Bytes(AbstractType): return value +class Boolean(AbstractType): + @classmethod + def encode(cls, value): + return _pack('>?', value) + + @classmethod + def decode(cls, data): + return _unpack('>?', data.read(1)) + + class Schema(AbstractType): def __init__(self, *fields): if fields: @@ -145,6 +155,8 @@ class Array(AbstractType): raise ValueError('Array instantiated with no array_of type') def encode(self, items): + if items is None: + return Int32.encode(-1) return b''.join( [Int32.encode(len(items))] + [self.array_of.encode(item) for item in items] @@ -152,7 +164,11 @@ class Array(AbstractType): def decode(self, data): length = Int32.decode(data) + if length == -1: + return None return [self.array_of.decode(data) for _ in range(length)] def repr(self, list_of_items): + if list_of_items is None: + return 'NULL' return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']' |
