summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/protocol/abstract.py4
-rw-r--r--kafka/protocol/message.py6
-rw-r--r--kafka/protocol/struct.py8
-rw-r--r--kafka/protocol/types.py14
4 files changed, 27 insertions, 5 deletions
diff --git a/kafka/protocol/abstract.py b/kafka/protocol/abstract.py
index 9c53c8c..532d15e 100644
--- a/kafka/protocol/abstract.py
+++ b/kafka/protocol/abstract.py
@@ -11,3 +11,7 @@ class AbstractType(object):
@abc.abstractmethod
def decode(cls, data):
pass
+
+ @classmethod
+ def repr(cls, value):
+ return repr(value)
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 4024a5c..4f84c43 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,6 +1,6 @@
from .struct import Struct
from .types import (
- Int8, Int16, Int32, Int64, Bytes, String, Array, Schema, AbstractType
+ Int8, Int32, Int64, Bytes, Schema, AbstractType
)
from ..util import crc32
@@ -67,3 +67,7 @@ class MessageSet(AbstractType):
msg_size = items[-1][1]
bytes_read += (8 + 4 + msg_size)
return items
+
+ @classmethod
+ def repr(cls, messages):
+ return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']'
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
index 77f5fe7..30e233c 100644
--- a/kafka/protocol/struct.py
+++ b/kafka/protocol/struct.py
@@ -1,4 +1,4 @@
-from collections import namedtuple
+#from collections import namedtuple
from io import BytesIO
from .abstract import AbstractType
@@ -39,10 +39,12 @@ class Struct(AbstractType):
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
def __repr__(self):
- key_vals =['%s=%r' % (name, self.__dict__[name])
- for name in self.SCHEMA.names]
+ key_vals = []
+ for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
+ key_vals.append('%s=%s' % (name, field.repr(self.__dict__[name])))
return self.__class__.__name__ + '(' + ', '.join(key_vals) + ')'
+
"""
class MetaStruct(type):
def __new__(cls, clsname, bases, dct):
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
index 5aa2e41..99d89a6 100644
--- a/kafka/protocol/types.py
+++ b/kafka/protocol/types.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import
-import abc
from struct import pack, unpack
from .abstract import AbstractType
@@ -104,6 +103,16 @@ class Schema(AbstractType):
def __len__(self):
return len(self.fields)
+ def repr(self, value):
+ key_vals = []
+ for i in range(len(self)):
+ try:
+ field_val = getattr(value, self.names[i])
+ except AttributeError:
+ field_val = value[i]
+ key_vals.append('%s=%s' % (self.names[i], self.fields[i].repr(field_val)))
+ return '(' + ', '.join(key_vals) + ')'
+
class Array(AbstractType):
def __init__(self, *array_of):
@@ -124,3 +133,6 @@ class Array(AbstractType):
def decode(self, data):
length = Int32.decode(data)
return [self.array_of.decode(data) for _ in range(length)]
+
+ def repr(self, list_of_items):
+ return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'