author     Taras Voinarovskyi <voyn1991@gmail.com>  2017-10-25 07:28:35 +0900
committer  GitHub <noreply@github.com>              2017-10-25 07:28:35 +0900
commit     8b05ee8da50b4c7b832676f4e38f9d92a86639cc (patch)
tree       91fe16e3c9aff44ca93633824b96da4b8ff19384 /kafka
parent     4213d53d4ccfd239addc1db07b5b3913b4c6547c (diff)
download   kafka-python-8b05ee8da50b4c7b832676f4e38f9d92a86639cc.tar.gz
Add DefaultRecordBatch implementation aka V2 message format parser/builder. (#1185)
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
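The LOAD_GLOBAL optimization mentioned above works by binding frequently used globals as keyword-argument defaults, so that inside the hot functions they are resolved with fast local lookups instead of a global lookup per call. A minimal sketch of the trick (illustrative only, not code from this commit):

    import dis

    def lookup_each_call(items):
        return len(items)              # len resolved via LOAD_GLOBAL on every call

    def bound_at_definition(items, len=len):   # len bound once, at definition time
        return len(items)              # resolved as a fast local load

    # dis.dis() shows a LOAD_GLOBAL for len in the first function,
    # but only fast local loads in the second.
    dis.dis(lookup_each_call)
    dis.dis(bound_at_definition)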
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/consumer/fetcher.py         30
-rw-r--r--  kafka/producer/kafka.py           15
-rw-r--r--  kafka/producer/sender.py           9
-rw-r--r--  kafka/record/README                8
-rw-r--r--  kafka/record/_crc32c.py          143
-rw-r--r--  kafka/record/abc.py               10
-rw-r--r--  kafka/record/default_records.py  595
-rw-r--r--  kafka/record/legacy_records.py     3
-rw-r--r--  kafka/record/memory_records.py    23
-rw-r--r--  kafka/record/util.py             119
10 files changed, 932 insertions(+), 23 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d3ee26e..ddd7567 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -23,6 +23,10 @@ from kafka.structs import TopicPartition, OffsetAndTimestamp
log = logging.getLogger(__name__)
+# Isolation levels
+READ_UNCOMMITTED = 0
+READ_COMMITTED = 1
+
ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "offset", "timestamp", "timestamp_type",
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
@@ -114,6 +118,7 @@ class Fetcher(six.Iterator):
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
+ self._isolation_level = READ_UNCOMMITTED
def send_fetches(self):
"""Send FetchRequests for all assigned partitions that do not already have
@@ -670,7 +675,9 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
- if self.config['api_version'] >= (0, 10, 1):
+ if self.config['api_version'] >= (0, 11, 0):
+ version = 4
+ elif self.config['api_version'] >= (0, 10, 1):
version = 3
elif self.config['api_version'] >= (0, 10):
version = 2
@@ -696,12 +703,21 @@ class Fetcher(six.Iterator):
# dicts retain insert order.
partition_data = list(partition_data.items())
random.shuffle(partition_data)
- requests[node_id] = FetchRequest[version](
- -1, # replica_id
- self.config['fetch_max_wait_ms'],
- self.config['fetch_min_bytes'],
- self.config['fetch_max_bytes'],
- partition_data)
+ if version == 3:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ partition_data)
+ else:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ self._isolation_level,
+ partition_data)
return requests
def _handle_fetch_response(self, request, send_time, response):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0ffc29c..646e773 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -15,6 +15,7 @@ from ..client_async import KafkaClient, selectors
from ..codec import has_gzip, has_snappy, has_lz4
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
+from ..record.default_records import DefaultRecordBatchBuilder
from ..record.legacy_records import LegacyRecordBatchBuilder
from ..serializer import Serializer
from ..structs import TopicPartition
@@ -486,15 +487,21 @@ class KafkaProducer(object):
return self._wait_on_metadata(topic, max_wait)
def _max_usable_produce_magic(self):
- if self.config['api_version'] >= (0, 10):
+ if self.config['api_version'] >= (0, 11):
+ return 2
+ elif self.config['api_version'] >= (0, 10):
return 1
else:
return 0
- def _estimate_size_in_bytes(self, key, value):
+ def _estimate_size_in_bytes(self, key, value, headers=[]):
magic = self._max_usable_produce_magic()
- return LegacyRecordBatchBuilder.estimate_size_in_bytes(
- magic, self.config['compression_type'], key, value)
+ if magic == 2:
+ return DefaultRecordBatchBuilder.estimate_size_in_bytes(
+ key, value, headers)
+ else:
+ return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, self.config['compression_type'], key, value)
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 72a15bb..ffc67f8 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -291,7 +291,11 @@ class Sender(threading.Thread):
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
- if self.config['api_version'] >= (0, 10):
+ kwargs = {}
+ if self.config['api_version'] >= (0, 11):
+ version = 3
+ kwargs = dict(transactional_id=None)
+ elif self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
@@ -302,7 +306,8 @@ class Sender(threading.Thread):
timeout=timeout,
topics=[(topic, list(partition_info.items()))
for topic, partition_info
- in six.iteritems(produce_records_by_partition)]
+ in six.iteritems(produce_records_by_partition)],
+ **kwargs
)
def wakeup(self):
diff --git a/kafka/record/README b/kafka/record/README
new file mode 100644
index 0000000..e445455
--- /dev/null
+++ b/kafka/record/README
@@ -0,0 +1,8 @@
+This module is structured mostly after the
+kafka/clients/src/main/java/org/apache/kafka/common/record/ module of the Java
+client.
+
+See abc.py for the abstract declarations. `ABCRecords` is used as a facade to
+hide version differences. `ABCRecordBatch` subclasses implement the actual
+parsers for the different versions (v0/v1 as LegacyBatch and v2 as
+DefaultBatch; the names are taken from the Java client).
diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py
new file mode 100644
index 0000000..5704f82
--- /dev/null
+++ b/kafka/record/_crc32c.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+#
+# Taken from https://cloud.google.com/appengine/docs/standard/python/refdocs/\
+# modules/google/appengine/api/files/crc32c?hl=ru
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Implementation of CRC-32C checksumming as in rfc3720 section B.4.
+See http://en.wikipedia.org/wiki/Cyclic_redundancy_check for details on CRC-32C
+This code is a manual python translation of c code generated by
+pycrc 0.7.1 (http://www.tty1.net/pycrc/). Command line used:
+'./pycrc.py --model=crc-32c --generate c --algorithm=table-driven'
+"""
+
+import array
+
+CRC_TABLE = (
+ 0x00000000, 0xf26b8303, 0xe13b70f7, 0x1350f3f4,
+ 0xc79a971f, 0x35f1141c, 0x26a1e7e8, 0xd4ca64eb,
+ 0x8ad958cf, 0x78b2dbcc, 0x6be22838, 0x9989ab3b,
+ 0x4d43cfd0, 0xbf284cd3, 0xac78bf27, 0x5e133c24,
+ 0x105ec76f, 0xe235446c, 0xf165b798, 0x030e349b,
+ 0xd7c45070, 0x25afd373, 0x36ff2087, 0xc494a384,
+ 0x9a879fa0, 0x68ec1ca3, 0x7bbcef57, 0x89d76c54,
+ 0x5d1d08bf, 0xaf768bbc, 0xbc267848, 0x4e4dfb4b,
+ 0x20bd8ede, 0xd2d60ddd, 0xc186fe29, 0x33ed7d2a,
+ 0xe72719c1, 0x154c9ac2, 0x061c6936, 0xf477ea35,
+ 0xaa64d611, 0x580f5512, 0x4b5fa6e6, 0xb93425e5,
+ 0x6dfe410e, 0x9f95c20d, 0x8cc531f9, 0x7eaeb2fa,
+ 0x30e349b1, 0xc288cab2, 0xd1d83946, 0x23b3ba45,
+ 0xf779deae, 0x05125dad, 0x1642ae59, 0xe4292d5a,
+ 0xba3a117e, 0x4851927d, 0x5b016189, 0xa96ae28a,
+ 0x7da08661, 0x8fcb0562, 0x9c9bf696, 0x6ef07595,
+ 0x417b1dbc, 0xb3109ebf, 0xa0406d4b, 0x522bee48,
+ 0x86e18aa3, 0x748a09a0, 0x67dafa54, 0x95b17957,
+ 0xcba24573, 0x39c9c670, 0x2a993584, 0xd8f2b687,
+ 0x0c38d26c, 0xfe53516f, 0xed03a29b, 0x1f682198,
+ 0x5125dad3, 0xa34e59d0, 0xb01eaa24, 0x42752927,
+ 0x96bf4dcc, 0x64d4cecf, 0x77843d3b, 0x85efbe38,
+ 0xdbfc821c, 0x2997011f, 0x3ac7f2eb, 0xc8ac71e8,
+ 0x1c661503, 0xee0d9600, 0xfd5d65f4, 0x0f36e6f7,
+ 0x61c69362, 0x93ad1061, 0x80fde395, 0x72966096,
+ 0xa65c047d, 0x5437877e, 0x4767748a, 0xb50cf789,
+ 0xeb1fcbad, 0x197448ae, 0x0a24bb5a, 0xf84f3859,
+ 0x2c855cb2, 0xdeeedfb1, 0xcdbe2c45, 0x3fd5af46,
+ 0x7198540d, 0x83f3d70e, 0x90a324fa, 0x62c8a7f9,
+ 0xb602c312, 0x44694011, 0x5739b3e5, 0xa55230e6,
+ 0xfb410cc2, 0x092a8fc1, 0x1a7a7c35, 0xe811ff36,
+ 0x3cdb9bdd, 0xceb018de, 0xdde0eb2a, 0x2f8b6829,
+ 0x82f63b78, 0x709db87b, 0x63cd4b8f, 0x91a6c88c,
+ 0x456cac67, 0xb7072f64, 0xa457dc90, 0x563c5f93,
+ 0x082f63b7, 0xfa44e0b4, 0xe9141340, 0x1b7f9043,
+ 0xcfb5f4a8, 0x3dde77ab, 0x2e8e845f, 0xdce5075c,
+ 0x92a8fc17, 0x60c37f14, 0x73938ce0, 0x81f80fe3,
+ 0x55326b08, 0xa759e80b, 0xb4091bff, 0x466298fc,
+ 0x1871a4d8, 0xea1a27db, 0xf94ad42f, 0x0b21572c,
+ 0xdfeb33c7, 0x2d80b0c4, 0x3ed04330, 0xccbbc033,
+ 0xa24bb5a6, 0x502036a5, 0x4370c551, 0xb11b4652,
+ 0x65d122b9, 0x97baa1ba, 0x84ea524e, 0x7681d14d,
+ 0x2892ed69, 0xdaf96e6a, 0xc9a99d9e, 0x3bc21e9d,
+ 0xef087a76, 0x1d63f975, 0x0e330a81, 0xfc588982,
+ 0xb21572c9, 0x407ef1ca, 0x532e023e, 0xa145813d,
+ 0x758fe5d6, 0x87e466d5, 0x94b49521, 0x66df1622,
+ 0x38cc2a06, 0xcaa7a905, 0xd9f75af1, 0x2b9cd9f2,
+ 0xff56bd19, 0x0d3d3e1a, 0x1e6dcdee, 0xec064eed,
+ 0xc38d26c4, 0x31e6a5c7, 0x22b65633, 0xd0ddd530,
+ 0x0417b1db, 0xf67c32d8, 0xe52cc12c, 0x1747422f,
+ 0x49547e0b, 0xbb3ffd08, 0xa86f0efc, 0x5a048dff,
+ 0x8ecee914, 0x7ca56a17, 0x6ff599e3, 0x9d9e1ae0,
+ 0xd3d3e1ab, 0x21b862a8, 0x32e8915c, 0xc083125f,
+ 0x144976b4, 0xe622f5b7, 0xf5720643, 0x07198540,
+ 0x590ab964, 0xab613a67, 0xb831c993, 0x4a5a4a90,
+ 0x9e902e7b, 0x6cfbad78, 0x7fab5e8c, 0x8dc0dd8f,
+ 0xe330a81a, 0x115b2b19, 0x020bd8ed, 0xf0605bee,
+ 0x24aa3f05, 0xd6c1bc06, 0xc5914ff2, 0x37faccf1,
+ 0x69e9f0d5, 0x9b8273d6, 0x88d28022, 0x7ab90321,
+ 0xae7367ca, 0x5c18e4c9, 0x4f48173d, 0xbd23943e,
+ 0xf36e6f75, 0x0105ec76, 0x12551f82, 0xe03e9c81,
+ 0x34f4f86a, 0xc69f7b69, 0xd5cf889d, 0x27a40b9e,
+ 0x79b737ba, 0x8bdcb4b9, 0x988c474d, 0x6ae7c44e,
+ 0xbe2da0a5, 0x4c4623a6, 0x5f16d052, 0xad7d5351,
+)
+
+CRC_INIT = 0
+_MASK = 0xFFFFFFFF
+
+
+def crc_update(crc, data):
+ """Update CRC-32C checksum with data.
+ Args:
+ crc: 32-bit checksum to update as long.
+ data: byte array, string or iterable over bytes.
+ Returns:
+ 32-bit updated CRC-32C as long.
+ """
+ if type(data) != array.array or data.itemsize != 1:
+ buf = array.array("B", data)
+ else:
+ buf = data
+ crc = crc ^ _MASK
+ for b in buf:
+ table_index = (crc ^ b) & 0xff
+ crc = (CRC_TABLE[table_index] ^ (crc >> 8)) & _MASK
+ return crc ^ _MASK
+
+
+def crc_finalize(crc):
+ """Finalize CRC-32C checksum.
+ This function should be called as last step of crc calculation.
+ Args:
+ crc: 32-bit checksum as long.
+ Returns:
+ finalized 32-bit checksum as long
+ """
+ return crc & _MASK
+
+
+def crc(data):
+ """Compute CRC-32C checksum of the data.
+ Args:
+ data: byte array, string or iterable over bytes.
+ Returns:
+ 32-bit CRC-32C checksum of data as long.
+ """
+ return crc_finalize(crc_update(CRC_INIT, data))
+
+
+if __name__ == "__main__":
+ import sys
+ data = sys.stdin.read()
+ print(hex(crc(data)))
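For a quick sanity check of the pure-Python implementation above: the well-known CRC-32C check value for the ASCII string "123456789" is 0xe3069283.

    from kafka.record._crc32c import crc

    # CRC-32C (Castagnoli) check value for the standard test vector.
    assert crc(b"123456789") == 0xe3069283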
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index 8a27276..83121c6 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -36,12 +36,18 @@ class ABCRecord(object):
be the checksum for v0 and v1 and None for v2 and above.
"""
+ @abc.abstractproperty
+ def headers(self):
+ """ If supported by version list of key-value tuples, or empty list if
+ not supported by format.
+ """
+
class ABCRecordBatchBuilder(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def append(self, offset, timestamp, key, value):
+ def append(self, offset, timestamp, key, value, headers=None):
""" Writes record to internal buffer.
Arguments:
@@ -51,6 +57,8 @@ class ABCRecordBatchBuilder(object):
set to current time.
key (bytes or None): Key of the record
value (bytes or None): Value of the record
+ headers (List[Tuple[str, bytes]]): Headers of the record. Header
+ keys can not be ``None``.
Returns:
(bytes, int): Checksum of the written record (or None for v2 and
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
new file mode 100644
index 0000000..3d517af
--- /dev/null
+++ b/kafka/record/default_records.py
@@ -0,0 +1,595 @@
+# See:
+# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\
+# apache/kafka/common/record/DefaultRecordBatch.java
+# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\
+# apache/kafka/common/record/DefaultRecord.java
+
+# RecordBatch and Record implementation for magic 2 and above.
+# The schema is given below:
+
+# RecordBatch =>
+# BaseOffset => Int64
+# Length => Int32
+# PartitionLeaderEpoch => Int32
+# Magic => Int8
+# CRC => Uint32
+# Attributes => Int16
+# LastOffsetDelta => Int32 // also serves as LastSequenceDelta
+# FirstTimestamp => Int64
+# MaxTimestamp => Int64
+# ProducerId => Int64
+# ProducerEpoch => Int16
+# BaseSequence => Int32
+# Records => [Record]
+
+# Record =>
+# Length => Varint
+# Attributes => Int8
+# TimestampDelta => Varlong
+# OffsetDelta => Varint
+# Key => Bytes
+# Value => Bytes
+# Headers => [HeaderKey HeaderValue]
+# HeaderKey => String
+# HeaderValue => Bytes
+
+# Note that when compression is enabled (see attributes below), the compressed
+# record data is serialized directly following the count of the number of
+# records (i.e. Records => [Record], but without the length bytes).
+
+# The CRC covers the data from the attributes to the end of the batch (i.e. all
+# the bytes that follow the CRC). It is located after the magic byte, which
+# means that clients must parse the magic byte before deciding how to interpret
+# the bytes between the batch length and the magic byte. The partition leader
+# epoch field is not included in the CRC computation to avoid the need to
+# recompute the CRC when this field is assigned for every batch that is
+# received by the broker. The CRC-32C (Castagnoli) polynomial is used for the
+# computation.
+
+# The current RecordBatch attributes are given below:
+#
+# * Unused (6-15)
+# * Control (5)
+# * Transactional (4)
+# * Timestamp Type (3)
+# * Compression Type (0-2)
+
+import io
+import struct
+import time
+from .abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
+from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint
+
+from kafka.errors import CorruptRecordException
+from kafka.codec import (
+ gzip_encode, snappy_encode, lz4_encode,
+ gzip_decode, snappy_decode, lz4_decode
+)
+
+
+class DefaultRecordBase(object):
+
+ HEADER_STRUCT = struct.Struct(
+ ">q" # BaseOffset => Int64
+ "i" # Length => Int32
+ "i" # PartitionLeaderEpoch => Int32
+ "b" # Magic => Int8
+ "I" # CRC => Uint32
+ "h" # Attributes => Int16
+ "i" # LastOffsetDelta => Int32 // also serves as LastSequenceDelta
+ "q" # FirstTimestamp => Int64
+ "q" # MaxTimestamp => Int64
+ "q" # ProducerId => Int64
+ "h" # ProducerEpoch => Int16
+ "i" # BaseSequence => Int32
+ "i" # Records count => Int32
+ )
+ # Byte offset in HEADER_STRUCT of attributes field. Used to calculate CRC
+ ATTRIBUTES_OFFSET = struct.calcsize(">qiibI")
+ CRC_OFFSET = struct.calcsize(">qiib")
+ AFTER_LEN_OFFSET = struct.calcsize(">qi")
+
+ CODEC_MASK = 0x07
+ CODEC_NONE = 0x00
+ CODEC_GZIP = 0x01
+ CODEC_SNAPPY = 0x02
+ CODEC_LZ4 = 0x03
+ TIMESTAMP_TYPE_MASK = 0x08
+ TRANSACTIONAL_MASK = 0x10
+ CONTROL_MASK = 0x20
+
+ LOG_APPEND_TIME = 1
+ CREATE_TIME = 0
+
+
+class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
+
+ def __init__(self, buffer):
+ self._buffer = bytearray(buffer)
+ self._header_data = self.HEADER_STRUCT.unpack_from(self._buffer)
+ self._pos = self.HEADER_STRUCT.size
+ self._num_records = self._header_data[12]
+ self._next_record_index = 0
+ self._decompressed = False
+
+ @property
+ def base_offset(self):
+ return self._header_data[0]
+
+ @property
+ def magic(self):
+ return self._header_data[3]
+
+ @property
+ def crc(self):
+ return self._header_data[4]
+
+ @property
+ def attributes(self):
+ return self._header_data[5]
+
+ @property
+ def compression_type(self):
+ return self.attributes & self.CODEC_MASK
+
+ @property
+ def timestamp_type(self):
+ return int(bool(self.attributes & self.TIMESTAMP_TYPE_MASK))
+
+ @property
+ def is_transactional(self):
+ return bool(self.attributes & self.TRANSACTIONAL_MASK)
+
+ @property
+ def is_control_batch(self):
+ return bool(self.attributes & self.CONTROL_MASK)
+
+ @property
+ def first_timestamp(self):
+ return self._header_data[7]
+
+ @property
+ def max_timestamp(self):
+ return self._header_data[8]
+
+ def _maybe_uncompress(self):
+ if not self._decompressed:
+ compression_type = self.compression_type
+ if compression_type != self.CODEC_NONE:
+ data = memoryview(self._buffer)[self._pos:]
+ if compression_type == self.CODEC_GZIP:
+ uncompressed = gzip_decode(data)
+ if compression_type == self.CODEC_SNAPPY:
+ uncompressed = snappy_decode(data.tobytes())
+ if compression_type == self.CODEC_LZ4:
+ uncompressed = lz4_decode(data.tobytes())
+ self._buffer = bytearray(uncompressed)
+ self._pos = 0
+ self._decompressed = True
+
+ def _read_msg(
+ self,
+ decode_varint=decode_varint):
+ # Record =>
+ # Length => Varint
+ # Attributes => Int8
+ # TimestampDelta => Varlong
+ # OffsetDelta => Varint
+ # Key => Bytes
+ # Value => Bytes
+ # Headers => [HeaderKey HeaderValue]
+ # HeaderKey => String
+ # HeaderValue => Bytes
+
+ buffer = self._buffer
+ pos = self._pos
+ length, pos = decode_varint(buffer, pos)
+ start_pos = pos
+ _, pos = decode_varint(buffer, pos) # attrs can be skipped for now
+
+ ts_delta, pos = decode_varint(buffer, pos)
+ if self.timestamp_type == self.LOG_APPEND_TIME:
+ timestamp = self.max_timestamp
+ else:
+ timestamp = self.first_timestamp + ts_delta
+
+ offset_delta, pos = decode_varint(buffer, pos)
+ offset = self.base_offset + offset_delta
+
+ key_len, pos = decode_varint(buffer, pos)
+ if key_len >= 0:
+ key = bytes(buffer[pos: pos + key_len])
+ pos += key_len
+ else:
+ key = None
+
+ value_len, pos = decode_varint(buffer, pos)
+ if value_len >= 0:
+ value = bytes(buffer[pos: pos + value_len])
+ pos += value_len
+ else:
+ value = None
+
+ header_count, pos = decode_varint(buffer, pos)
+ if header_count < 0:
+ raise CorruptRecordException("Found invalid number of record "
+ "headers {}".format(header_count))
+ headers = []
+ while header_count:
+ # Header key is of type String, which can't be None
+ h_key_len, pos = decode_varint(buffer, pos)
+ if h_key_len < 0:
+ raise CorruptRecordException(
+ "Invalid negative header key size {}".format(h_key_len))
+ h_key = buffer[pos: pos + h_key_len].decode("utf-8")
+ pos += h_key_len
+
+ # Value is of type NULLABLE_BYTES, so it can be None
+ h_value_len, pos = decode_varint(buffer, pos)
+ if h_value_len >= 0:
+ h_value = bytes(buffer[pos: pos + h_value_len])
+ pos += h_value_len
+ else:
+ h_value = None
+
+ headers.append((h_key, h_value))
+ header_count -= 1
+
+ # validate whether we have read all header bytes in the current record
+ if pos - start_pos != length:
+ CorruptRecordException(
+ "Invalid record size: expected to read {} bytes in record "
+ "payload, but instead read {}".format(length, pos - start_pos))
+ self._pos = pos
+
+ return DefaultRecord(
+ offset, timestamp, self.timestamp_type, key, value, headers)
+
+ def __iter__(self):
+ self._maybe_uncompress()
+ return self
+
+ def __next__(self):
+ if self._next_record_index >= self._num_records:
+ if self._pos != len(self._buffer):
+ raise CorruptRecordException(
+ "{} unconsumed bytes after all records consumed".format(
+ len(self._buffer) - self._pos))
+ raise StopIteration
+ try:
+ msg = self._read_msg()
+ except (ValueError, IndexError) as err:
+ raise CorruptRecordException(
+ "Found invalid record structure: {!r}".format(err))
+ else:
+ self._next_record_index += 1
+ return msg
+
+ next = __next__
+
+ def validate_crc(self):
+ assert self._decompressed is False, \
+ "Validate should be called before iteration"
+
+ crc = self.crc
+ data_view = memoryview(self._buffer)[self.ATTRIBUTES_OFFSET:]
+ verify_crc = calc_crc32c(data_view.tobytes())
+ return crc == verify_crc
+
+
+class DefaultRecord(ABCRecord):
+
+ __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
+ "_headers")
+
+ def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
+ self._offset = offset
+ self._timestamp = timestamp
+ self._timestamp_type = timestamp_type
+ self._key = key
+ self._value = value
+ self._headers = headers
+
+ @property
+ def offset(self):
+ return self._offset
+
+ @property
+ def timestamp(self):
+ """ Epoch milliseconds
+ """
+ return self._timestamp
+
+ @property
+ def timestamp_type(self):
+ """ CREATE_TIME(0) or APPEND_TIME(1)
+ """
+ return self._timestamp_type
+
+ @property
+ def key(self):
+ """ Bytes key or None
+ """
+ return self._key
+
+ @property
+ def value(self):
+ """ Bytes value or None
+ """
+ return self._value
+
+ @property
+ def headers(self):
+ return self._headers
+
+ @property
+ def checksum(self):
+ return None
+
+ def __repr__(self):
+ return (
+ "DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
+ " key={!r}, value={!r}, headers={!r})".format(
+ self._offset, self._timestamp, self._timestamp_type,
+ self._key, self._value, self._headers)
+ )
+
+
+class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
+
+ # excluding key, value and headers:
+ # 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
+ MAX_RECORD_OVERHEAD = 21
+
+ def __init__(
+ self, magic, compression_type, is_transactional,
+ producer_id, producer_epoch, base_sequence, batch_size):
+ assert magic >= 2
+ self._magic = magic
+ self._compression_type = compression_type & self.CODEC_MASK
+ self._batch_size = batch_size
+ self._is_transactional = bool(is_transactional)
+ # KIP-98 fields for EOS
+ self._producer_id = producer_id
+ self._producer_epoch = producer_epoch
+ self._base_sequence = base_sequence
+
+ self._first_timestamp = None
+ self._max_timestamp = None
+ self._last_offset = 0
+ self._num_records = 0
+
+ self._buffer = bytearray(self.HEADER_STRUCT.size)
+
+ def _get_attributes(self, include_compression_type=True):
+ attrs = 0
+ if include_compression_type:
+ attrs |= self._compression_type
+ # Timestamp Type is set by Broker
+ if self._is_transactional:
+ attrs |= self.TRANSACTIONAL_MASK
+ # Control batches are only created by Broker
+ return attrs
+
+ def append(self, offset, timestamp, key, value, headers,
+ # Cache for LOAD_FAST opcodes
+ encode_varint=encode_varint, size_of_varint=size_of_varint,
+ get_type=type, type_int=int, time_time=time.time,
+ byte_like=(bytes, bytearray, memoryview),
+ bytearray_type=bytearray, len_func=len, zero_len_varint=1
+ ):
+ """ Write message to messageset buffer with MsgVersion 2
+ """
+ # Check types
+ if get_type(offset) != type_int:
+ raise TypeError(offset)
+ if timestamp is None:
+ timestamp = type_int(time_time() * 1000)
+ elif get_type(timestamp) != type_int:
+ raise TypeError(timestamp)
+ if not (key is None or get_type(key) in byte_like):
+ raise TypeError(
+ "Not supported type for key: {}".format(type(key)))
+ if not (value is None or get_type(value) in byte_like):
+ raise TypeError(
+ "Not supported type for value: {}".format(type(value)))
+
+ # We will always add the first message, so those will be set
+ if self._first_timestamp is None:
+ self._first_timestamp = timestamp
+ self._max_timestamp = timestamp
+ timestamp_delta = 0
+ first_message = 1
+ else:
+ timestamp_delta = timestamp - self._first_timestamp
+ first_message = 0
+
+ # We can't write the record straight into the output buffer: its
+ # length has to be precomputed and written first
+ message_buffer = bytearray_type(b"\x00") # Attributes
+ write_byte = message_buffer.append
+ write = message_buffer.extend
+
+ encode_varint(timestamp_delta, write_byte)
+ # Base offset is always 0 on Produce
+ encode_varint(offset, write_byte)
+
+ if key is not None:
+ encode_varint(len_func(key), write_byte)
+ write(key)
+ else:
+ write_byte(zero_len_varint)
+
+ if value is not None:
+ encode_varint(len_func(value), write_byte)
+ write(value)
+ else:
+ write_byte(zero_len_varint)
+
+ encode_varint(len_func(headers), write_byte)
+
+ for h_key, h_value in headers:
+ h_key = h_key.encode("utf-8")
+ encode_varint(len_func(h_key), write_byte)
+ write(h_key)
+ if h_value is not None:
+ encode_varint(len_func(h_value), write_byte)
+ write(h_value)
+ else:
+ write_byte(zero_len_varint)
+
+ message_len = len_func(message_buffer)
+ main_buffer = self._buffer
+
+ required_size = message_len + size_of_varint(message_len)
+ # Check if we can write this message
+ if (required_size + len_func(main_buffer) > self._batch_size and
+ not first_message):
+ return None
+
+ # Those should be updated after the length check
+ if self._max_timestamp < timestamp:
+ self._max_timestamp = timestamp
+ self._num_records += 1
+ self._last_offset = offset
+
+ encode_varint(message_len, main_buffer.append)
+ main_buffer.extend(message_buffer)
+
+ return DefaultRecordMetadata(offset, required_size, timestamp)
+
+ def write_header(self, use_compression_type=True):
+ batch_len = len(self._buffer)
+ self.HEADER_STRUCT.pack_into(
+ self._buffer, 0,
+ 0, # BaseOffset, set by broker
+ batch_len - self.AFTER_LEN_OFFSET, # Size from here to end
+ 0, # PartitionLeaderEpoch, set by broker
+ self._magic,
+ 0, # CRC will be set below, as we need a filled buffer for it
+ self._get_attributes(use_compression_type),
+ self._last_offset,
+ self._first_timestamp,
+ self._max_timestamp,
+ self._producer_id,
+ self._producer_epoch,
+ self._base_sequence,
+ self._num_records
+ )
+ crc = calc_crc32c(self._buffer[self.ATTRIBUTES_OFFSET:])
+ struct.pack_into(">I", self._buffer, self.CRC_OFFSET, crc)
+
+ def _maybe_compress(self):
+ if self._compression_type != self.CODEC_NONE:
+ header_size = self.HEADER_STRUCT.size
+ data = bytes(self._buffer[header_size:])
+ if self._compression_type == self.CODEC_GZIP:
+ compressed = gzip_encode(data)
+ elif self._compression_type == self.CODEC_SNAPPY:
+ compressed = snappy_encode(data)
+ elif self._compression_type == self.CODEC_LZ4:
+ compressed = lz4_encode(data)
+ compressed_size = len(compressed)
+ if len(data) <= compressed_size:
+ # We did not get any benefit from compression; send the data
+ # uncompressed
+ return False
+ else:
+ # Trim bytearray to the required size
+ needed_size = header_size + compressed_size
+ del self._buffer[needed_size:]
+ self._buffer[header_size:needed_size] = compressed
+ return True
+ return False
+
+ def build(self):
+ send_compressed = self._maybe_compress()
+ self.write_header(send_compressed)
+ return self._buffer
+
+ def size(self):
+ """ Return current size of data written to buffer
+ """
+ return len(self._buffer)
+
+ def size_in_bytes(self, offset, timestamp, key, value, headers):
+ if self._first_timestamp is not None:
+ timestamp_delta = timestamp - self._first_timestamp
+ else:
+ timestamp_delta = 0
+ size_of_body = (
+ 1 + # Attrs
+ size_of_varint(offset) +
+ size_of_varint(timestamp_delta) +
+ self.size_of(key, value, headers)
+ )
+ return size_of_body + size_of_varint(size_of_body)
+
+ @classmethod
+ def size_of(cls, key, value, headers):
+ size = 0
+ # Key size
+ if key is None:
+ size += 1
+ else:
+ key_len = len(key)
+ size += size_of_varint(key_len) + key_len
+ # Value size
+ if value is None:
+ size += 1
+ else:
+ value_len = len(value)
+ size += size_of_varint(value_len) + value_len
+ # Header size
+ size += size_of_varint(len(headers))
+ for h_key, h_value in headers:
+ h_key_len = len(h_key.encode("utf-8"))
+ size += size_of_varint(h_key_len) + h_key_len
+
+ if h_value is None:
+ size += 1
+ else:
+ h_value_len = len(h_value)
+ size += size_of_varint(h_value_len) + h_value_len
+ return size
+
+ @classmethod
+ def estimate_size_in_bytes(cls, key, value, headers):
+ """ Get the upper bound estimate on the size of record
+ """
+ return (
+ cls.HEADER_STRUCT.size + cls.MAX_RECORD_OVERHEAD +
+ cls.size_of(key, value, headers)
+ )
+
+
+class DefaultRecordMetadata(object):
+
+ __slots__ = ("_size", "_timestamp", "_offset")
+
+ def __init__(self, offset, size, timestamp):
+ self._offset = offset
+ self._size = size
+ self._timestamp = timestamp
+
+ @property
+ def offset(self):
+ return self._offset
+
+ @property
+ def crc(self):
+ return None
+
+ @property
+ def size(self):
+ return self._size
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ def __repr__(self):
+ return (
+ "DefaultRecordMetadata(offset={!r}, size={!r}, timestamp={!r})"
+ .format(self._offset, self._size, self._timestamp)
+ )
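A small round-trip sketch of the builder and parser added above; the constructor arguments mirror the MemoryRecordsBuilder call later in this diff, and the code is illustrative rather than part of the commit:

    from kafka.record.default_records import (
        DefaultRecordBatch, DefaultRecordBatchBuilder)

    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=False,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=16 * 1024)
    builder.append(offset=0, timestamp=None, key=b"key", value=b"value",
                   headers=[("trace-id", b"abc")])
    buffer = builder.build()

    batch = DefaultRecordBatch(bytes(buffer))
    assert batch.validate_crc()        # must be called before iterating
    for record in batch:
        print(record.offset, record.key, record.value, record.headers)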
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 055914c..8c0791e 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -329,9 +329,10 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
self._batch_size = batch_size
self._buffer = bytearray()
- def append(self, offset, timestamp, key, value):
+ def append(self, offset, timestamp, key, value, headers=None):
""" Append message to batch.
"""
+ assert not headers, "Headers not supported in v0/v1"
# Check types
if type(offset) != int:
raise TypeError(offset)
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index 4ed992c..56aa51f 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -24,6 +24,7 @@ import struct
from kafka.errors import CorruptRecordException
from .abc import ABCRecords
from .legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder
+from .default_records import DefaultRecordBatch, DefaultRecordBatchBuilder
class MemoryRecords(ABCRecords):
@@ -102,18 +103,24 @@ class MemoryRecords(ABCRecords):
magic, = struct.unpack_from(">b", next_slice, _magic_offset)
if magic <= 1:
return LegacyRecordBatch(next_slice, magic)
- else: # pragma: no cover
- raise NotImplementedError("Record V2 still not implemented")
+ else:
+ return DefaultRecordBatch(next_slice)
class MemoryRecordsBuilder(object):
def __init__(self, magic, compression_type, batch_size):
- assert magic in [0, 1], "Not supported magic"
+ assert magic in [0, 1, 2], "Not supported magic"
assert compression_type in [0, 1, 2, 3], "Not valid compression type"
- self._builder = LegacyRecordBatchBuilder(
- magic=magic, compression_type=compression_type,
- batch_size=batch_size)
+ if magic >= 2:
+ self._builder = DefaultRecordBatchBuilder(
+ magic=magic, compression_type=compression_type,
+ is_transactional=False, producer_id=-1, producer_epoch=-1,
+ base_sequence=-1, batch_size=batch_size)
+ else:
+ self._builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=compression_type,
+ batch_size=batch_size)
self._batch_size = batch_size
self._buffer = None
@@ -121,7 +128,7 @@ class MemoryRecordsBuilder(object):
self._closed = False
self._bytes_written = 0
- def append(self, timestamp, key, value):
+ def append(self, timestamp, key, value, headers=[]):
""" Append a message to the buffer.
Returns:
@@ -131,7 +138,7 @@ class MemoryRecordsBuilder(object):
return None, 0
offset = self._next_offset
- metadata = self._builder.append(offset, timestamp, key, value)
+ metadata = self._builder.append(offset, timestamp, key, value, headers)
# Return of 0 size means there's no space to add a new message
if metadata is None:
return None
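With these changes MemoryRecordsBuilder accepts magic=2 plus per-record headers, and MemoryRecords dispatches to DefaultRecordBatch for V2 data. A hedged usage sketch; close(), buffer() and has_next() are pre-existing methods of these classes that are not shown in this diff:

    from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=16 * 1024)
    builder.append(timestamp=None, key=b"k", value=b"v", headers=[("h", b"1")])
    builder.close()

    records = MemoryRecords(builder.buffer())
    while records.has_next():
        batch = records.next_batch()   # DefaultRecordBatch for magic 2 data
        for record in batch:
            print(record.offset, record.value, record.headers)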
diff --git a/kafka/record/util.py b/kafka/record/util.py
index 098d6f4..88135f1 100644
--- a/kafka/record/util.py
+++ b/kafka/record/util.py
@@ -1,5 +1,124 @@
import binascii
+from ._crc32c import crc as crc32c_py
+
+
+def encode_varint(value, write):
+ """ Encode an integer to a varint presentation. See
+ https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints
+ on how those can be produced.
+
+ Arguments:
+ value (int): Value to encode
+ write (function): Called per byte that needs to be written
+
+ Returns:
+ int: Number of bytes written
+ """
+ value = (value << 1) ^ (value >> 63)
+
+ if value <= 0x7f: # 1 byte
+ write(value)
+ return 1
+ if value <= 0x3fff: # 2 bytes
+ write(0x80 | (value & 0x7f))
+ write(value >> 7)
+ return 2
+ if value <= 0x1fffff: # 3 bytes
+ write(0x80 | (value & 0x7f))
+ write(0x80 | ((value >> 7) & 0x7f))
+ write(value >> 14)
+ return 3
+ if value <= 0xfffffff: # 4 bytes
+ write(0x80 | (value & 0x7f))
+ write(0x80 | ((value >> 7) & 0x7f))
+ write(0x80 | ((value >> 14) & 0x7f))
+ write(value >> 21)
+ return 4
+ if value <= 0x7ffffffff: # 5 bytes
+ write(0x80 | (value & 0x7f))
+ write(0x80 | ((value >> 7) & 0x7f))
+ write(0x80 | ((value >> 14) & 0x7f))
+ write(0x80 | ((value >> 21) & 0x7f))
+ write(value >> 28)
+ return 5
+ else:
+ # Return to general algorithm
+ bits = value & 0x7f
+ value >>= 7
+ i = 0
+ while value:
+ write(0x80 | bits)
+ bits = value & 0x7f
+ value >>= 7
+ i += 1
+ write(bits)
+ return i + 1
+
+
+def size_of_varint(value):
+ """ Number of bytes needed to encode an integer in variable-length format.
+ """
+ value = (value << 1) ^ (value >> 63)
+ if value <= 0x7f:
+ return 1
+ if value <= 0x3fff:
+ return 2
+ if value <= 0x1fffff:
+ return 3
+ if value <= 0xfffffff:
+ return 4
+ if value <= 0x7ffffffff:
+ return 5
+ if value <= 0x3ffffffffff:
+ return 6
+ if value <= 0x1ffffffffffff:
+ return 7
+ if value <= 0xffffffffffffff:
+ return 8
+ if value <= 0x7fffffffffffffff:
+ return 9
+ return 10
+
+
+def decode_varint(buffer, pos=0):
+ """ Decode an integer from a varint presentation. See
+ https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints
+ on how those can be produced.
+
+ Arguments:
+ buffer (bytearray): buffer to read from.
+ pos (int): optional position to read from
+
+ Returns:
+ (int, int): Decoded int value and next read position
+ """
+ result = buffer[pos]
+ if not (result & 0x81):
+ return (result >> 1), pos + 1
+ if not (result & 0x80):
+ return (result >> 1) ^ (~0), pos + 1
+
+ result &= 0x7f
+ pos += 1
+ shift = 7
+ while 1:
+ b = buffer[pos]
+ result |= ((b & 0x7f) << shift)
+ pos += 1
+ if not (b & 0x80):
+ return ((result >> 1) ^ -(result & 1), pos)
+ shift += 7
+ if shift >= 64:
+ raise ValueError("Out of int64 range")
+
+
+def calc_crc32c(memview):
+ """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data
+ """
+ crc = crc32c_py(memview)
+ return crc
+
def calc_crc32(memview):
""" Calculate simple CRC-32 checksum over a memoryview of data