-rw-r--r--  benchmarks/record_batch_compose.py   |   4
-rw-r--r--  benchmarks/record_batch_read.py      |   4
-rw-r--r--  benchmarks/varint_speed.py           | 443
-rw-r--r--  kafka/consumer/fetcher.py            |  30
-rw-r--r--  kafka/producer/kafka.py              |  15
-rw-r--r--  kafka/producer/sender.py             |   9
-rw-r--r--  kafka/record/README                  |   8
-rw-r--r--  kafka/record/_crc32c.py              | 143
-rw-r--r--  kafka/record/abc.py                  |  10
-rw-r--r--  kafka/record/default_records.py      | 595
-rw-r--r--  kafka/record/legacy_records.py       |   3
-rw-r--r--  kafka/record/memory_records.py       |  23
-rw-r--r--  kafka/record/util.py                 | 119
-rw-r--r--  test/record/test_default_records.py  | 169
-rw-r--r--  test/record/test_records.py          |  46
-rw-r--r--  test/record/test_util.py             |  95
-rw-r--r--  test/test_fetcher.py                 |   2
-rw-r--r--  test/test_producer.py                |   9
18 files changed, 1696 insertions, 31 deletions
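
Note (editor's sketch, not part of the diff): the headline change here is the v2 "default" record format used by 0.11+ brokers, implemented in kafka/record/default_records.py. Based on the builder and reader classes and the test_read_write_serde_v2 test added below, a round trip looks roughly like this; the key, value and header name are arbitrary placeholders.

from kafka.record.default_records import (
    DefaultRecordBatch, DefaultRecordBatchBuilder
)

# Build an uncompressed, non-transactional v2 batch (producer fields unset).
builder = DefaultRecordBatchBuilder(
    magic=2, compression_type=0, is_transactional=0,
    producer_id=-1, producer_epoch=-1, base_sequence=-1,
    batch_size=1024 * 1024)
meta = builder.append(
    0, timestamp=None, key=b"key", value=b"value",
    headers=[("trace-id", b"abc123")])  # header name is a made-up example
assert meta is not None  # None would mean the batch had no room left

# Parse it back; validate_crc() must be called before iterating the records.
batch = DefaultRecordBatch(bytes(builder.build()))
assert batch.validate_crc()
record = list(batch)[0]
assert record.value == b"value"
assert record.headers == [("trace-id", b"abc123")]
assert record.checksum is None  # per-record CRC is gone in v2
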
diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py index 86012df..aca669d 100644 --- a/benchmarks/record_batch_compose.py +++ b/benchmarks/record_batch_compose.py @@ -58,7 +58,8 @@ def func(loops, magic): magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0) for _ in range(MESSAGES_PER_BATCH): key, value, timestamp = next(precomputed_samples) - size = batch.append(timestamp=timestamp, key=key, value=value) + size = batch.append( + timestamp=timestamp, key=key, value=value) assert size batch.close() results.append(batch.buffer()) @@ -73,3 +74,4 @@ def func(loops, magic): runner = perf.Runner() runner.bench_time_func('batch_append_v0', func, 0) runner.bench_time_func('batch_append_v1', func, 1) +runner.bench_time_func('batch_append_v2', func, 2) diff --git a/benchmarks/record_batch_read.py b/benchmarks/record_batch_read.py index 7ae471e..fc01e42 100644 --- a/benchmarks/record_batch_read.py +++ b/benchmarks/record_batch_read.py @@ -35,7 +35,8 @@ def prepare(magic): size = batch.append( random.randint(*TIMESTAMP_RANGE), random_bytes(KEY_SIZE), - random_bytes(VALUE_SIZE)) + random_bytes(VALUE_SIZE), + headers=[]) assert size batch.close() samples.append(bytes(batch.buffer())) @@ -78,3 +79,4 @@ def func(loops, magic): runner = perf.Runner() runner.bench_time_func('batch_read_v0', func, 0) runner.bench_time_func('batch_read_v1', func, 1) +runner.bench_time_func('batch_read_v2', func, 2) diff --git a/benchmarks/varint_speed.py b/benchmarks/varint_speed.py new file mode 100644 index 0000000..2c5cd62 --- /dev/null +++ b/benchmarks/varint_speed.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python +from __future__ import print_function +import perf +import six + + +test_data = [ + (b"\x00", 0), + (b"\x01", -1), + (b"\x02", 1), + (b"\x7E", 63), + (b"\x7F", -64), + (b"\x80\x01", 64), + (b"\x81\x01", -65), + (b"\xFE\x7F", 8191), + (b"\xFF\x7F", -8192), + (b"\x80\x80\x01", 8192), + (b"\x81\x80\x01", -8193), + (b"\xFE\xFF\x7F", 1048575), + (b"\xFF\xFF\x7F", -1048576), + (b"\x80\x80\x80\x01", 1048576), + (b"\x81\x80\x80\x01", -1048577), + (b"\xFE\xFF\xFF\x7F", 134217727), + (b"\xFF\xFF\xFF\x7F", -134217728), + (b"\x80\x80\x80\x80\x01", 134217728), + (b"\x81\x80\x80\x80\x01", -134217729), + (b"\xFE\xFF\xFF\xFF\x7F", 17179869183), + (b"\xFF\xFF\xFF\xFF\x7F", -17179869184), + (b"\x80\x80\x80\x80\x80\x01", 17179869184), + (b"\x81\x80\x80\x80\x80\x01", -17179869185), + (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551), + (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552), + (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552), + (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656), + (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656), + (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", -4611686018427387905), +] + + +BENCH_VALUES_ENC = [ + 60, # 1 byte + -8192, # 2 bytes + 1048575, # 3 bytes + 134217727, # 4 bytes + -17179869184, # 5 bytes + 2199023255551, # 6 bytes +] + +BENCH_VALUES_DEC = [ 
+ b"\x7E", # 1 byte + b"\xFF\x7F", # 2 bytes + b"\xFE\xFF\x7F", # 3 bytes + b"\xFF\xFF\xFF\x7F", # 4 bytes + b"\x80\x80\x80\x80\x01", # 5 bytes + b"\xFE\xFF\xFF\xFF\xFF\x7F", # 6 bytes +] +BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC)) + + +def _assert_valid_enc(enc_func): + for encoded, decoded in test_data: + assert enc_func(decoded) == encoded, decoded + + +def _assert_valid_dec(dec_func): + for encoded, decoded in test_data: + res, pos = dec_func(bytearray(encoded)) + assert res == decoded, (decoded, res) + assert pos == len(encoded), (decoded, pos) + + +def _assert_valid_size(size_func): + for encoded, decoded in test_data: + assert size_func(decoded) == len(encoded), decoded + + +def encode_varint_1(num): + """ Encode an integer to a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + num (int): Value to encode + + Returns: + bytearray: Encoded presentation of integer with length from 1 to 10 + bytes + """ + # Shift sign to the end of number + num = (num << 1) ^ (num >> 63) + # Max 10 bytes. We assert those are allocated + buf = bytearray(10) + + for i in range(10): + # 7 lowest bits from the number and set 8th if we still have pending + # bits left to encode + buf[i] = num & 0x7f | (0x80 if num > 0x7f else 0) + num = num >> 7 + if num == 0: + break + else: + # Max size of endcoded double is 10 bytes for unsigned values + raise ValueError("Out of double range") + return buf[:i + 1] + +_assert_valid_enc(encode_varint_1) + + +def encode_varint_2(value, int2byte=six.int2byte): + value = (value << 1) ^ (value >> 63) + + bits = value & 0x7f + value >>= 7 + res = b"" + while value: + res += int2byte(0x80 | bits) + bits = value & 0x7f + value >>= 7 + return res + int2byte(bits) + +_assert_valid_enc(encode_varint_2) + + +def encode_varint_3(value, buf): + append = buf.append + value = (value << 1) ^ (value >> 63) + + bits = value & 0x7f + value >>= 7 + while value: + append(0x80 | bits) + bits = value & 0x7f + value >>= 7 + append(bits) + return value + + +for encoded, decoded in test_data: + res = bytearray() + encode_varint_3(decoded, res) + assert res == encoded + + +def encode_varint_4(value, int2byte=six.int2byte): + value = (value << 1) ^ (value >> 63) + + if value <= 0x7f: # 1 byte + return int2byte(value) + if value <= 0x3fff: # 2 bytes + return int2byte(0x80 | (value & 0x7f)) + int2byte(value >> 7) + if value <= 0x1fffff: # 3 bytes + return int2byte(0x80 | (value & 0x7f)) + \ + int2byte(0x80 | ((value >> 7) & 0x7f)) + \ + int2byte(value >> 14) + if value <= 0xfffffff: # 4 bytes + return int2byte(0x80 | (value & 0x7f)) + \ + int2byte(0x80 | ((value >> 7) & 0x7f)) + \ + int2byte(0x80 | ((value >> 14) & 0x7f)) + \ + int2byte(value >> 21) + if value <= 0x7ffffffff: # 5 bytes + return int2byte(0x80 | (value & 0x7f)) + \ + int2byte(0x80 | ((value >> 7) & 0x7f)) + \ + int2byte(0x80 | ((value >> 14) & 0x7f)) + \ + int2byte(0x80 | ((value >> 21) & 0x7f)) + \ + int2byte(value >> 28) + else: + # Return to general algorithm + bits = value & 0x7f + value >>= 7 + res = b"" + while value: + res += int2byte(0x80 | bits) + bits = value & 0x7f + value >>= 7 + return res + int2byte(bits) + + +_assert_valid_enc(encode_varint_4) + +# import dis +# dis.dis(encode_varint_4) + + +def encode_varint_5(value, buf, pos=0): + value = (value << 1) ^ (value >> 63) + + bits = value & 0x7f + value >>= 7 + while value: + buf[pos] = 0x80 | bits + bits = value & 0x7f + value >>= 7 + pos += 1 + buf[pos] = bits 
+ return pos + 1 + +for encoded, decoded in test_data: + res = bytearray(10) + written = encode_varint_5(decoded, res) + assert res[:written] == encoded + + +def encode_varint_6(value, buf): + append = buf.append + value = (value << 1) ^ (value >> 63) + + if value <= 0x7f: # 1 byte + append(value) + return 1 + if value <= 0x3fff: # 2 bytes + append(0x80 | (value & 0x7f)) + append(value >> 7) + return 2 + if value <= 0x1fffff: # 3 bytes + append(0x80 | (value & 0x7f)) + append(0x80 | ((value >> 7) & 0x7f)) + append(value >> 14) + return 3 + if value <= 0xfffffff: # 4 bytes + append(0x80 | (value & 0x7f)) + append(0x80 | ((value >> 7) & 0x7f)) + append(0x80 | ((value >> 14) & 0x7f)) + append(value >> 21) + return 4 + if value <= 0x7ffffffff: # 5 bytes + append(0x80 | (value & 0x7f)) + append(0x80 | ((value >> 7) & 0x7f)) + append(0x80 | ((value >> 14) & 0x7f)) + append(0x80 | ((value >> 21) & 0x7f)) + append(value >> 28) + return 5 + else: + # Return to general algorithm + bits = value & 0x7f + value >>= 7 + i = 0 + while value: + append(0x80 | bits) + bits = value & 0x7f + value >>= 7 + i += 1 + append(bits) + return i + + +for encoded, decoded in test_data: + res = bytearray() + encode_varint_6(decoded, res) + assert res == encoded + + +def size_of_varint_1(value): + """ Number of bytes needed to encode an integer in variable-length format. + """ + value = (value << 1) ^ (value >> 63) + res = 0 + while True: + res += 1 + value = value >> 7 + if value == 0: + break + return res + +_assert_valid_size(size_of_varint_1) + + +def size_of_varint_2(value): + """ Number of bytes needed to encode an integer in variable-length format. + """ + value = (value << 1) ^ (value >> 63) + if value <= 0x7f: + return 1 + if value <= 0x3fff: + return 2 + if value <= 0x1fffff: + return 3 + if value <= 0xfffffff: + return 4 + if value <= 0x7ffffffff: + return 5 + if value <= 0x3ffffffffff: + return 6 + if value <= 0x1ffffffffffff: + return 7 + if value <= 0xffffffffffffff: + return 8 + if value <= 0x7fffffffffffffff: + return 9 + return 10 + +_assert_valid_size(size_of_varint_2) + + +if six.PY3: + def _read_byte(memview, pos): + """ Read a byte from memoryview as an integer + + Raises: + IndexError: if position is out of bounds + """ + return memview[pos] +else: + def _read_byte(memview, pos): + """ Read a byte from memoryview as an integer + + Raises: + IndexError: if position is out of bounds + """ + return ord(memview[pos]) + + +def decode_varint_1(buffer, pos=0): + """ Decode an integer from a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. 
+ + Arguments: + buffer (bytes-like): any object acceptable by ``memoryview`` + pos (int): optional position to read from + + Returns: + (int, int): Decoded int value and next read position + """ + value = 0 + shift = 0 + memview = memoryview(buffer) + for i in range(pos, pos + 10): + try: + byte = _read_byte(memview, i) + except IndexError: + raise ValueError("End of byte stream") + if byte & 0x80 != 0: + value |= (byte & 0x7f) << shift + shift += 7 + else: + value |= byte << shift + break + else: + # Max size of endcoded double is 10 bytes for unsigned values + raise ValueError("Out of double range") + # Normalize sign + return (value >> 1) ^ -(value & 1), i + 1 + +_assert_valid_dec(decode_varint_1) + + +def decode_varint_2(buffer, pos=0): + result = 0 + shift = 0 + while 1: + b = buffer[pos] + result |= ((b & 0x7f) << shift) + pos += 1 + if not (b & 0x80): + # result = result_type(() & mask) + return ((result >> 1) ^ -(result & 1), pos) + shift += 7 + if shift >= 64: + raise ValueError("Out of int64 range") + + +_assert_valid_dec(decode_varint_2) + + +def decode_varint_3(buffer, pos=0): + result = buffer[pos] + if not (result & 0x81): + return (result >> 1), pos + 1 + if not (result & 0x80): + return (result >> 1) ^ (~0), pos + 1 + + result &= 0x7f + pos += 1 + shift = 7 + while 1: + b = buffer[pos] + result |= ((b & 0x7f) << shift) + pos += 1 + if not (b & 0x80): + return ((result >> 1) ^ -(result & 1), pos) + shift += 7 + if shift >= 64: + raise ValueError("Out of int64 range") + + +_assert_valid_dec(decode_varint_3) + +# import dis +# dis.dis(decode_varint_3) + +runner = perf.Runner() +# Encode algorithms returning a bytes result +for bench_func in [ + encode_varint_1, + encode_varint_2, + encode_varint_4]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + +# Encode algorithms writing to the buffer +for bench_func in [ + encode_varint_3, + encode_varint_5, + encode_varint_6]: + for i, value in enumerate(BENCH_VALUES_ENC): + fname = bench_func.__name__ + runner.timeit( + '{}_{}byte'.format(fname, i + 1), + stmt="{}({}, buffer)".format(fname, value), + setup="from __main__ import {}; buffer = bytearray(10)".format( + fname) + ) + +# Size algorithms +for bench_func in [ + size_of_varint_1, + size_of_varint_2]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + +# Decode algorithms +for bench_func in [ + decode_varint_1, + decode_varint_2, + decode_varint_3]: + for i, value in enumerate(BENCH_VALUES_DEC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d3ee26e..ddd7567 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -23,6 +23,10 @@ from kafka.structs import TopicPartition, OffsetAndTimestamp log = logging.getLogger(__name__) +# Isolation levels +READ_UNCOMMITTED = 0 +READ_COMMITTED = 1 + ConsumerRecord = collections.namedtuple("ConsumerRecord", ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value", "checksum", "serialized_key_size", "serialized_value_size"]) @@ -114,6 +118,7 @@ class Fetcher(six.Iterator): self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) + self._isolation_level = READ_UNCOMMITTED def send_fetches(self): """Send FetchRequests for all 
assigned partitions that do not already have @@ -670,7 +675,9 @@ class Fetcher(six.Iterator): log.debug("Adding fetch request for partition %s at offset %d", partition, position) - if self.config['api_version'] >= (0, 10, 1): + if self.config['api_version'] >= (0, 11, 0): + version = 4 + elif self.config['api_version'] >= (0, 10, 1): version = 3 elif self.config['api_version'] >= (0, 10): version = 2 @@ -696,12 +703,21 @@ class Fetcher(six.Iterator): # dicts retain insert order. partition_data = list(partition_data.items()) random.shuffle(partition_data) - requests[node_id] = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - partition_data) + if version == 3: + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + partition_data) + else: + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + self._isolation_level, + partition_data) return requests def _handle_fetch_response(self, request, send_time, response): diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0ffc29c..646e773 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -15,6 +15,7 @@ from ..client_async import KafkaClient, selectors from ..codec import has_gzip, has_snappy, has_lz4 from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner +from ..record.default_records import DefaultRecordBatchBuilder from ..record.legacy_records import LegacyRecordBatchBuilder from ..serializer import Serializer from ..structs import TopicPartition @@ -486,15 +487,21 @@ class KafkaProducer(object): return self._wait_on_metadata(topic, max_wait) def _max_usable_produce_magic(self): - if self.config['api_version'] >= (0, 10): + if self.config['api_version'] >= (0, 11): + return 2 + elif self.config['api_version'] >= (0, 10): return 1 else: return 0 - def _estimate_size_in_bytes(self, key, value): + def _estimate_size_in_bytes(self, key, value, headers=[]): magic = self._max_usable_produce_magic() - return LegacyRecordBatchBuilder.estimate_size_in_bytes( - magic, self.config['compression_type'], key, value) + if magic == 2: + return DefaultRecordBatchBuilder.estimate_size_in_bytes( + key, value, headers) + else: + return LegacyRecordBatchBuilder.estimate_size_in_bytes( + magic, self.config['compression_type'], key, value) def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): """Publish a message to a topic. 
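
Aside (editor's sketch, not part of the diff): the varint benchmark above and the helpers added to kafka/record/util.py further down both use protobuf-style zigzag varints, where a signed value is first mapped to an unsigned one via (v << 1) ^ (v >> 63) and then emitted seven bits at a time. A quick round trip with the shipped helpers, using a few arbitrary values that all encode within five bytes:

from kafka.record import util

for value in (0, -1, 300, -8192, 134217727, 17179869183):
    buf = bytearray()
    written = util.encode_varint(value, buf.append)
    # encode_varint reports bytes written; size_of_varint predicts the same
    assert written == len(buf) == util.size_of_varint(value)
    decoded, pos = util.decode_varint(buf, 0)
    assert decoded == value and pos == len(buf)
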
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 72a15bb..ffc67f8 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -291,7 +291,11 @@ class Sender(threading.Thread): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - if self.config['api_version'] >= (0, 10): + kwargs = {} + if self.config['api_version'] >= (0, 11): + version = 3 + kwargs = dict(transactional_id=None) + elif self.config['api_version'] >= (0, 10): version = 2 elif self.config['api_version'] == (0, 9): version = 1 @@ -302,7 +306,8 @@ class Sender(threading.Thread): timeout=timeout, topics=[(topic, list(partition_info.items())) for topic, partition_info - in six.iteritems(produce_records_by_partition)] + in six.iteritems(produce_records_by_partition)], + **kwargs ) def wakeup(self): diff --git a/kafka/record/README b/kafka/record/README new file mode 100644 index 0000000..e445455 --- /dev/null +++ b/kafka/record/README @@ -0,0 +1,8 @@ +Module structured mostly based on +kafka/clients/src/main/java/org/apache/kafka/common/record/ module of Java +Client. + +See abc.py for abstract declarations. `ABCRecords` is used as a facade to hide +version differences. `ABCRecordBatch` subclasses will implement actual parsers +for different versions (v0/v1 as LegacyBatch and v2 as DefaultBatch. Names +taken from Java). diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py new file mode 100644 index 0000000..5704f82 --- /dev/null +++ b/kafka/record/_crc32c.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# +# Taken from https://cloud.google.com/appengine/docs/standard/python/refdocs/\ +# modules/google/appengine/api/files/crc32c?hl=ru +# +# Copyright 2007 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Implementation of CRC-32C checksumming as in rfc3720 section B.4. +See http://en.wikipedia.org/wiki/Cyclic_redundancy_check for details on CRC-32C +This code is a manual python translation of c code generated by +pycrc 0.7.1 (http://www.tty1.net/pycrc/). 
Command line used: +'./pycrc.py --model=crc-32c --generate c --algorithm=table-driven' +""" + +import array + +CRC_TABLE = ( + 0x00000000, 0xf26b8303, 0xe13b70f7, 0x1350f3f4, + 0xc79a971f, 0x35f1141c, 0x26a1e7e8, 0xd4ca64eb, + 0x8ad958cf, 0x78b2dbcc, 0x6be22838, 0x9989ab3b, + 0x4d43cfd0, 0xbf284cd3, 0xac78bf27, 0x5e133c24, + 0x105ec76f, 0xe235446c, 0xf165b798, 0x030e349b, + 0xd7c45070, 0x25afd373, 0x36ff2087, 0xc494a384, + 0x9a879fa0, 0x68ec1ca3, 0x7bbcef57, 0x89d76c54, + 0x5d1d08bf, 0xaf768bbc, 0xbc267848, 0x4e4dfb4b, + 0x20bd8ede, 0xd2d60ddd, 0xc186fe29, 0x33ed7d2a, + 0xe72719c1, 0x154c9ac2, 0x061c6936, 0xf477ea35, + 0xaa64d611, 0x580f5512, 0x4b5fa6e6, 0xb93425e5, + 0x6dfe410e, 0x9f95c20d, 0x8cc531f9, 0x7eaeb2fa, + 0x30e349b1, 0xc288cab2, 0xd1d83946, 0x23b3ba45, + 0xf779deae, 0x05125dad, 0x1642ae59, 0xe4292d5a, + 0xba3a117e, 0x4851927d, 0x5b016189, 0xa96ae28a, + 0x7da08661, 0x8fcb0562, 0x9c9bf696, 0x6ef07595, + 0x417b1dbc, 0xb3109ebf, 0xa0406d4b, 0x522bee48, + 0x86e18aa3, 0x748a09a0, 0x67dafa54, 0x95b17957, + 0xcba24573, 0x39c9c670, 0x2a993584, 0xd8f2b687, + 0x0c38d26c, 0xfe53516f, 0xed03a29b, 0x1f682198, + 0x5125dad3, 0xa34e59d0, 0xb01eaa24, 0x42752927, + 0x96bf4dcc, 0x64d4cecf, 0x77843d3b, 0x85efbe38, + 0xdbfc821c, 0x2997011f, 0x3ac7f2eb, 0xc8ac71e8, + 0x1c661503, 0xee0d9600, 0xfd5d65f4, 0x0f36e6f7, + 0x61c69362, 0x93ad1061, 0x80fde395, 0x72966096, + 0xa65c047d, 0x5437877e, 0x4767748a, 0xb50cf789, + 0xeb1fcbad, 0x197448ae, 0x0a24bb5a, 0xf84f3859, + 0x2c855cb2, 0xdeeedfb1, 0xcdbe2c45, 0x3fd5af46, + 0x7198540d, 0x83f3d70e, 0x90a324fa, 0x62c8a7f9, + 0xb602c312, 0x44694011, 0x5739b3e5, 0xa55230e6, + 0xfb410cc2, 0x092a8fc1, 0x1a7a7c35, 0xe811ff36, + 0x3cdb9bdd, 0xceb018de, 0xdde0eb2a, 0x2f8b6829, + 0x82f63b78, 0x709db87b, 0x63cd4b8f, 0x91a6c88c, + 0x456cac67, 0xb7072f64, 0xa457dc90, 0x563c5f93, + 0x082f63b7, 0xfa44e0b4, 0xe9141340, 0x1b7f9043, + 0xcfb5f4a8, 0x3dde77ab, 0x2e8e845f, 0xdce5075c, + 0x92a8fc17, 0x60c37f14, 0x73938ce0, 0x81f80fe3, + 0x55326b08, 0xa759e80b, 0xb4091bff, 0x466298fc, + 0x1871a4d8, 0xea1a27db, 0xf94ad42f, 0x0b21572c, + 0xdfeb33c7, 0x2d80b0c4, 0x3ed04330, 0xccbbc033, + 0xa24bb5a6, 0x502036a5, 0x4370c551, 0xb11b4652, + 0x65d122b9, 0x97baa1ba, 0x84ea524e, 0x7681d14d, + 0x2892ed69, 0xdaf96e6a, 0xc9a99d9e, 0x3bc21e9d, + 0xef087a76, 0x1d63f975, 0x0e330a81, 0xfc588982, + 0xb21572c9, 0x407ef1ca, 0x532e023e, 0xa145813d, + 0x758fe5d6, 0x87e466d5, 0x94b49521, 0x66df1622, + 0x38cc2a06, 0xcaa7a905, 0xd9f75af1, 0x2b9cd9f2, + 0xff56bd19, 0x0d3d3e1a, 0x1e6dcdee, 0xec064eed, + 0xc38d26c4, 0x31e6a5c7, 0x22b65633, 0xd0ddd530, + 0x0417b1db, 0xf67c32d8, 0xe52cc12c, 0x1747422f, + 0x49547e0b, 0xbb3ffd08, 0xa86f0efc, 0x5a048dff, + 0x8ecee914, 0x7ca56a17, 0x6ff599e3, 0x9d9e1ae0, + 0xd3d3e1ab, 0x21b862a8, 0x32e8915c, 0xc083125f, + 0x144976b4, 0xe622f5b7, 0xf5720643, 0x07198540, + 0x590ab964, 0xab613a67, 0xb831c993, 0x4a5a4a90, + 0x9e902e7b, 0x6cfbad78, 0x7fab5e8c, 0x8dc0dd8f, + 0xe330a81a, 0x115b2b19, 0x020bd8ed, 0xf0605bee, + 0x24aa3f05, 0xd6c1bc06, 0xc5914ff2, 0x37faccf1, + 0x69e9f0d5, 0x9b8273d6, 0x88d28022, 0x7ab90321, + 0xae7367ca, 0x5c18e4c9, 0x4f48173d, 0xbd23943e, + 0xf36e6f75, 0x0105ec76, 0x12551f82, 0xe03e9c81, + 0x34f4f86a, 0xc69f7b69, 0xd5cf889d, 0x27a40b9e, + 0x79b737ba, 0x8bdcb4b9, 0x988c474d, 0x6ae7c44e, + 0xbe2da0a5, 0x4c4623a6, 0x5f16d052, 0xad7d5351, +) + +CRC_INIT = 0 +_MASK = 0xFFFFFFFF + + +def crc_update(crc, data): + """Update CRC-32C checksum with data. + Args: + crc: 32-bit checksum to update as long. + data: byte array, string or iterable over bytes. 
+ Returns: + 32-bit updated CRC-32C as long. + """ + if type(data) != array.array or data.itemsize != 1: + buf = array.array("B", data) + else: + buf = data + crc = crc ^ _MASK + for b in buf: + table_index = (crc ^ b) & 0xff + crc = (CRC_TABLE[table_index] ^ (crc >> 8)) & _MASK + return crc ^ _MASK + + +def crc_finalize(crc): + """Finalize CRC-32C checksum. + This function should be called as last step of crc calculation. + Args: + crc: 32-bit checksum as long. + Returns: + finalized 32-bit checksum as long + """ + return crc & _MASK + + +def crc(data): + """Compute CRC-32C checksum of the data. + Args: + data: byte array, string or iterable over bytes. + Returns: + 32-bit CRC-32C checksum of data as long. + """ + return crc_finalize(crc_update(CRC_INIT, data)) + + +if __name__ == "__main__": + import sys + data = sys.stdin.read() + print(hex(crc(data))) diff --git a/kafka/record/abc.py b/kafka/record/abc.py index 8a27276..83121c6 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -36,12 +36,18 @@ class ABCRecord(object): be the checksum for v0 and v1 and None for v2 and above. """ + @abc.abstractproperty + def headers(self): + """ If supported by version list of key-value tuples, or empty list if + not supported by format. + """ + class ABCRecordBatchBuilder(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def append(self, offset, timestamp, key, value): + def append(self, offset, timestamp, key, value, headers=None): """ Writes record to internal buffer. Arguments: @@ -51,6 +57,8 @@ class ABCRecordBatchBuilder(object): set to current time. key (bytes or None): Key of the record value (bytes or None): Value of the record + headers (List[Tuple[str, bytes]]): Headers of the record. Header + keys can not be ``None``. Returns: (bytes, int): Checksum of the written record (or None for v2 and diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py new file mode 100644 index 0000000..3d517af --- /dev/null +++ b/kafka/record/default_records.py @@ -0,0 +1,595 @@ +# See: +# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\ +# apache/kafka/common/record/DefaultRecordBatch.java +# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\ +# apache/kafka/common/record/DefaultRecord.java + +# RecordBatch and Record implementation for magic 2 and above. +# The schema is given below: + +# RecordBatch => +# BaseOffset => Int64 +# Length => Int32 +# PartitionLeaderEpoch => Int32 +# Magic => Int8 +# CRC => Uint32 +# Attributes => Int16 +# LastOffsetDelta => Int32 // also serves as LastSequenceDelta +# FirstTimestamp => Int64 +# MaxTimestamp => Int64 +# ProducerId => Int64 +# ProducerEpoch => Int16 +# BaseSequence => Int32 +# Records => [Record] + +# Record => +# Length => Varint +# Attributes => Int8 +# TimestampDelta => Varlong +# OffsetDelta => Varint +# Key => Bytes +# Value => Bytes +# Headers => [HeaderKey HeaderValue] +# HeaderKey => String +# HeaderValue => Bytes + +# Note that when compression is enabled (see attributes below), the compressed +# record data is serialized directly following the count of the number of +# records. (ie Records => [Record], but without length bytes) + +# The CRC covers the data from the attributes to the end of the batch (i.e. all +# the bytes that follow the CRC). It is located after the magic byte, which +# means that clients must parse the magic byte before deciding how to interpret +# the bytes between the batch length and the magic byte. 
The partition leader +# epoch field is not included in the CRC computation to avoid the need to +# recompute the CRC when this field is assigned for every batch that is +# received by the broker. The CRC-32C (Castagnoli) polynomial is used for the +# computation. + +# The current RecordBatch attributes are given below: +# +# * Unused (6-15) +# * Control (5) +# * Transactional (4) +# * Timestamp Type (3) +# * Compression Type (0-2) + +import io +import struct +import time +from .abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder +from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint + +from kafka.errors import CorruptRecordException +from kafka.codec import ( + gzip_encode, snappy_encode, lz4_encode, + gzip_decode, snappy_decode, lz4_decode +) + + +class DefaultRecordBase(object): + + HEADER_STRUCT = struct.Struct( + ">q" # BaseOffset => Int64 + "i" # Length => Int32 + "i" # PartitionLeaderEpoch => Int32 + "b" # Magic => Int8 + "I" # CRC => Uint32 + "h" # Attributes => Int16 + "i" # LastOffsetDelta => Int32 // also serves as LastSequenceDelta + "q" # FirstTimestamp => Int64 + "q" # MaxTimestamp => Int64 + "q" # ProducerId => Int64 + "h" # ProducerEpoch => Int16 + "i" # BaseSequence => Int32 + "i" # Records count => Int32 + ) + # Byte offset in HEADER_STRUCT of attributes field. Used to calculate CRC + ATTRIBUTES_OFFSET = struct.calcsize(">qiibI") + CRC_OFFSET = struct.calcsize(">qiib") + AFTER_LEN_OFFSET = struct.calcsize(">qi") + + CODEC_MASK = 0x07 + CODEC_NONE = 0x00 + CODEC_GZIP = 0x01 + CODEC_SNAPPY = 0x02 + CODEC_LZ4 = 0x03 + TIMESTAMP_TYPE_MASK = 0x08 + TRANSACTIONAL_MASK = 0x10 + CONTROL_MASK = 0x20 + + LOG_APPEND_TIME = 1 + CREATE_TIME = 0 + + +class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch): + + def __init__(self, buffer): + self._buffer = bytearray(buffer) + self._header_data = self.HEADER_STRUCT.unpack_from(self._buffer) + self._pos = self.HEADER_STRUCT.size + self._num_records = self._header_data[12] + self._next_record_index = 0 + self._decompressed = False + + @property + def base_offset(self): + return self._header_data[0] + + @property + def magic(self): + return self._header_data[3] + + @property + def crc(self): + return self._header_data[4] + + @property + def attributes(self): + return self._header_data[5] + + @property + def compression_type(self): + return self.attributes & self.CODEC_MASK + + @property + def timestamp_type(self): + return int(bool(self.attributes & self.TIMESTAMP_TYPE_MASK)) + + @property + def is_transactional(self): + return bool(self.attributes & self.TRANSACTIONAL_MASK) + + @property + def is_control_batch(self): + return bool(self.attributes & self.CONTROL_MASK) + + @property + def first_timestamp(self): + return self._header_data[7] + + @property + def max_timestamp(self): + return self._header_data[8] + + def _maybe_uncompress(self): + if not self._decompressed: + compression_type = self.compression_type + if compression_type != self.CODEC_NONE: + data = memoryview(self._buffer)[self._pos:] + if compression_type == self.CODEC_GZIP: + uncompressed = gzip_decode(data) + if compression_type == self.CODEC_SNAPPY: + uncompressed = snappy_decode(data.tobytes()) + if compression_type == self.CODEC_LZ4: + uncompressed = lz4_decode(data.tobytes()) + self._buffer = bytearray(uncompressed) + self._pos = 0 + self._decompressed = True + + def _read_msg( + self, + decode_varint=decode_varint): + # Record => + # Length => Varint + # Attributes => Int8 + # TimestampDelta => Varlong + # OffsetDelta => Varint + # Key 
=> Bytes + # Value => Bytes + # Headers => [HeaderKey HeaderValue] + # HeaderKey => String + # HeaderValue => Bytes + + buffer = self._buffer + pos = self._pos + length, pos = decode_varint(buffer, pos) + start_pos = pos + _, pos = decode_varint(buffer, pos) # attrs can be skipped for now + + ts_delta, pos = decode_varint(buffer, pos) + if self.timestamp_type == self.LOG_APPEND_TIME: + timestamp = self.max_timestamp + else: + timestamp = self.first_timestamp + ts_delta + + offset_delta, pos = decode_varint(buffer, pos) + offset = self.base_offset + offset_delta + + key_len, pos = decode_varint(buffer, pos) + if key_len >= 0: + key = bytes(buffer[pos: pos + key_len]) + pos += key_len + else: + key = None + + value_len, pos = decode_varint(buffer, pos) + if value_len >= 0: + value = bytes(buffer[pos: pos + value_len]) + pos += value_len + else: + value = None + + header_count, pos = decode_varint(buffer, pos) + if header_count < 0: + raise CorruptRecordException("Found invalid number of record " + "headers {}".format(header_count)) + headers = [] + while header_count: + # Header key is of type String, that can't be None + h_key_len, pos = decode_varint(buffer, pos) + if h_key_len < 0: + raise CorruptRecordException( + "Invalid negative header key size {}".format(h_key_len)) + h_key = buffer[pos: pos + h_key_len].decode("utf-8") + pos += h_key_len + + # Value is of type NULLABLE_BYTES, so it can be None + h_value_len, pos = decode_varint(buffer, pos) + if h_value_len >= 0: + h_value = bytes(buffer[pos: pos + h_value_len]) + pos += h_value_len + else: + h_value = None + + headers.append((h_key, h_value)) + header_count -= 1 + + # validate whether we have read all header bytes in the current record + if pos - start_pos != length: + CorruptRecordException( + "Invalid record size: expected to read {} bytes in record " + "payload, but instead read {}".format(length, pos - start_pos)) + self._pos = pos + + return DefaultRecord( + offset, timestamp, self.timestamp_type, key, value, headers) + + def __iter__(self): + self._maybe_uncompress() + return self + + def __next__(self): + if self._next_record_index >= self._num_records: + if self._pos != len(self._buffer): + raise CorruptRecordException( + "{} unconsumed bytes after all records consumed".format( + len(self._buffer) - self._pos)) + raise StopIteration + try: + msg = self._read_msg() + except (ValueError, IndexError) as err: + raise CorruptRecordException( + "Found invalid record structure: {!r}".format(err)) + else: + self._next_record_index += 1 + return msg + + next = __next__ + + def validate_crc(self): + assert self._decompressed is False, \ + "Validate should be called before iteration" + + crc = self.crc + data_view = memoryview(self._buffer)[self.ATTRIBUTES_OFFSET:] + verify_crc = calc_crc32c(data_view.tobytes()) + return crc == verify_crc + + +class DefaultRecord(ABCRecord): + + __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", + "_headers") + + def __init__(self, offset, timestamp, timestamp_type, key, value, headers): + self._offset = offset + self._timestamp = timestamp + self._timestamp_type = timestamp_type + self._key = key + self._value = value + self._headers = headers + + @property + def offset(self): + return self._offset + + @property + def timestamp(self): + """ Epoch milliseconds + """ + return self._timestamp + + @property + def timestamp_type(self): + """ CREATE_TIME(0) or APPEND_TIME(1) + """ + return self._timestamp_type + + @property + def key(self): + """ Bytes key or None + """ + return 
self._key + + @property + def value(self): + """ Bytes value or None + """ + return self._value + + @property + def headers(self): + return self._headers + + @property + def checksum(self): + return None + + def __repr__(self): + return ( + "DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r}," + " key={!r}, value={!r}, headers={!r})".format( + self._offset, self._timestamp, self._timestamp_type, + self._key, self._value, self._headers) + ) + + +class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder): + + # excluding key, value and headers: + # 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes + MAX_RECORD_OVERHEAD = 21 + + def __init__( + self, magic, compression_type, is_transactional, + producer_id, producer_epoch, base_sequence, batch_size): + assert magic >= 2 + self._magic = magic + self._compression_type = compression_type & self.CODEC_MASK + self._batch_size = batch_size + self._is_transactional = bool(is_transactional) + # KIP-98 fields for EOS + self._producer_id = producer_id + self._producer_epoch = producer_epoch + self._base_sequence = base_sequence + + self._first_timestamp = None + self._max_timestamp = None + self._last_offset = 0 + self._num_records = 0 + + self._buffer = bytearray(self.HEADER_STRUCT.size) + + def _get_attributes(self, include_compression_type=True): + attrs = 0 + if include_compression_type: + attrs |= self._compression_type + # Timestamp Type is set by Broker + if self._is_transactional: + attrs |= self.TRANSACTIONAL_MASK + # Control batches are only created by Broker + return attrs + + def append(self, offset, timestamp, key, value, headers, + # Cache for LOAD_FAST opcodes + encode_varint=encode_varint, size_of_varint=size_of_varint, + get_type=type, type_int=int, time_time=time.time, + byte_like=(bytes, bytearray, memoryview), + bytearray_type=bytearray, len_func=len, zero_len_varint=1 + ): + """ Write message to messageset buffer with MsgVersion 2 + """ + # Check types + if get_type(offset) != type_int: + raise TypeError(offset) + if timestamp is None: + timestamp = type_int(time_time() * 1000) + elif get_type(timestamp) != type_int: + raise TypeError(timestamp) + if not (key is None or get_type(key) in byte_like): + raise TypeError( + "Not supported type for key: {}".format(type(key))) + if not (value is None or get_type(value) in byte_like): + raise TypeError( + "Not supported type for value: {}".format(type(value))) + + # We will always add the first message, so those will be set + if self._first_timestamp is None: + self._first_timestamp = timestamp + self._max_timestamp = timestamp + timestamp_delta = 0 + first_message = 1 + else: + timestamp_delta = timestamp - self._first_timestamp + first_message = 0 + + # We can't write record right away to out buffer, we need to + # precompute the length as first value... 
+ message_buffer = bytearray_type(b"\x00") # Attributes + write_byte = message_buffer.append + write = message_buffer.extend + + encode_varint(timestamp_delta, write_byte) + # Base offset is always 0 on Produce + encode_varint(offset, write_byte) + + if key is not None: + encode_varint(len_func(key), write_byte) + write(key) + else: + write_byte(zero_len_varint) + + if value is not None: + encode_varint(len_func(value), write_byte) + write(value) + else: + write_byte(zero_len_varint) + + encode_varint(len_func(headers), write_byte) + + for h_key, h_value in headers: + h_key = h_key.encode("utf-8") + encode_varint(len_func(h_key), write_byte) + write(h_key) + if h_value is not None: + encode_varint(len_func(h_value), write_byte) + write(h_value) + else: + write_byte(zero_len_varint) + + message_len = len_func(message_buffer) + main_buffer = self._buffer + + required_size = message_len + size_of_varint(message_len) + # Check if we can write this message + if (required_size + len_func(main_buffer) > self._batch_size and + not first_message): + return None + + # Those should be updated after the length check + if self._max_timestamp < timestamp: + self._max_timestamp = timestamp + self._num_records += 1 + self._last_offset = offset + + encode_varint(message_len, main_buffer.append) + main_buffer.extend(message_buffer) + + return DefaultRecordMetadata(offset, required_size, timestamp) + + def write_header(self, use_compression_type=True): + batch_len = len(self._buffer) + self.HEADER_STRUCT.pack_into( + self._buffer, 0, + 0, # BaseOffset, set by broker + batch_len - self.AFTER_LEN_OFFSET, # Size from here to end + 0, # PartitionLeaderEpoch, set by broker + self._magic, + 0, # CRC will be set below, as we need a filled buffer for it + self._get_attributes(use_compression_type), + self._last_offset, + self._first_timestamp, + self._max_timestamp, + self._producer_id, + self._producer_epoch, + self._base_sequence, + self._num_records + ) + crc = calc_crc32c(self._buffer[self.ATTRIBUTES_OFFSET:]) + struct.pack_into(">I", self._buffer, self.CRC_OFFSET, crc) + + def _maybe_compress(self): + if self._compression_type != self.CODEC_NONE: + header_size = self.HEADER_STRUCT.size + data = bytes(self._buffer[header_size:]) + if self._compression_type == self.CODEC_GZIP: + compressed = gzip_encode(data) + elif self._compression_type == self.CODEC_SNAPPY: + compressed = snappy_encode(data) + elif self._compression_type == self.CODEC_LZ4: + compressed = lz4_encode(data) + compressed_size = len(compressed) + if len(data) <= compressed_size: + # We did not get any benefit from compression, lets send + # uncompressed + return False + else: + # Trim bytearray to the required size + needed_size = header_size + compressed_size + del self._buffer[needed_size:] + self._buffer[header_size:needed_size] = compressed + return True + return False + + def build(self): + send_compressed = self._maybe_compress() + self.write_header(send_compressed) + return self._buffer + + def size(self): + """ Return current size of data written to buffer + """ + return len(self._buffer) + + def size_in_bytes(self, offset, timestamp, key, value, headers): + if self._first_timestamp is not None: + timestamp_delta = timestamp - self._first_timestamp + else: + timestamp_delta = 0 + size_of_body = ( + 1 + # Attrs + size_of_varint(offset) + + size_of_varint(timestamp_delta) + + self.size_of(key, value, headers) + ) + return size_of_body + size_of_varint(size_of_body) + + @classmethod + def size_of(cls, key, value, headers): + size = 0 + # Key 
size + if key is None: + size += 1 + else: + key_len = len(key) + size += size_of_varint(key_len) + key_len + # Value size + if value is None: + size += 1 + else: + value_len = len(value) + size += size_of_varint(value_len) + value_len + # Header size + size += size_of_varint(len(headers)) + for h_key, h_value in headers: + h_key_len = len(h_key.encode("utf-8")) + size += size_of_varint(h_key_len) + h_key_len + + if h_value is None: + size += 1 + else: + h_value_len = len(h_value) + size += size_of_varint(h_value_len) + h_value_len + return size + + @classmethod + def estimate_size_in_bytes(cls, key, value, headers): + """ Get the upper bound estimate on the size of record + """ + return ( + cls.HEADER_STRUCT.size + cls.MAX_RECORD_OVERHEAD + + cls.size_of(key, value, headers) + ) + + +class DefaultRecordMetadata(object): + + __slots__ = ("_size", "_timestamp", "_offset") + + def __init__(self, offset, size, timestamp): + self._offset = offset + self._size = size + self._timestamp = timestamp + + @property + def offset(self): + return self._offset + + @property + def crc(self): + return None + + @property + def size(self): + return self._size + + @property + def timestamp(self): + return self._timestamp + + def __repr__(self): + return ( + "DefaultRecordMetadata(offset={!r}, size={!r}, timestamp={!r})" + .format(self._offset, self._size, self._timestamp) + ) diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 055914c..8c0791e 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -329,9 +329,10 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase): self._batch_size = batch_size self._buffer = bytearray() - def append(self, offset, timestamp, key, value): + def append(self, offset, timestamp, key, value, headers=None): """ Append message to batch. 
""" + assert not headers, "Headers not supported in v0/v1" # Check types if type(offset) != int: raise TypeError(offset) diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index 4ed992c..56aa51f 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -24,6 +24,7 @@ import struct from kafka.errors import CorruptRecordException from .abc import ABCRecords from .legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder +from .default_records import DefaultRecordBatch, DefaultRecordBatchBuilder class MemoryRecords(ABCRecords): @@ -102,18 +103,24 @@ class MemoryRecords(ABCRecords): magic, = struct.unpack_from(">b", next_slice, _magic_offset) if magic <= 1: return LegacyRecordBatch(next_slice, magic) - else: # pragma: no cover - raise NotImplementedError("Record V2 still not implemented") + else: + return DefaultRecordBatch(next_slice) class MemoryRecordsBuilder(object): def __init__(self, magic, compression_type, batch_size): - assert magic in [0, 1], "Not supported magic" + assert magic in [0, 1, 2], "Not supported magic" assert compression_type in [0, 1, 2, 3], "Not valid compression type" - self._builder = LegacyRecordBatchBuilder( - magic=magic, compression_type=compression_type, - batch_size=batch_size) + if magic >= 2: + self._builder = DefaultRecordBatchBuilder( + magic=magic, compression_type=compression_type, + is_transactional=False, producer_id=-1, producer_epoch=-1, + base_sequence=-1, batch_size=batch_size) + else: + self._builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=compression_type, + batch_size=batch_size) self._batch_size = batch_size self._buffer = None @@ -121,7 +128,7 @@ class MemoryRecordsBuilder(object): self._closed = False self._bytes_written = 0 - def append(self, timestamp, key, value): + def append(self, timestamp, key, value, headers=[]): """ Append a message to the buffer. Returns: @@ -131,7 +138,7 @@ class MemoryRecordsBuilder(object): return None, 0 offset = self._next_offset - metadata = self._builder.append(offset, timestamp, key, value) + metadata = self._builder.append(offset, timestamp, key, value, headers) # Return of 0 size means there's no space to add a new message if metadata is None: return None diff --git a/kafka/record/util.py b/kafka/record/util.py index 098d6f4..88135f1 100644 --- a/kafka/record/util.py +++ b/kafka/record/util.py @@ -1,5 +1,124 @@ import binascii +from ._crc32c import crc as crc32c_py + + +def encode_varint(value, write): + """ Encode an integer to a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. 
+ + Arguments: + value (int): Value to encode + write (function): Called per byte that needs to be writen + + Returns: + int: Number of bytes written + """ + value = (value << 1) ^ (value >> 63) + + if value <= 0x7f: # 1 byte + write(value) + return 1 + if value <= 0x3fff: # 2 bytes + write(0x80 | (value & 0x7f)) + write(value >> 7) + return 2 + if value <= 0x1fffff: # 3 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(value >> 14) + return 3 + if value <= 0xfffffff: # 4 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(0x80 | ((value >> 14) & 0x7f)) + write(value >> 21) + return 4 + if value <= 0x7ffffffff: # 5 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(0x80 | ((value >> 14) & 0x7f)) + write(0x80 | ((value >> 21) & 0x7f)) + write(value >> 28) + return 5 + else: + # Return to general algorithm + bits = value & 0x7f + value >>= 7 + i = 0 + while value: + write(0x80 | bits) + bits = value & 0x7f + value >>= 7 + i += 1 + write(bits) + return i + + +def size_of_varint(value): + """ Number of bytes needed to encode an integer in variable-length format. + """ + value = (value << 1) ^ (value >> 63) + if value <= 0x7f: + return 1 + if value <= 0x3fff: + return 2 + if value <= 0x1fffff: + return 3 + if value <= 0xfffffff: + return 4 + if value <= 0x7ffffffff: + return 5 + if value <= 0x3ffffffffff: + return 6 + if value <= 0x1ffffffffffff: + return 7 + if value <= 0xffffffffffffff: + return 8 + if value <= 0x7fffffffffffffff: + return 9 + return 10 + + +def decode_varint(buffer, pos=0): + """ Decode an integer from a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + buffer (bytearry): buffer to read from. 
+ pos (int): optional position to read from + + Returns: + (int, int): Decoded int value and next read position + """ + result = buffer[pos] + if not (result & 0x81): + return (result >> 1), pos + 1 + if not (result & 0x80): + return (result >> 1) ^ (~0), pos + 1 + + result &= 0x7f + pos += 1 + shift = 7 + while 1: + b = buffer[pos] + result |= ((b & 0x7f) << shift) + pos += 1 + if not (b & 0x80): + return ((result >> 1) ^ -(result & 1), pos) + shift += 7 + if shift >= 64: + raise ValueError("Out of int64 range") + + +def calc_crc32c(memview): + """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data + """ + crc = crc32c_py(memview) + return crc + def calc_crc32(memview): """ Calculate simple CRC-32 checksum over a memoryview of data diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py new file mode 100644 index 0000000..193703e --- /dev/null +++ b/test/record/test_default_records.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals +import pytest +from kafka.record.default_records import ( + DefaultRecordBatch, DefaultRecordBatchBuilder +) + + +@pytest.mark.parametrize("compression_type", [ + DefaultRecordBatch.CODEC_NONE, + DefaultRecordBatch.CODEC_GZIP, + DefaultRecordBatch.CODEC_SNAPPY, + DefaultRecordBatch.CODEC_LZ4 +]) +def test_read_write_serde_v2(compression_type): + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=compression_type, is_transactional=1, + producer_id=123456, producer_epoch=123, base_sequence=9999, + batch_size=999999) + headers = [] # [("header1", b"aaa"), ("header2", b"bbb")] + for offset in range(10): + builder.append( + offset, timestamp=9999999, key=b"test", value=b"Super", + headers=headers) + buffer = builder.build() + reader = DefaultRecordBatch(bytes(buffer)) + msgs = list(reader) + + assert reader.is_transactional is True + assert reader.compression_type == compression_type + assert reader.magic == 2 + assert reader.timestamp_type == 0 + assert reader.base_offset == 0 + for offset, msg in enumerate(msgs): + assert msg.offset == offset + assert msg.timestamp == 9999999 + assert msg.key == b"test" + assert msg.value == b"Super" + assert msg.headers == headers + + +def test_written_bytes_equals_size_in_bytes_v2(): + key = b"test" + value = b"Super" + headers = [("header1", b"aaa"), ("header2", b"bbb"), ("xx", None)] + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=999999) + + size_in_bytes = builder.size_in_bytes( + 0, timestamp=9999999, key=key, value=value, headers=headers) + + pos = builder.size() + meta = builder.append( + 0, timestamp=9999999, key=key, value=value, headers=headers) + + assert builder.size() - pos == size_in_bytes + assert meta.size == size_in_bytes + + +def test_estimate_size_in_bytes_bigger_than_batch_v2(): + key = b"Super Key" + value = b"1" * 100 + headers = [("header1", b"aaa"), ("header2", b"bbb")] + estimate_size = DefaultRecordBatchBuilder.estimate_size_in_bytes( + key, value, headers) + + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=999999) + builder.append( + 0, timestamp=9999999, key=key, value=value, headers=headers) + buf = builder.build() + assert len(buf) <= estimate_size, \ + "Estimate should always be upper bound" + + +def test_default_batch_builder_validates_arguments(): + builder = DefaultRecordBatchBuilder( 
+ magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=999999) + + # Key should not be str + with pytest.raises(TypeError): + builder.append( + 0, timestamp=9999999, key="some string", value=None, headers=[]) + + # Value should not be str + with pytest.raises(TypeError): + builder.append( + 0, timestamp=9999999, key=None, value="some string", headers=[]) + + # Timestamp should be of proper type + with pytest.raises(TypeError): + builder.append( + 0, timestamp="1243812793", key=None, value=b"some string", + headers=[]) + + # Offset of invalid type + with pytest.raises(TypeError): + builder.append( + "0", timestamp=9999999, key=None, value=b"some string", headers=[]) + + # Ok to pass value as None + builder.append( + 0, timestamp=9999999, key=b"123", value=None, headers=[]) + + # Timestamp can be None + builder.append( + 1, timestamp=None, key=None, value=b"some string", headers=[]) + + # Ok to pass offsets in not incremental order. This should not happen thou + builder.append( + 5, timestamp=9999999, key=b"123", value=None, headers=[]) + + # in case error handling code fails to fix inner buffer in builder + assert len(builder.build()) == 104 + + +def test_default_correct_metadata_response(): + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=1024 * 1024) + meta = builder.append( + 0, timestamp=9999999, key=b"test", value=b"Super", headers=[]) + + assert meta.offset == 0 + assert meta.timestamp == 9999999 + assert meta.crc is None + assert meta.size == 16 + assert repr(meta) == ( + "DefaultRecordMetadata(offset=0, size={}, timestamp={})" + .format(meta.size, meta.timestamp) + ) + + +def test_default_batch_size_limit(): + # First message can be added even if it's too big + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=1024) + + meta = builder.append( + 0, timestamp=None, key=None, value=b"M" * 2000, headers=[]) + assert meta.size > 0 + assert meta.crc is None + assert meta.offset == 0 + assert meta.timestamp is not None + assert len(builder.build()) > 2000 + + builder = DefaultRecordBatchBuilder( + magic=2, compression_type=0, is_transactional=0, + producer_id=-1, producer_epoch=-1, base_sequence=-1, + batch_size=1024) + meta = builder.append( + 0, timestamp=None, key=None, value=b"M" * 700, headers=[]) + assert meta is not None + meta = builder.append( + 1, timestamp=None, key=None, value=b"M" * 700, headers=[]) + assert meta is None + meta = builder.append( + 2, timestamp=None, key=None, value=b"M" * 700, headers=[]) + assert meta is None + assert len(builder.build()) < 1000 diff --git a/test/record/test_records.py b/test/record/test_records.py index fc3eaca..7306bbc 100644 --- a/test/record/test_records.py +++ b/test/record/test_records.py @@ -2,6 +2,26 @@ import pytest from kafka.record import MemoryRecords from kafka.errors import CorruptRecordException +# This is real live data from Kafka 11 broker +record_batch_data_v2 = [ + # First Batch value == "123" + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x00\x00\x01\x02\x03' + b'\x18\xa2p\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff{\x06<\x00\x00\x01]' + b'\xff{\x06<\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00' + b'\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00', + # Second Batch value = "" and value = "". 
2 records + b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x02\x02\xc8' + b'\\\xbd#\x00\x00\x00\x00\x00\x01\x00\x00\x01]\xff|\xddl\x00\x00\x01]\xff' + b'|\xde\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00' + b'\x00\x00\x02\x0c\x00\x00\x00\x01\x00\x00\x0e\x00\xd0\x02\x02\x01\x00' + b'\x00', + # Third batch value = "123" + b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00;\x00\x00\x00\x02\x02.\x0b' + b'\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]' + b'\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' + b'\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00' +] + record_batch_data_v1 = [ # First Message value == "123" b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00' @@ -34,6 +54,32 @@ record_batch_data_v0 = [ ] +def test_memory_records_v2(): + data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4 + records = MemoryRecords(data_bytes) + + assert records.size_in_bytes() == 222 + assert records.valid_bytes() == 218 + + assert records.has_next() is True + batch = records.next_batch() + recs = list(batch) + assert len(recs) == 1 + assert recs[0].value == b"123" + assert recs[0].key is None + assert recs[0].timestamp == 1503229838908 + assert recs[0].timestamp_type == 0 + assert recs[0].checksum is None + assert recs[0].headers == [] + + assert records.next_batch() is not None + assert records.next_batch() is not None + + assert records.has_next() is False + assert records.next_batch() is None + assert records.next_batch() is None + + def test_memory_records_v1(): data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4 records = MemoryRecords(data_bytes) diff --git a/test/record/test_util.py b/test/record/test_util.py new file mode 100644 index 0000000..bfe0fcc --- /dev/null +++ b/test/record/test_util.py @@ -0,0 +1,95 @@ +import struct +import pytest +from kafka.record import util + + +varint_data = [ + (b"\x00", 0), + (b"\x01", -1), + (b"\x02", 1), + (b"\x7E", 63), + (b"\x7F", -64), + (b"\x80\x01", 64), + (b"\x81\x01", -65), + (b"\xFE\x7F", 8191), + (b"\xFF\x7F", -8192), + (b"\x80\x80\x01", 8192), + (b"\x81\x80\x01", -8193), + (b"\xFE\xFF\x7F", 1048575), + (b"\xFF\xFF\x7F", -1048576), + (b"\x80\x80\x80\x01", 1048576), + (b"\x81\x80\x80\x01", -1048577), + (b"\xFE\xFF\xFF\x7F", 134217727), + (b"\xFF\xFF\xFF\x7F", -134217728), + (b"\x80\x80\x80\x80\x01", 134217728), + (b"\x81\x80\x80\x80\x01", -134217729), + (b"\xFE\xFF\xFF\xFF\x7F", 17179869183), + (b"\xFF\xFF\xFF\xFF\x7F", -17179869184), + (b"\x80\x80\x80\x80\x80\x01", 17179869184), + (b"\x81\x80\x80\x80\x80\x01", -17179869185), + (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551), + (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552), + (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552), + (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656), + (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656), + (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", 
-4611686018427387905), +] + + +@pytest.mark.parametrize("encoded, decoded", varint_data) +def test_encode_varint(encoded, decoded): + res = bytearray() + util.encode_varint(decoded, res.append) + assert res == encoded + + +@pytest.mark.parametrize("encoded, decoded", varint_data) +def test_decode_varint(encoded, decoded): + # We add a bit of bytes around just to check position is calculated + # correctly + value, pos = util.decode_varint( + bytearray(b"\x01\xf0" + encoded + b"\xff\x01"), 2) + assert value == decoded + assert pos - 2 == len(encoded) + + +@pytest.mark.parametrize("encoded, decoded", varint_data) +def test_size_of_varint(encoded, decoded): + assert util.size_of_varint(decoded) == len(encoded) + + +def test_crc32c(): + def make_crc(data): + crc = util.calc_crc32c(data) + return struct.pack(">I", crc) + assert make_crc(b"") == b"\x00\x00\x00\x00" + assert make_crc(b"a") == b"\xc1\xd0\x43\x30" + + # Took from librdkafka testcase + long_text = b"""\ + This software is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + 3. This notice may not be removed or altered from any source distribution.""" + assert make_crc(long_text) == b"\x7d\xcd\xe1\x13" diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 364a808..ef3f686 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -54,7 +54,7 @@ def _build_record_batch(msgs, compression=0): magic=1, compression_type=0, batch_size=9999999) for msg in msgs: key, value, timestamp = msg - builder.append(key=key, value=value, timestamp=timestamp) + builder.append(key=key, value=value, timestamp=timestamp, headers=[]) builder.close() return builder.buffer() diff --git a/test/test_producer.py b/test/test_producer.py index 41bd52e..20dffc2 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -88,10 +88,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): retries=5, max_block_ms=10000, compression_type=compression) - if producer.config['api_version'] >= (0, 10): - magic = 1 - else: - magic = 0 + magic = producer._max_usable_produce_magic() topic = random_string(5) future = producer.send( @@ -109,7 +106,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): else: assert record.timestamp == -1 # NO_TIMESTAMP - if magic == 1: + if magic >= 2: + assert record.checksum is None + elif magic == 1: assert record.checksum == 1370034956 else: assert record.checksum == 3296137851 |
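
Closing note (editor's sketch, not part of the diff): v2 batches switch from CRC-32 to CRC-32C (Castagnoli), computed over everything from the attributes field to the end of the batch. The pure-Python table-driven implementation added in kafka/record/_crc32c.py is exposed as kafka.record.util.calc_crc32c; the two check values below are the ones asserted in test_crc32c above.

import struct
from kafka.record.util import calc_crc32c

def crc32c_bytes(data):
    # Pack the 32-bit checksum big-endian, as the v2 batch header does.
    return struct.pack(">I", calc_crc32c(data))

assert crc32c_bytes(b"") == b"\x00\x00\x00\x00"
assert crc32c_bytes(b"a") == b"\xc1\xd0\x43\x30"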