summaryrefslogtreecommitdiff
path: root/kafka/record/abc.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/record/abc.py')
-rw-r--r--kafka/record/abc.py119
1 files changed, 119 insertions, 0 deletions
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
new file mode 100644
index 0000000..4f14d76
--- /dev/null
+++ b/kafka/record/abc.py
@@ -0,0 +1,119 @@
+from __future__ import absolute_import
+import abc
+
+
+class ABCRecord(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractproperty
+ def offset(self):
+ """ Absolute offset of record
+ """
+
+ @abc.abstractproperty
+ def timestamp(self):
+ """ Epoch milliseconds
+ """
+
+ @abc.abstractproperty
+ def timestamp_type(self):
+ """ CREATE_TIME(0) or APPEND_TIME(1)
+ """
+
+ @abc.abstractproperty
+ def key(self):
+ """ Bytes key or None
+ """
+
+ @abc.abstractproperty
+ def value(self):
+ """ Bytes value or None
+ """
+
+ @abc.abstractproperty
+ def checksum(self):
+ """ Prior to v2 format CRC was contained in every message. This will
+ be the checksum for v0 and v1 and None for v2 and above.
+ """
+
+ @abc.abstractproperty
+ def headers(self):
+ """ If supported by version list of key-value tuples, or empty list if
+ not supported by format.
+ """
+
+
+class ABCRecordBatchBuilder(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def append(self, offset, timestamp, key, value, headers):
+ """ Writes record to internal buffer.
+
+ Arguments:
+ offset (int): Relative offset of record, starting from 0
+ timestamp (int): Timestamp in milliseconds since beginning of the
+ epoch (midnight Jan 1, 1970 (UTC))
+ key (bytes or None): Key of the record
+ value (bytes or None): Value of the record
+ headers (List[Tuple[str, bytes]]): Headers of the record. Header
+ keys can not be ``None``.
+
+ Returns:
+ (bytes, int): Checksum of the written record (or None for v2 and
+ above) and size of the written record.
+ """
+
+ @abc.abstractmethod
+ def size_in_bytes(self, offset, timestamp, key, value, headers):
+ """ Return the expected size change on buffer (uncompressed) if we add
+ this message. This will account for varint size changes and give a
+ reliable size.
+ """
+
+ @abc.abstractmethod
+ def build(self):
+ """ Close for append, compress if needed, write size and header and
+ return a ready to send bytes object.
+
+ Return:
+ io.BytesIO: finished batch, ready to send.
+ """
+
+
+class ABCRecordBatch(object):
+ """ For v2 incapsulates a RecordBatch, for v0/v1 a single (maybe
+ compressed) message.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __iter__(self):
+ """ Return iterator over records (ABCRecord instances). Will decompress
+ if needed.
+ """
+
+
+class ABCRecords(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __init__(self, buffer):
+ """ Initialize with bytes-like object conforming to the buffer
+ interface (ie. bytes, bytearray, memoryview etc.).
+ """
+
+ @abc.abstractmethod
+ def size_in_bytes(self):
+ """ Returns the size of buffer.
+ """
+
+ @abc.abstractmethod
+ def next_batch(self):
+ """ Return next batch of records (ABCRecordBatch instances).
+ """
+
+ @abc.abstractmethod
+ def has_next(self):
+ """ True if there are more batches to read, False otherwise.
+ """