summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-11-23 04:15:19 +0800
committerZack Dever <zack.dever@rd.io>2015-12-04 11:25:39 -0800
commitf6edeafac3f42f5407629dcfb1ddd4357dbf5445 (patch)
tree3e0f6d75e1d87872506c0c103e2ed6c0e8b913d7
parentf0cd6d4082d7abe95693f63b4697cb4ed2b8a6d8 (diff)
downloadkafka-python-f6edeafac3f42f5407629dcfb1ddd4357dbf5445.tar.gz
Add base api type classes w/ encoders in kafka.protocol.types
-rw-r--r--kafka/protocol/types.py55
1 files changed, 55 insertions, 0 deletions
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
new file mode 100644
index 0000000..6b257d3
--- /dev/null
+++ b/kafka/protocol/types.py
@@ -0,0 +1,55 @@
+from struct import pack
+
+
+class AbstractField(object):
+ def __init__(self, name):
+ self.name = name
+
+
+class Int8(AbstractField):
+ @classmethod
+ def encode(cls, value):
+ return pack('>b', value)
+
+
+class Int16(AbstractField):
+ @classmethod
+ def encode(cls, value):
+ return pack('>h', value)
+
+
+class Int32(AbstractField):
+ @classmethod
+ def encode(cls, value):
+ return pack('>i', value)
+
+
+class Int64(AbstractField):
+ @classmethod
+ def encode(cls, value):
+ return pack('>q', value)
+
+
+class String(AbstractField):
+ @classmethod
+ def encode(cls, value):
+ if value is None:
+ return Int16.encode(-1)
+ else:
+ return Int16.encode(len(value)) + value
+
+
+class Bytes(AbstractField):
+ @classmethod
+ def encode(cls, value):
+ if value is None:
+ return Int32.encode(-1)
+ else:
+ return Int32.encode(len(value)) + value
+
+
+class Array(object):
+ @classmethod
+ def encode(cls, values):
+ # Assume that values are already encoded
+ return Int32.encode(len(values)) + b''.join(values)