author    Taras Voinarovskiy <voyn1991@gmail.com>  2017-08-25 09:38:27 +0000
committer Taras Voinarovskiy <voyn1991@gmail.com>  2017-10-24 15:12:11 +0000
commit    3af66bc542efff3f7010bec18b72d844e09488c4 (patch)
tree      a20623631b36230c9425b08ee95b85afbc9a9455
parent    e06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff)
download  kafka-python-v2_records.tar.gz

Add DefaultRecordBatch implementation aka V2 message format parser/builder. (v2_records)
Added bytecode optimizations for the varint and append/read_msg functions, mostly based on avoiding LOAD_GLOBAL calls.
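
For reference, the LOAD_GLOBAL optimization mentioned above is the common CPython idiom of binding globals and builtins as keyword-argument defaults (as done in DefaultRecordBatchBuilder.append and _read_msg below), so the interpreter resolves them with LOAD_FAST instead of a dictionary lookup on every call. A minimal standalone sketch of the idiom, not part of the commit itself:

    import dis

    def slow(values):
        total = 0
        for v in values:
            total += len(v)       # `len` resolved via LOAD_GLOBAL each iteration
        return total

    def fast(values, len_func=len):
        total = 0
        for v in values:
            total += len_func(v)  # `len_func` is a local, resolved via LOAD_FAST
        return total

    dis.dis(slow)  # bytecode listing contains LOAD_GLOBAL (len)
    dis.dis(fast)  # bytecode listing contains LOAD_FAST (len_func)
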
-rw-r--r--benchmarks/record_batch_compose.py4
-rw-r--r--benchmarks/record_batch_read.py4
-rw-r--r--benchmarks/varint_speed.py443
-rw-r--r--kafka/consumer/fetcher.py30
-rw-r--r--kafka/producer/kafka.py15
-rw-r--r--kafka/producer/sender.py9
-rw-r--r--kafka/record/README8
-rw-r--r--kafka/record/_crc32c.py143
-rw-r--r--kafka/record/abc.py10
-rw-r--r--kafka/record/default_records.py595
-rw-r--r--kafka/record/legacy_records.py3
-rw-r--r--kafka/record/memory_records.py23
-rw-r--r--kafka/record/util.py119
-rw-r--r--test/record/test_default_records.py169
-rw-r--r--test/record/test_records.py46
-rw-r--r--test/record/test_util.py95
-rw-r--r--test/test_fetcher.py2
-rw-r--r--test/test_producer.py9
18 files changed, 1696 insertions, 31 deletions
diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py
index 86012df..aca669d 100644
--- a/benchmarks/record_batch_compose.py
+++ b/benchmarks/record_batch_compose.py
@@ -58,7 +58,8 @@ def func(loops, magic):
magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
for _ in range(MESSAGES_PER_BATCH):
key, value, timestamp = next(precomputed_samples)
- size = batch.append(timestamp=timestamp, key=key, value=value)
+ size = batch.append(
+ timestamp=timestamp, key=key, value=value)
assert size
batch.close()
results.append(batch.buffer())
@@ -73,3 +74,4 @@ def func(loops, magic):
runner = perf.Runner()
runner.bench_time_func('batch_append_v0', func, 0)
runner.bench_time_func('batch_append_v1', func, 1)
+runner.bench_time_func('batch_append_v2', func, 2)
diff --git a/benchmarks/record_batch_read.py b/benchmarks/record_batch_read.py
index 7ae471e..fc01e42 100644
--- a/benchmarks/record_batch_read.py
+++ b/benchmarks/record_batch_read.py
@@ -35,7 +35,8 @@ def prepare(magic):
size = batch.append(
random.randint(*TIMESTAMP_RANGE),
random_bytes(KEY_SIZE),
- random_bytes(VALUE_SIZE))
+ random_bytes(VALUE_SIZE),
+ headers=[])
assert size
batch.close()
samples.append(bytes(batch.buffer()))
@@ -78,3 +79,4 @@ def func(loops, magic):
runner = perf.Runner()
runner.bench_time_func('batch_read_v0', func, 0)
runner.bench_time_func('batch_read_v1', func, 1)
+runner.bench_time_func('batch_read_v2', func, 2)
diff --git a/benchmarks/varint_speed.py b/benchmarks/varint_speed.py
new file mode 100644
index 0000000..2c5cd62
--- /dev/null
+++ b/benchmarks/varint_speed.py
@@ -0,0 +1,443 @@
+#!/usr/bin/env python
+from __future__ import print_function
+import perf
+import six
+
+
+test_data = [
+ (b"\x00", 0),
+ (b"\x01", -1),
+ (b"\x02", 1),
+ (b"\x7E", 63),
+ (b"\x7F", -64),
+ (b"\x80\x01", 64),
+ (b"\x81\x01", -65),
+ (b"\xFE\x7F", 8191),
+ (b"\xFF\x7F", -8192),
+ (b"\x80\x80\x01", 8192),
+ (b"\x81\x80\x01", -8193),
+ (b"\xFE\xFF\x7F", 1048575),
+ (b"\xFF\xFF\x7F", -1048576),
+ (b"\x80\x80\x80\x01", 1048576),
+ (b"\x81\x80\x80\x01", -1048577),
+ (b"\xFE\xFF\xFF\x7F", 134217727),
+ (b"\xFF\xFF\xFF\x7F", -134217728),
+ (b"\x80\x80\x80\x80\x01", 134217728),
+ (b"\x81\x80\x80\x80\x01", -134217729),
+ (b"\xFE\xFF\xFF\xFF\x7F", 17179869183),
+ (b"\xFF\xFF\xFF\xFF\x7F", -17179869184),
+ (b"\x80\x80\x80\x80\x80\x01", 17179869184),
+ (b"\x81\x80\x80\x80\x80\x01", -17179869185),
+ (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551),
+ (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552),
+ (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552),
+ (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", -4611686018427387905),
+]
+
+
+BENCH_VALUES_ENC = [
+ 60, # 1 byte
+ -8192, # 2 bytes
+ 1048575, # 3 bytes
+ 134217727, # 4 bytes
+ -17179869184, # 5 bytes
+ 2199023255551, # 6 bytes
+]
+
+BENCH_VALUES_DEC = [
+ b"\x7E", # 1 byte
+ b"\xFF\x7F", # 2 bytes
+ b"\xFE\xFF\x7F", # 3 bytes
+ b"\xFF\xFF\xFF\x7F", # 4 bytes
+ b"\x80\x80\x80\x80\x01", # 5 bytes
+ b"\xFE\xFF\xFF\xFF\xFF\x7F", # 6 bytes
+]
+BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC))
+
+
+def _assert_valid_enc(enc_func):
+ for encoded, decoded in test_data:
+ assert enc_func(decoded) == encoded, decoded
+
+
+def _assert_valid_dec(dec_func):
+ for encoded, decoded in test_data:
+ res, pos = dec_func(bytearray(encoded))
+ assert res == decoded, (decoded, res)
+ assert pos == len(encoded), (decoded, pos)
+
+
+def _assert_valid_size(size_func):
+ for encoded, decoded in test_data:
+ assert size_func(decoded) == len(encoded), decoded
+
+
+def encode_varint_1(num):
+    """ Encode an integer to a varint representation. See
+ https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints
+ on how those can be produced.
+
+ Arguments:
+ num (int): Value to encode
+
+ Returns:
+        bytearray: Encoded representation of integer with length from 1 to 10
+ bytes
+ """
+ # Shift sign to the end of number
+ num = (num << 1) ^ (num >> 63)
+    # Max 10 bytes. We pre-allocate the whole buffer up front
+ buf = bytearray(10)
+
+ for i in range(10):
+ # 7 lowest bits from the number and set 8th if we still have pending
+ # bits left to encode
+ buf[i] = num & 0x7f | (0x80 if num > 0x7f else 0)
+ num = num >> 7
+ if num == 0:
+ break
+ else:
+        # Max size of an encoded varint is 10 bytes for an int64 value
+        raise ValueError("Out of int64 range")
+ return buf[:i + 1]
+
+_assert_valid_enc(encode_varint_1)
+
+
+def encode_varint_2(value, int2byte=six.int2byte):
+ value = (value << 1) ^ (value >> 63)
+
+ bits = value & 0x7f
+ value >>= 7
+ res = b""
+ while value:
+ res += int2byte(0x80 | bits)
+ bits = value & 0x7f
+ value >>= 7
+ return res + int2byte(bits)
+
+_assert_valid_enc(encode_varint_2)
+
+
+def encode_varint_3(value, buf):
+ append = buf.append
+ value = (value << 1) ^ (value >> 63)
+
+ bits = value & 0x7f
+ value >>= 7
+ while value:
+ append(0x80 | bits)
+ bits = value & 0x7f
+ value >>= 7
+ append(bits)
+ return value
+
+
+for encoded, decoded in test_data:
+ res = bytearray()
+ encode_varint_3(decoded, res)
+ assert res == encoded
+
+
+def encode_varint_4(value, int2byte=six.int2byte):
+ value = (value << 1) ^ (value >> 63)
+
+ if value <= 0x7f: # 1 byte
+ return int2byte(value)
+ if value <= 0x3fff: # 2 bytes
+ return int2byte(0x80 | (value & 0x7f)) + int2byte(value >> 7)
+ if value <= 0x1fffff: # 3 bytes
+ return int2byte(0x80 | (value & 0x7f)) + \
+ int2byte(0x80 | ((value >> 7) & 0x7f)) + \
+ int2byte(value >> 14)
+ if value <= 0xfffffff: # 4 bytes
+ return int2byte(0x80 | (value & 0x7f)) + \
+ int2byte(0x80 | ((value >> 7) & 0x7f)) + \
+ int2byte(0x80 | ((value >> 14) & 0x7f)) + \
+ int2byte(value >> 21)
+ if value <= 0x7ffffffff: # 5 bytes
+ return int2byte(0x80 | (value & 0x7f)) + \
+ int2byte(0x80 | ((value >> 7) & 0x7f)) + \
+ int2byte(0x80 | ((value >> 14) & 0x7f)) + \
+ int2byte(0x80 | ((value >> 21) & 0x7f)) + \
+ int2byte(value >> 28)
+ else:
+        # Fall back to the general algorithm
+ bits = value & 0x7f
+ value >>= 7
+ res = b""
+ while value:
+ res += int2byte(0x80 | bits)
+ bits = value & 0x7f
+ value >>= 7
+ return res + int2byte(bits)
+
+
+_assert_valid_enc(encode_varint_4)
+
+# import dis
+# dis.dis(encode_varint_4)
+
+
+def encode_varint_5(value, buf, pos=0):
+ value = (value << 1) ^ (value >> 63)
+
+ bits = value & 0x7f
+ value >>= 7
+ while value:
+ buf[pos] = 0x80 | bits
+ bits = value & 0x7f
+ value >>= 7
+ pos += 1
+ buf[pos] = bits
+ return pos + 1
+
+for encoded, decoded in test_data:
+ res = bytearray(10)
+ written = encode_varint_5(decoded, res)
+ assert res[:written] == encoded
+
+
+def encode_varint_6(value, buf):
+ append = buf.append
+ value = (value << 1) ^ (value >> 63)
+
+ if value <= 0x7f: # 1 byte
+ append(value)
+ return 1
+ if value <= 0x3fff: # 2 bytes
+ append(0x80 | (value & 0x7f))
+ append(value >> 7)
+ return 2
+ if value <= 0x1fffff: # 3 bytes
+ append(0x80 | (value & 0x7f))
+ append(0x80 | ((value >> 7) & 0x7f))
+ append(value >> 14)
+ return 3
+ if value <= 0xfffffff: # 4 bytes
+ append(0x80 | (value & 0x7f))
+ append(0x80 | ((value >> 7) & 0x7f))
+ append(0x80 | ((value >> 14) & 0x7f))
+ append(value >> 21)
+ return 4
+ if value <= 0x7ffffffff: # 5 bytes
+ append(0x80 | (value & 0x7f))
+ append(0x80 | ((value >> 7) & 0x7f))
+ append(0x80 | ((value >> 14) & 0x7f))
+ append(0x80 | ((value >> 21) & 0x7f))
+ append(value >> 28)
+ return 5
+ else:
+        # Fall back to the general algorithm
+ bits = value & 0x7f
+ value >>= 7
+ i = 0
+ while value:
+ append(0x80 | bits)
+ bits = value & 0x7f
+ value >>= 7
+ i += 1
+ append(bits)
+ return i
+
+
+for encoded, decoded in test_data:
+ res = bytearray()
+ encode_varint_6(decoded, res)
+ assert res == encoded
+
+
+def size_of_varint_1(value):
+ """ Number of bytes needed to encode an integer in variable-length format.
+ """
+ value = (value << 1) ^ (value >> 63)
+ res = 0
+ while True:
+ res += 1
+ value = value >> 7
+ if value == 0:
+ break
+ return res
+
+_assert_valid_size(size_of_varint_1)
+
+
+def size_of_varint_2(value):
+ """ Number of bytes needed to encode an integer in variable-length format.
+ """
+ value = (value << 1) ^ (value >> 63)
+ if value <= 0x7f:
+ return 1
+ if value <= 0x3fff:
+ return 2
+ if value <= 0x1fffff:
+ return 3
+ if value <= 0xfffffff:
+ return 4
+ if value <= 0x7ffffffff:
+ return 5
+ if value <= 0x3ffffffffff:
+ return 6
+ if value <= 0x1ffffffffffff:
+ return 7
+ if value <= 0xffffffffffffff:
+ return 8
+ if value <= 0x7fffffffffffffff:
+ return 9
+ return 10
+
+_assert_valid_size(size_of_varint_2)
+
+
+if six.PY3:
+ def _read_byte(memview, pos):
+ """ Read a byte from memoryview as an integer
+
+ Raises:
+ IndexError: if position is out of bounds
+ """
+ return memview[pos]
+else:
+ def _read_byte(memview, pos):
+ """ Read a byte from memoryview as an integer
+
+ Raises:
+ IndexError: if position is out of bounds
+ """
+ return ord(memview[pos])
+
+
+def decode_varint_1(buffer, pos=0):
+    """ Decode an integer from a varint representation. See
+ https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints
+ on how those can be produced.
+
+ Arguments:
+ buffer (bytes-like): any object acceptable by ``memoryview``
+ pos (int): optional position to read from
+
+ Returns:
+ (int, int): Decoded int value and next read position
+ """
+ value = 0
+ shift = 0
+ memview = memoryview(buffer)
+ for i in range(pos, pos + 10):
+ try:
+ byte = _read_byte(memview, i)
+ except IndexError:
+ raise ValueError("End of byte stream")
+ if byte & 0x80 != 0:
+ value |= (byte & 0x7f) << shift
+ shift += 7
+ else:
+ value |= byte << shift
+ break
+ else:
+        # Max size of an encoded varint is 10 bytes for an int64 value
+        raise ValueError("Out of int64 range")
+ # Normalize sign
+ return (value >> 1) ^ -(value & 1), i + 1
+
+_assert_valid_dec(decode_varint_1)
+
+
+def decode_varint_2(buffer, pos=0):
+ result = 0
+ shift = 0
+ while 1:
+ b = buffer[pos]
+ result |= ((b & 0x7f) << shift)
+ pos += 1
+ if not (b & 0x80):
+ # result = result_type(() & mask)
+ return ((result >> 1) ^ -(result & 1), pos)
+ shift += 7
+ if shift >= 64:
+ raise ValueError("Out of int64 range")
+
+
+_assert_valid_dec(decode_varint_2)
+
+
+def decode_varint_3(buffer, pos=0):
+ result = buffer[pos]
+ if not (result & 0x81):
+ return (result >> 1), pos + 1
+ if not (result & 0x80):
+ return (result >> 1) ^ (~0), pos + 1
+
+ result &= 0x7f
+ pos += 1
+ shift = 7
+ while 1:
+ b = buffer[pos]
+ result |= ((b & 0x7f) << shift)
+ pos += 1
+ if not (b & 0x80):
+ return ((result >> 1) ^ -(result & 1), pos)
+ shift += 7
+ if shift >= 64:
+ raise ValueError("Out of int64 range")
+
+
+_assert_valid_dec(decode_varint_3)
+
+# import dis
+# dis.dis(decode_varint_3)
+
+runner = perf.Runner()
+# Encode algorithms returning a bytes result
+for bench_func in [
+ encode_varint_1,
+ encode_varint_2,
+ encode_varint_4]:
+ for i, value in enumerate(BENCH_VALUES_ENC):
+ runner.bench_func(
+ '{}_{}byte'.format(bench_func.__name__, i + 1),
+ bench_func, value)
+
+# Encode algorithms writing to the buffer
+for bench_func in [
+ encode_varint_3,
+ encode_varint_5,
+ encode_varint_6]:
+ for i, value in enumerate(BENCH_VALUES_ENC):
+ fname = bench_func.__name__
+ runner.timeit(
+ '{}_{}byte'.format(fname, i + 1),
+ stmt="{}({}, buffer)".format(fname, value),
+ setup="from __main__ import {}; buffer = bytearray(10)".format(
+ fname)
+ )
+
+# Size algorithms
+for bench_func in [
+ size_of_varint_1,
+ size_of_varint_2]:
+ for i, value in enumerate(BENCH_VALUES_ENC):
+ runner.bench_func(
+ '{}_{}byte'.format(bench_func.__name__, i + 1),
+ bench_func, value)
+
+# Decode algorithms
+for bench_func in [
+ decode_varint_1,
+ decode_varint_2,
+ decode_varint_3]:
+ for i, value in enumerate(BENCH_VALUES_DEC):
+ runner.bench_func(
+ '{}_{}byte'.format(bench_func.__name__, i + 1),
+ bench_func, value)
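
All candidate implementations above share the same zigzag transform: (value << 1) ^ (value >> 63) maps a signed 64-bit integer to an unsigned one so that values of small magnitude need few bytes, and (value >> 1) ^ -(value & 1) reverses it. A minimal sketch of just that mapping (helper names are ad hoc), checked against the first few test vectors above:

    def zigzag_encode(value):
        # Python ints are arbitrary precision; >> 63 yields -1 for negative
        # and 0 for non-negative int64 values, which is what zigzag needs.
        return (value << 1) ^ (value >> 63)

    def zigzag_decode(value):
        return (value >> 1) ^ -(value & 1)

    for signed, unsigned in [(0, 0), (-1, 1), (1, 2), (63, 126), (-64, 127)]:
        assert zigzag_encode(signed) == unsigned
        assert zigzag_decode(unsigned) == signed
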
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d3ee26e..ddd7567 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -23,6 +23,10 @@ from kafka.structs import TopicPartition, OffsetAndTimestamp
log = logging.getLogger(__name__)
+# Isolation levels
+READ_UNCOMMITTED = 0
+READ_COMMITTED = 1
+
ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "offset", "timestamp", "timestamp_type",
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
@@ -114,6 +118,7 @@ class Fetcher(six.Iterator):
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
+ self._isolation_level = READ_UNCOMMITTED
def send_fetches(self):
"""Send FetchRequests for all assigned partitions that do not already have
@@ -670,7 +675,9 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
- if self.config['api_version'] >= (0, 10, 1):
+ if self.config['api_version'] >= (0, 11, 0):
+ version = 4
+ elif self.config['api_version'] >= (0, 10, 1):
version = 3
elif self.config['api_version'] >= (0, 10):
version = 2
@@ -696,12 +703,21 @@ class Fetcher(six.Iterator):
# dicts retain insert order.
partition_data = list(partition_data.items())
random.shuffle(partition_data)
- requests[node_id] = FetchRequest[version](
- -1, # replica_id
- self.config['fetch_max_wait_ms'],
- self.config['fetch_min_bytes'],
- self.config['fetch_max_bytes'],
- partition_data)
+ if version == 3:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ partition_data)
+ else:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ self._isolation_level,
+ partition_data)
return requests
def _handle_fetch_response(self, request, send_time, response):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0ffc29c..646e773 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -15,6 +15,7 @@ from ..client_async import KafkaClient, selectors
from ..codec import has_gzip, has_snappy, has_lz4
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
+from ..record.default_records import DefaultRecordBatchBuilder
from ..record.legacy_records import LegacyRecordBatchBuilder
from ..serializer import Serializer
from ..structs import TopicPartition
@@ -486,15 +487,21 @@ class KafkaProducer(object):
return self._wait_on_metadata(topic, max_wait)
def _max_usable_produce_magic(self):
- if self.config['api_version'] >= (0, 10):
+ if self.config['api_version'] >= (0, 11):
+ return 2
+ elif self.config['api_version'] >= (0, 10):
return 1
else:
return 0
- def _estimate_size_in_bytes(self, key, value):
+ def _estimate_size_in_bytes(self, key, value, headers=[]):
magic = self._max_usable_produce_magic()
- return LegacyRecordBatchBuilder.estimate_size_in_bytes(
- magic, self.config['compression_type'], key, value)
+ if magic == 2:
+ return DefaultRecordBatchBuilder.estimate_size_in_bytes(
+ key, value, headers)
+ else:
+ return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, self.config['compression_type'], key, value)
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 72a15bb..ffc67f8 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -291,7 +291,11 @@ class Sender(threading.Thread):
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
- if self.config['api_version'] >= (0, 10):
+ kwargs = {}
+ if self.config['api_version'] >= (0, 11):
+ version = 3
+ kwargs = dict(transactional_id=None)
+ elif self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
@@ -302,7 +306,8 @@ class Sender(threading.Thread):
timeout=timeout,
topics=[(topic, list(partition_info.items()))
for topic, partition_info
- in six.iteritems(produce_records_by_partition)]
+ in six.iteritems(produce_records_by_partition)],
+ **kwargs
)
def wakeup(self):
diff --git a/kafka/record/README b/kafka/record/README
new file mode 100644
index 0000000..e445455
--- /dev/null
+++ b/kafka/record/README
@@ -0,0 +1,8 @@
+Module structured mostly after the
+kafka/clients/src/main/java/org/apache/kafka/common/record/ module of the
+Java client.
+
+See abc.py for the abstract declarations. `ABCRecords` is used as a facade to
+hide version differences. `ABCRecordBatch` subclasses implement the actual
+parsers for the different versions (v0/v1 as LegacyBatch and v2 as
+DefaultBatch; the names are taken from the Java client).
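
A usage sketch of the facade described above, based on the builder and reader calls exercised by the tests added in this commit (not part of the committed files):

    from kafka.record import MemoryRecords
    from kafka.record.memory_records import MemoryRecordsBuilder

    # Build a v2 (DefaultRecordBatch) message set, then read it back through
    # the version-agnostic MemoryRecords facade.
    builder = MemoryRecordsBuilder(magic=2, compression_type=0,
                                   batch_size=1024 * 1024)
    builder.append(timestamp=None, key=b"key", value=b"value", headers=[])
    builder.close()

    records = MemoryRecords(bytes(builder.buffer()))
    while records.has_next():
        for record in records.next_batch():
            print(record.offset, record.key, record.value, record.headers)
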
diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py
new file mode 100644
index 0000000..5704f82
--- /dev/null
+++ b/kafka/record/_crc32c.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+#
+# Taken from https://cloud.google.com/appengine/docs/standard/python/refdocs/\
+# modules/google/appengine/api/files/crc32c?hl=ru
+#
+# Copyright 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Implementation of CRC-32C checksumming as in rfc3720 section B.4.
+See http://en.wikipedia.org/wiki/Cyclic_redundancy_check for details on CRC-32C.
+This code is a manual Python translation of C code generated by
+pycrc 0.7.1 (http://www.tty1.net/pycrc/). Command line used:
+'./pycrc.py --model=crc-32c --generate c --algorithm=table-driven'
+"""
+
+import array
+
+CRC_TABLE = (
+ 0x00000000, 0xf26b8303, 0xe13b70f7, 0x1350f3f4,
+ 0xc79a971f, 0x35f1141c, 0x26a1e7e8, 0xd4ca64eb,
+ 0x8ad958cf, 0x78b2dbcc, 0x6be22838, 0x9989ab3b,
+ 0x4d43cfd0, 0xbf284cd3, 0xac78bf27, 0x5e133c24,
+ 0x105ec76f, 0xe235446c, 0xf165b798, 0x030e349b,
+ 0xd7c45070, 0x25afd373, 0x36ff2087, 0xc494a384,
+ 0x9a879fa0, 0x68ec1ca3, 0x7bbcef57, 0x89d76c54,
+ 0x5d1d08bf, 0xaf768bbc, 0xbc267848, 0x4e4dfb4b,
+ 0x20bd8ede, 0xd2d60ddd, 0xc186fe29, 0x33ed7d2a,
+ 0xe72719c1, 0x154c9ac2, 0x061c6936, 0xf477ea35,
+ 0xaa64d611, 0x580f5512, 0x4b5fa6e6, 0xb93425e5,
+ 0x6dfe410e, 0x9f95c20d, 0x8cc531f9, 0x7eaeb2fa,
+ 0x30e349b1, 0xc288cab2, 0xd1d83946, 0x23b3ba45,
+ 0xf779deae, 0x05125dad, 0x1642ae59, 0xe4292d5a,
+ 0xba3a117e, 0x4851927d, 0x5b016189, 0xa96ae28a,
+ 0x7da08661, 0x8fcb0562, 0x9c9bf696, 0x6ef07595,
+ 0x417b1dbc, 0xb3109ebf, 0xa0406d4b, 0x522bee48,
+ 0x86e18aa3, 0x748a09a0, 0x67dafa54, 0x95b17957,
+ 0xcba24573, 0x39c9c670, 0x2a993584, 0xd8f2b687,
+ 0x0c38d26c, 0xfe53516f, 0xed03a29b, 0x1f682198,
+ 0x5125dad3, 0xa34e59d0, 0xb01eaa24, 0x42752927,
+ 0x96bf4dcc, 0x64d4cecf, 0x77843d3b, 0x85efbe38,
+ 0xdbfc821c, 0x2997011f, 0x3ac7f2eb, 0xc8ac71e8,
+ 0x1c661503, 0xee0d9600, 0xfd5d65f4, 0x0f36e6f7,
+ 0x61c69362, 0x93ad1061, 0x80fde395, 0x72966096,
+ 0xa65c047d, 0x5437877e, 0x4767748a, 0xb50cf789,
+ 0xeb1fcbad, 0x197448ae, 0x0a24bb5a, 0xf84f3859,
+ 0x2c855cb2, 0xdeeedfb1, 0xcdbe2c45, 0x3fd5af46,
+ 0x7198540d, 0x83f3d70e, 0x90a324fa, 0x62c8a7f9,
+ 0xb602c312, 0x44694011, 0x5739b3e5, 0xa55230e6,
+ 0xfb410cc2, 0x092a8fc1, 0x1a7a7c35, 0xe811ff36,
+ 0x3cdb9bdd, 0xceb018de, 0xdde0eb2a, 0x2f8b6829,
+ 0x82f63b78, 0x709db87b, 0x63cd4b8f, 0x91a6c88c,
+ 0x456cac67, 0xb7072f64, 0xa457dc90, 0x563c5f93,
+ 0x082f63b7, 0xfa44e0b4, 0xe9141340, 0x1b7f9043,
+ 0xcfb5f4a8, 0x3dde77ab, 0x2e8e845f, 0xdce5075c,
+ 0x92a8fc17, 0x60c37f14, 0x73938ce0, 0x81f80fe3,
+ 0x55326b08, 0xa759e80b, 0xb4091bff, 0x466298fc,
+ 0x1871a4d8, 0xea1a27db, 0xf94ad42f, 0x0b21572c,
+ 0xdfeb33c7, 0x2d80b0c4, 0x3ed04330, 0xccbbc033,
+ 0xa24bb5a6, 0x502036a5, 0x4370c551, 0xb11b4652,
+ 0x65d122b9, 0x97baa1ba, 0x84ea524e, 0x7681d14d,
+ 0x2892ed69, 0xdaf96e6a, 0xc9a99d9e, 0x3bc21e9d,
+ 0xef087a76, 0x1d63f975, 0x0e330a81, 0xfc588982,
+ 0xb21572c9, 0x407ef1ca, 0x532e023e, 0xa145813d,
+ 0x758fe5d6, 0x87e466d5, 0x94b49521, 0x66df1622,
+ 0x38cc2a06, 0xcaa7a905, 0xd9f75af1, 0x2b9cd9f2,
+ 0xff56bd19, 0x0d3d3e1a, 0x1e6dcdee, 0xec064eed,
+ 0xc38d26c4, 0x31e6a5c7, 0x22b65633, 0xd0ddd530,
+ 0x0417b1db, 0xf67c32d8, 0xe52cc12c, 0x1747422f,
+ 0x49547e0b, 0xbb3ffd08, 0xa86f0efc, 0x5a048dff,
+ 0x8ecee914, 0x7ca56a17, 0x6ff599e3, 0x9d9e1ae0,
+ 0xd3d3e1ab, 0x21b862a8, 0x32e8915c, 0xc083125f,
+ 0x144976b4, 0xe622f5b7, 0xf5720643, 0x07198540,
+ 0x590ab964, 0xab613a67, 0xb831c993, 0x4a5a4a90,
+ 0x9e902e7b, 0x6cfbad78, 0x7fab5e8c, 0x8dc0dd8f,
+ 0xe330a81a, 0x115b2b19, 0x020bd8ed, 0xf0605bee,
+ 0x24aa3f05, 0xd6c1bc06, 0xc5914ff2, 0x37faccf1,
+ 0x69e9f0d5, 0x9b8273d6, 0x88d28022, 0x7ab90321,
+ 0xae7367ca, 0x5c18e4c9, 0x4f48173d, 0xbd23943e,
+ 0xf36e6f75, 0x0105ec76, 0x12551f82, 0xe03e9c81,
+ 0x34f4f86a, 0xc69f7b69, 0xd5cf889d, 0x27a40b9e,
+ 0x79b737ba, 0x8bdcb4b9, 0x988c474d, 0x6ae7c44e,
+ 0xbe2da0a5, 0x4c4623a6, 0x5f16d052, 0xad7d5351,
+)
+
+CRC_INIT = 0
+_MASK = 0xFFFFFFFF
+
+
+def crc_update(crc, data):
+ """Update CRC-32C checksum with data.
+ Args:
+ crc: 32-bit checksum to update as long.
+ data: byte array, string or iterable over bytes.
+ Returns:
+ 32-bit updated CRC-32C as long.
+ """
+ if type(data) != array.array or data.itemsize != 1:
+ buf = array.array("B", data)
+ else:
+ buf = data
+ crc = crc ^ _MASK
+ for b in buf:
+ table_index = (crc ^ b) & 0xff
+ crc = (CRC_TABLE[table_index] ^ (crc >> 8)) & _MASK
+ return crc ^ _MASK
+
+
+def crc_finalize(crc):
+ """Finalize CRC-32C checksum.
+ This function should be called as last step of crc calculation.
+ Args:
+ crc: 32-bit checksum as long.
+ Returns:
+ finalized 32-bit checksum as long
+ """
+ return crc & _MASK
+
+
+def crc(data):
+ """Compute CRC-32C checksum of the data.
+ Args:
+ data: byte array, string or iterable over bytes.
+ Returns:
+ 32-bit CRC-32C checksum of data as long.
+ """
+ return crc_finalize(crc_update(CRC_INIT, data))
+
+
+if __name__ == "__main__":
+ import sys
+ data = sys.stdin.read()
+ print(hex(crc(data)))
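
As a sanity check, this pure-Python implementation can be compared against the known vectors asserted in the new test/record/test_util.py further down in this diff; a small sketch:

    from kafka.record._crc32c import crc as crc32c

    # CRC-32C of the empty string is 0, and the single-byte vector below
    # matches the value asserted in test/record/test_util.py.
    assert crc32c(b"") == 0
    assert crc32c(b"a") == 0xc1d04330
    print(hex(crc32c(b"a")))
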
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index 8a27276..83121c6 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -36,12 +36,18 @@ class ABCRecord(object):
be the checksum for v0 and v1 and None for v2 and above.
"""
+ @abc.abstractproperty
+    def headers(self):
+        """ If supported by the message format version, a list of key-value
+            tuples; otherwise an empty list.
+ """
+
class ABCRecordBatchBuilder(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def append(self, offset, timestamp, key, value):
+ def append(self, offset, timestamp, key, value, headers=None):
""" Writes record to internal buffer.
Arguments:
@@ -51,6 +57,8 @@ class ABCRecordBatchBuilder(object):
set to current time.
key (bytes or None): Key of the record
value (bytes or None): Value of the record
+ headers (List[Tuple[str, bytes]]): Headers of the record. Header
+ keys can not be ``None``.
Returns:
(bytes, int): Checksum of the written record (or None for v2 and
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
new file mode 100644
index 0000000..3d517af
--- /dev/null
+++ b/kafka/record/default_records.py
@@ -0,0 +1,595 @@
+# See:
+# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\
+# apache/kafka/common/record/DefaultRecordBatch.java
+# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\
+# apache/kafka/common/record/DefaultRecord.java
+
+# RecordBatch and Record implementation for magic 2 and above.
+# The schema is given below:
+
+# RecordBatch =>
+# BaseOffset => Int64
+# Length => Int32
+# PartitionLeaderEpoch => Int32
+# Magic => Int8
+# CRC => Uint32
+# Attributes => Int16
+# LastOffsetDelta => Int32 // also serves as LastSequenceDelta
+# FirstTimestamp => Int64
+# MaxTimestamp => Int64
+# ProducerId => Int64
+# ProducerEpoch => Int16
+# BaseSequence => Int32
+# Records => [Record]
+
+# Record =>
+# Length => Varint
+# Attributes => Int8
+# TimestampDelta => Varlong
+# OffsetDelta => Varint
+# Key => Bytes
+# Value => Bytes
+# Headers => [HeaderKey HeaderValue]
+# HeaderKey => String
+# HeaderValue => Bytes
+
+# Note that when compression is enabled (see attributes below), the compressed
+# record data is serialized directly following the count of the number of
+# records. (i.e. Records => [Record], but without the length bytes)
+
+# The CRC covers the data from the attributes to the end of the batch (i.e. all
+# the bytes that follow the CRC). It is located after the magic byte, which
+# means that clients must parse the magic byte before deciding how to interpret
+# the bytes between the batch length and the magic byte. The partition leader
+# epoch field is not included in the CRC computation to avoid the need to
+# recompute the CRC when this field is assigned for every batch that is
+# received by the broker. The CRC-32C (Castagnoli) polynomial is used for the
+# computation.
+
+# The current RecordBatch attributes are given below:
+#
+# * Unused (6-15)
+# * Control (5)
+# * Transactional (4)
+# * Timestamp Type (3)
+# * Compression Type (0-2)
+
+import io
+import struct
+import time
+from .abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
+from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint
+
+from kafka.errors import CorruptRecordException
+from kafka.codec import (
+ gzip_encode, snappy_encode, lz4_encode,
+ gzip_decode, snappy_decode, lz4_decode
+)
+
+
+class DefaultRecordBase(object):
+
+ HEADER_STRUCT = struct.Struct(
+ ">q" # BaseOffset => Int64
+ "i" # Length => Int32
+ "i" # PartitionLeaderEpoch => Int32
+ "b" # Magic => Int8
+ "I" # CRC => Uint32
+ "h" # Attributes => Int16
+ "i" # LastOffsetDelta => Int32 // also serves as LastSequenceDelta
+ "q" # FirstTimestamp => Int64
+ "q" # MaxTimestamp => Int64
+ "q" # ProducerId => Int64
+ "h" # ProducerEpoch => Int16
+ "i" # BaseSequence => Int32
+ "i" # Records count => Int32
+ )
+ # Byte offset in HEADER_STRUCT of attributes field. Used to calculate CRC
+ ATTRIBUTES_OFFSET = struct.calcsize(">qiibI")
+ CRC_OFFSET = struct.calcsize(">qiib")
+ AFTER_LEN_OFFSET = struct.calcsize(">qi")
+
+ CODEC_MASK = 0x07
+ CODEC_NONE = 0x00
+ CODEC_GZIP = 0x01
+ CODEC_SNAPPY = 0x02
+ CODEC_LZ4 = 0x03
+ TIMESTAMP_TYPE_MASK = 0x08
+ TRANSACTIONAL_MASK = 0x10
+ CONTROL_MASK = 0x20
+
+ LOG_APPEND_TIME = 1
+ CREATE_TIME = 0
+
+
+class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
+
+ def __init__(self, buffer):
+ self._buffer = bytearray(buffer)
+ self._header_data = self.HEADER_STRUCT.unpack_from(self._buffer)
+ self._pos = self.HEADER_STRUCT.size
+ self._num_records = self._header_data[12]
+ self._next_record_index = 0
+ self._decompressed = False
+
+ @property
+ def base_offset(self):
+ return self._header_data[0]
+
+ @property
+ def magic(self):
+ return self._header_data[3]
+
+ @property
+ def crc(self):
+ return self._header_data[4]
+
+ @property
+ def attributes(self):
+ return self._header_data[5]
+
+ @property
+ def compression_type(self):
+ return self.attributes & self.CODEC_MASK
+
+ @property
+ def timestamp_type(self):
+ return int(bool(self.attributes & self.TIMESTAMP_TYPE_MASK))
+
+ @property
+ def is_transactional(self):
+ return bool(self.attributes & self.TRANSACTIONAL_MASK)
+
+ @property
+ def is_control_batch(self):
+ return bool(self.attributes & self.CONTROL_MASK)
+
+ @property
+ def first_timestamp(self):
+ return self._header_data[7]
+
+ @property
+ def max_timestamp(self):
+ return self._header_data[8]
+
+ def _maybe_uncompress(self):
+ if not self._decompressed:
+ compression_type = self.compression_type
+ if compression_type != self.CODEC_NONE:
+ data = memoryview(self._buffer)[self._pos:]
+ if compression_type == self.CODEC_GZIP:
+ uncompressed = gzip_decode(data)
+ if compression_type == self.CODEC_SNAPPY:
+ uncompressed = snappy_decode(data.tobytes())
+ if compression_type == self.CODEC_LZ4:
+ uncompressed = lz4_decode(data.tobytes())
+ self._buffer = bytearray(uncompressed)
+ self._pos = 0
+ self._decompressed = True
+
+ def _read_msg(
+ self,
+ decode_varint=decode_varint):
+ # Record =>
+ # Length => Varint
+ # Attributes => Int8
+ # TimestampDelta => Varlong
+ # OffsetDelta => Varint
+ # Key => Bytes
+ # Value => Bytes
+ # Headers => [HeaderKey HeaderValue]
+ # HeaderKey => String
+ # HeaderValue => Bytes
+
+ buffer = self._buffer
+ pos = self._pos
+ length, pos = decode_varint(buffer, pos)
+ start_pos = pos
+ _, pos = decode_varint(buffer, pos) # attrs can be skipped for now
+
+ ts_delta, pos = decode_varint(buffer, pos)
+ if self.timestamp_type == self.LOG_APPEND_TIME:
+ timestamp = self.max_timestamp
+ else:
+ timestamp = self.first_timestamp + ts_delta
+
+ offset_delta, pos = decode_varint(buffer, pos)
+ offset = self.base_offset + offset_delta
+
+ key_len, pos = decode_varint(buffer, pos)
+ if key_len >= 0:
+ key = bytes(buffer[pos: pos + key_len])
+ pos += key_len
+ else:
+ key = None
+
+ value_len, pos = decode_varint(buffer, pos)
+ if value_len >= 0:
+ value = bytes(buffer[pos: pos + value_len])
+ pos += value_len
+ else:
+ value = None
+
+ header_count, pos = decode_varint(buffer, pos)
+ if header_count < 0:
+ raise CorruptRecordException("Found invalid number of record "
+ "headers {}".format(header_count))
+ headers = []
+ while header_count:
+            # Header key is of type String, which can't be None
+ h_key_len, pos = decode_varint(buffer, pos)
+ if h_key_len < 0:
+ raise CorruptRecordException(
+ "Invalid negative header key size {}".format(h_key_len))
+ h_key = buffer[pos: pos + h_key_len].decode("utf-8")
+ pos += h_key_len
+
+ # Value is of type NULLABLE_BYTES, so it can be None
+ h_value_len, pos = decode_varint(buffer, pos)
+ if h_value_len >= 0:
+ h_value = bytes(buffer[pos: pos + h_value_len])
+ pos += h_value_len
+ else:
+ h_value = None
+
+ headers.append((h_key, h_value))
+ header_count -= 1
+
+        # Validate that we have read the whole record payload
+        if pos - start_pos != length:
+            raise CorruptRecordException(
+ "Invalid record size: expected to read {} bytes in record "
+ "payload, but instead read {}".format(length, pos - start_pos))
+ self._pos = pos
+
+ return DefaultRecord(
+ offset, timestamp, self.timestamp_type, key, value, headers)
+
+ def __iter__(self):
+ self._maybe_uncompress()
+ return self
+
+ def __next__(self):
+ if self._next_record_index >= self._num_records:
+ if self._pos != len(self._buffer):
+ raise CorruptRecordException(
+ "{} unconsumed bytes after all records consumed".format(
+ len(self._buffer) - self._pos))
+ raise StopIteration
+ try:
+ msg = self._read_msg()
+ except (ValueError, IndexError) as err:
+ raise CorruptRecordException(
+ "Found invalid record structure: {!r}".format(err))
+ else:
+ self._next_record_index += 1
+ return msg
+
+ next = __next__
+
+ def validate_crc(self):
+ assert self._decompressed is False, \
+ "Validate should be called before iteration"
+
+ crc = self.crc
+ data_view = memoryview(self._buffer)[self.ATTRIBUTES_OFFSET:]
+ verify_crc = calc_crc32c(data_view.tobytes())
+ return crc == verify_crc
+
+
+class DefaultRecord(ABCRecord):
+
+ __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
+ "_headers")
+
+ def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
+ self._offset = offset
+ self._timestamp = timestamp
+ self._timestamp_type = timestamp_type
+ self._key = key
+ self._value = value
+ self._headers = headers
+
+ @property
+ def offset(self):
+ return self._offset
+
+ @property
+ def timestamp(self):
+ """ Epoch milliseconds
+ """
+ return self._timestamp
+
+ @property
+ def timestamp_type(self):
+ """ CREATE_TIME(0) or APPEND_TIME(1)
+ """
+ return self._timestamp_type
+
+ @property
+ def key(self):
+ """ Bytes key or None
+ """
+ return self._key
+
+ @property
+ def value(self):
+ """ Bytes value or None
+ """
+ return self._value
+
+ @property
+ def headers(self):
+ return self._headers
+
+ @property
+ def checksum(self):
+ return None
+
+ def __repr__(self):
+ return (
+ "DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
+ " key={!r}, value={!r}, headers={!r})".format(
+ self._offset, self._timestamp, self._timestamp_type,
+ self._key, self._value, self._headers)
+ )
+
+
+class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
+
+ # excluding key, value and headers:
+ # 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
+ MAX_RECORD_OVERHEAD = 21
+
+ def __init__(
+ self, magic, compression_type, is_transactional,
+ producer_id, producer_epoch, base_sequence, batch_size):
+ assert magic >= 2
+ self._magic = magic
+ self._compression_type = compression_type & self.CODEC_MASK
+ self._batch_size = batch_size
+ self._is_transactional = bool(is_transactional)
+ # KIP-98 fields for EOS
+ self._producer_id = producer_id
+ self._producer_epoch = producer_epoch
+ self._base_sequence = base_sequence
+
+ self._first_timestamp = None
+ self._max_timestamp = None
+ self._last_offset = 0
+ self._num_records = 0
+
+ self._buffer = bytearray(self.HEADER_STRUCT.size)
+
+ def _get_attributes(self, include_compression_type=True):
+ attrs = 0
+ if include_compression_type:
+ attrs |= self._compression_type
+ # Timestamp Type is set by Broker
+ if self._is_transactional:
+ attrs |= self.TRANSACTIONAL_MASK
+ # Control batches are only created by Broker
+ return attrs
+
+ def append(self, offset, timestamp, key, value, headers,
+ # Cache for LOAD_FAST opcodes
+ encode_varint=encode_varint, size_of_varint=size_of_varint,
+ get_type=type, type_int=int, time_time=time.time,
+ byte_like=(bytes, bytearray, memoryview),
+ bytearray_type=bytearray, len_func=len, zero_len_varint=1
+ ):
+ """ Write message to messageset buffer with MsgVersion 2
+ """
+ # Check types
+ if get_type(offset) != type_int:
+ raise TypeError(offset)
+ if timestamp is None:
+ timestamp = type_int(time_time() * 1000)
+ elif get_type(timestamp) != type_int:
+ raise TypeError(timestamp)
+ if not (key is None or get_type(key) in byte_like):
+ raise TypeError(
+ "Not supported type for key: {}".format(type(key)))
+ if not (value is None or get_type(value) in byte_like):
+ raise TypeError(
+ "Not supported type for value: {}".format(type(value)))
+
+ # We will always add the first message, so those will be set
+ if self._first_timestamp is None:
+ self._first_timestamp = timestamp
+ self._max_timestamp = timestamp
+ timestamp_delta = 0
+ first_message = 1
+ else:
+ timestamp_delta = timestamp - self._first_timestamp
+ first_message = 0
+
+        # We can't write the record directly to the output buffer, because we
+        # need to precompute its length, which is written first...
+ message_buffer = bytearray_type(b"\x00") # Attributes
+ write_byte = message_buffer.append
+ write = message_buffer.extend
+
+ encode_varint(timestamp_delta, write_byte)
+ # Base offset is always 0 on Produce
+ encode_varint(offset, write_byte)
+
+ if key is not None:
+ encode_varint(len_func(key), write_byte)
+ write(key)
+ else:
+ write_byte(zero_len_varint)
+
+ if value is not None:
+ encode_varint(len_func(value), write_byte)
+ write(value)
+ else:
+ write_byte(zero_len_varint)
+
+ encode_varint(len_func(headers), write_byte)
+
+ for h_key, h_value in headers:
+ h_key = h_key.encode("utf-8")
+ encode_varint(len_func(h_key), write_byte)
+ write(h_key)
+ if h_value is not None:
+ encode_varint(len_func(h_value), write_byte)
+ write(h_value)
+ else:
+ write_byte(zero_len_varint)
+
+ message_len = len_func(message_buffer)
+ main_buffer = self._buffer
+
+ required_size = message_len + size_of_varint(message_len)
+ # Check if we can write this message
+ if (required_size + len_func(main_buffer) > self._batch_size and
+ not first_message):
+ return None
+
+ # Those should be updated after the length check
+ if self._max_timestamp < timestamp:
+ self._max_timestamp = timestamp
+ self._num_records += 1
+ self._last_offset = offset
+
+ encode_varint(message_len, main_buffer.append)
+ main_buffer.extend(message_buffer)
+
+ return DefaultRecordMetadata(offset, required_size, timestamp)
+
+ def write_header(self, use_compression_type=True):
+ batch_len = len(self._buffer)
+ self.HEADER_STRUCT.pack_into(
+ self._buffer, 0,
+ 0, # BaseOffset, set by broker
+ batch_len - self.AFTER_LEN_OFFSET, # Size from here to end
+ 0, # PartitionLeaderEpoch, set by broker
+ self._magic,
+ 0, # CRC will be set below, as we need a filled buffer for it
+ self._get_attributes(use_compression_type),
+ self._last_offset,
+ self._first_timestamp,
+ self._max_timestamp,
+ self._producer_id,
+ self._producer_epoch,
+ self._base_sequence,
+ self._num_records
+ )
+ crc = calc_crc32c(self._buffer[self.ATTRIBUTES_OFFSET:])
+ struct.pack_into(">I", self._buffer, self.CRC_OFFSET, crc)
+
+ def _maybe_compress(self):
+ if self._compression_type != self.CODEC_NONE:
+ header_size = self.HEADER_STRUCT.size
+ data = bytes(self._buffer[header_size:])
+ if self._compression_type == self.CODEC_GZIP:
+ compressed = gzip_encode(data)
+ elif self._compression_type == self.CODEC_SNAPPY:
+ compressed = snappy_encode(data)
+ elif self._compression_type == self.CODEC_LZ4:
+ compressed = lz4_encode(data)
+ compressed_size = len(compressed)
+ if len(data) <= compressed_size:
+                # We did not get any benefit from compression, let's send
+ # uncompressed
+ return False
+ else:
+ # Trim bytearray to the required size
+ needed_size = header_size + compressed_size
+ del self._buffer[needed_size:]
+ self._buffer[header_size:needed_size] = compressed
+ return True
+ return False
+
+ def build(self):
+ send_compressed = self._maybe_compress()
+ self.write_header(send_compressed)
+ return self._buffer
+
+ def size(self):
+ """ Return current size of data written to buffer
+ """
+ return len(self._buffer)
+
+ def size_in_bytes(self, offset, timestamp, key, value, headers):
+ if self._first_timestamp is not None:
+ timestamp_delta = timestamp - self._first_timestamp
+ else:
+ timestamp_delta = 0
+ size_of_body = (
+ 1 + # Attrs
+ size_of_varint(offset) +
+ size_of_varint(timestamp_delta) +
+ self.size_of(key, value, headers)
+ )
+ return size_of_body + size_of_varint(size_of_body)
+
+ @classmethod
+ def size_of(cls, key, value, headers):
+ size = 0
+ # Key size
+ if key is None:
+ size += 1
+ else:
+ key_len = len(key)
+ size += size_of_varint(key_len) + key_len
+ # Value size
+ if value is None:
+ size += 1
+ else:
+ value_len = len(value)
+ size += size_of_varint(value_len) + value_len
+ # Header size
+ size += size_of_varint(len(headers))
+ for h_key, h_value in headers:
+ h_key_len = len(h_key.encode("utf-8"))
+ size += size_of_varint(h_key_len) + h_key_len
+
+ if h_value is None:
+ size += 1
+ else:
+ h_value_len = len(h_value)
+ size += size_of_varint(h_value_len) + h_value_len
+ return size
+
+ @classmethod
+ def estimate_size_in_bytes(cls, key, value, headers):
+ """ Get the upper bound estimate on the size of record
+ """
+ return (
+ cls.HEADER_STRUCT.size + cls.MAX_RECORD_OVERHEAD +
+ cls.size_of(key, value, headers)
+ )
+
+
+class DefaultRecordMetadata(object):
+
+ __slots__ = ("_size", "_timestamp", "_offset")
+
+ def __init__(self, offset, size, timestamp):
+ self._offset = offset
+ self._size = size
+ self._timestamp = timestamp
+
+ @property
+ def offset(self):
+ return self._offset
+
+ @property
+ def crc(self):
+ return None
+
+ @property
+ def size(self):
+ return self._size
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ def __repr__(self):
+ return (
+ "DefaultRecordMetadata(offset={!r}, size={!r}, timestamp={!r})"
+ .format(self._offset, self._size, self._timestamp)
+ )
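
A round-trip sketch of the new builder and parser, mirroring test_read_write_serde_v2 in the tests below (uncompressed, non-transactional defaults assumed):

    from kafka.record.default_records import (
        DefaultRecordBatch, DefaultRecordBatchBuilder
    )

    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=1024 * 1024)
    builder.append(0, timestamp=None, key=b"key", value=b"value",
                   headers=[("header1", b"aaa")])
    buffer = builder.build()

    batch = DefaultRecordBatch(bytes(buffer))
    assert batch.validate_crc()  # must be called before iterating
    for record in batch:
        print(record.offset, record.timestamp, record.key,
              record.value, record.headers)
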
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 055914c..8c0791e 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -329,9 +329,10 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
self._batch_size = batch_size
self._buffer = bytearray()
- def append(self, offset, timestamp, key, value):
+ def append(self, offset, timestamp, key, value, headers=None):
""" Append message to batch.
"""
+ assert not headers, "Headers not supported in v0/v1"
# Check types
if type(offset) != int:
raise TypeError(offset)
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index 4ed992c..56aa51f 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -24,6 +24,7 @@ import struct
from kafka.errors import CorruptRecordException
from .abc import ABCRecords
from .legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder
+from .default_records import DefaultRecordBatch, DefaultRecordBatchBuilder
class MemoryRecords(ABCRecords):
@@ -102,18 +103,24 @@ class MemoryRecords(ABCRecords):
magic, = struct.unpack_from(">b", next_slice, _magic_offset)
if magic <= 1:
return LegacyRecordBatch(next_slice, magic)
- else: # pragma: no cover
- raise NotImplementedError("Record V2 still not implemented")
+ else:
+ return DefaultRecordBatch(next_slice)
class MemoryRecordsBuilder(object):
def __init__(self, magic, compression_type, batch_size):
- assert magic in [0, 1], "Not supported magic"
+ assert magic in [0, 1, 2], "Not supported magic"
assert compression_type in [0, 1, 2, 3], "Not valid compression type"
- self._builder = LegacyRecordBatchBuilder(
- magic=magic, compression_type=compression_type,
- batch_size=batch_size)
+ if magic >= 2:
+ self._builder = DefaultRecordBatchBuilder(
+ magic=magic, compression_type=compression_type,
+ is_transactional=False, producer_id=-1, producer_epoch=-1,
+ base_sequence=-1, batch_size=batch_size)
+ else:
+ self._builder = LegacyRecordBatchBuilder(
+ magic=magic, compression_type=compression_type,
+ batch_size=batch_size)
self._batch_size = batch_size
self._buffer = None
@@ -121,7 +128,7 @@ class MemoryRecordsBuilder(object):
self._closed = False
self._bytes_written = 0
- def append(self, timestamp, key, value):
+ def append(self, timestamp, key, value, headers=[]):
""" Append a message to the buffer.
Returns:
@@ -131,7 +138,7 @@ class MemoryRecordsBuilder(object):
return None, 0
offset = self._next_offset
- metadata = self._builder.append(offset, timestamp, key, value)
+ metadata = self._builder.append(offset, timestamp, key, value, headers)
# Return of 0 size means there's no space to add a new message
if metadata is None:
return None
diff --git a/kafka/record/util.py b/kafka/record/util.py
index 098d6f4..88135f1 100644
--- a/kafka/record/util.py
+++ b/kafka/record/util.py
@@ -1,5 +1,124 @@
import binascii
+from ._crc32c import crc as crc32c_py
+
+
+def encode_varint(value, write):
+    """ Encode an integer to a varint representation. See
+ https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints
+ on how those can be produced.
+
+ Arguments:
+ value (int): Value to encode
+        write (function): Called per byte that needs to be written
+
+ Returns:
+ int: Number of bytes written
+ """
+ value = (value << 1) ^ (value >> 63)
+
+ if value <= 0x7f: # 1 byte
+ write(value)
+ return 1
+ if value <= 0x3fff: # 2 bytes
+ write(0x80 | (value & 0x7f))
+ write(value >> 7)
+ return 2
+ if value <= 0x1fffff: # 3 bytes
+ write(0x80 | (value & 0x7f))
+ write(0x80 | ((value >> 7) & 0x7f))
+ write(value >> 14)
+ return 3
+ if value <= 0xfffffff: # 4 bytes
+ write(0x80 | (value & 0x7f))
+ write(0x80 | ((value >> 7) & 0x7f))
+ write(0x80 | ((value >> 14) & 0x7f))
+ write(value >> 21)
+ return 4
+ if value <= 0x7ffffffff: # 5 bytes
+ write(0x80 | (value & 0x7f))
+ write(0x80 | ((value >> 7) & 0x7f))
+ write(0x80 | ((value >> 14) & 0x7f))
+ write(0x80 | ((value >> 21) & 0x7f))
+ write(value >> 28)
+ return 5
+ else:
+        # Fall back to the general algorithm
+ bits = value & 0x7f
+ value >>= 7
+ i = 0
+ while value:
+ write(0x80 | bits)
+ bits = value & 0x7f
+ value >>= 7
+ i += 1
+ write(bits)
+ return i
+
+
+def size_of_varint(value):
+ """ Number of bytes needed to encode an integer in variable-length format.
+ """
+ value = (value << 1) ^ (value >> 63)
+ if value <= 0x7f:
+ return 1
+ if value <= 0x3fff:
+ return 2
+ if value <= 0x1fffff:
+ return 3
+ if value <= 0xfffffff:
+ return 4
+ if value <= 0x7ffffffff:
+ return 5
+ if value <= 0x3ffffffffff:
+ return 6
+ if value <= 0x1ffffffffffff:
+ return 7
+ if value <= 0xffffffffffffff:
+ return 8
+ if value <= 0x7fffffffffffffff:
+ return 9
+ return 10
+
+
+def decode_varint(buffer, pos=0):
+    """ Decode an integer from a varint representation. See
+ https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints
+ on how those can be produced.
+
+ Arguments:
+        buffer (bytearray): buffer to read from.
+ pos (int): optional position to read from
+
+ Returns:
+ (int, int): Decoded int value and next read position
+ """
+ result = buffer[pos]
+ if not (result & 0x81):
+ return (result >> 1), pos + 1
+ if not (result & 0x80):
+ return (result >> 1) ^ (~0), pos + 1
+
+ result &= 0x7f
+ pos += 1
+ shift = 7
+ while 1:
+ b = buffer[pos]
+ result |= ((b & 0x7f) << shift)
+ pos += 1
+ if not (b & 0x80):
+ return ((result >> 1) ^ -(result & 1), pos)
+ shift += 7
+ if shift >= 64:
+ raise ValueError("Out of int64 range")
+
+
+def calc_crc32c(memview):
+ """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data
+ """
+ crc = crc32c_py(memview)
+ return crc
+
def calc_crc32(memview):
""" Calculate simple CRC-32 checksum over a memoryview of data
diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py
new file mode 100644
index 0000000..193703e
--- /dev/null
+++ b/test/record/test_default_records.py
@@ -0,0 +1,169 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+import pytest
+from kafka.record.default_records import (
+ DefaultRecordBatch, DefaultRecordBatchBuilder
+)
+
+
+@pytest.mark.parametrize("compression_type", [
+ DefaultRecordBatch.CODEC_NONE,
+ DefaultRecordBatch.CODEC_GZIP,
+ DefaultRecordBatch.CODEC_SNAPPY,
+ DefaultRecordBatch.CODEC_LZ4
+])
+def test_read_write_serde_v2(compression_type):
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=compression_type, is_transactional=1,
+ producer_id=123456, producer_epoch=123, base_sequence=9999,
+ batch_size=999999)
+ headers = [] # [("header1", b"aaa"), ("header2", b"bbb")]
+ for offset in range(10):
+ builder.append(
+ offset, timestamp=9999999, key=b"test", value=b"Super",
+ headers=headers)
+ buffer = builder.build()
+ reader = DefaultRecordBatch(bytes(buffer))
+ msgs = list(reader)
+
+ assert reader.is_transactional is True
+ assert reader.compression_type == compression_type
+ assert reader.magic == 2
+ assert reader.timestamp_type == 0
+ assert reader.base_offset == 0
+ for offset, msg in enumerate(msgs):
+ assert msg.offset == offset
+ assert msg.timestamp == 9999999
+ assert msg.key == b"test"
+ assert msg.value == b"Super"
+ assert msg.headers == headers
+
+
+def test_written_bytes_equals_size_in_bytes_v2():
+ key = b"test"
+ value = b"Super"
+ headers = [("header1", b"aaa"), ("header2", b"bbb"), ("xx", None)]
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=999999)
+
+ size_in_bytes = builder.size_in_bytes(
+ 0, timestamp=9999999, key=key, value=value, headers=headers)
+
+ pos = builder.size()
+ meta = builder.append(
+ 0, timestamp=9999999, key=key, value=value, headers=headers)
+
+ assert builder.size() - pos == size_in_bytes
+ assert meta.size == size_in_bytes
+
+
+def test_estimate_size_in_bytes_bigger_than_batch_v2():
+ key = b"Super Key"
+ value = b"1" * 100
+ headers = [("header1", b"aaa"), ("header2", b"bbb")]
+ estimate_size = DefaultRecordBatchBuilder.estimate_size_in_bytes(
+ key, value, headers)
+
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=999999)
+ builder.append(
+ 0, timestamp=9999999, key=key, value=value, headers=headers)
+ buf = builder.build()
+ assert len(buf) <= estimate_size, \
+ "Estimate should always be upper bound"
+
+
+def test_default_batch_builder_validates_arguments():
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=999999)
+
+ # Key should not be str
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp=9999999, key="some string", value=None, headers=[])
+
+ # Value should not be str
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp=9999999, key=None, value="some string", headers=[])
+
+ # Timestamp should be of proper type
+ with pytest.raises(TypeError):
+ builder.append(
+ 0, timestamp="1243812793", key=None, value=b"some string",
+ headers=[])
+
+ # Offset of invalid type
+ with pytest.raises(TypeError):
+ builder.append(
+ "0", timestamp=9999999, key=None, value=b"some string", headers=[])
+
+ # Ok to pass value as None
+ builder.append(
+ 0, timestamp=9999999, key=b"123", value=None, headers=[])
+
+ # Timestamp can be None
+ builder.append(
+ 1, timestamp=None, key=None, value=b"some string", headers=[])
+
+    # Ok to pass offsets in non-incremental order. This should not happen though
+ builder.append(
+ 5, timestamp=9999999, key=b"123", value=None, headers=[])
+
+    # Check that the error handling above did not corrupt the builder's buffer
+ assert len(builder.build()) == 104
+
+
+def test_default_correct_metadata_response():
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024 * 1024)
+ meta = builder.append(
+ 0, timestamp=9999999, key=b"test", value=b"Super", headers=[])
+
+ assert meta.offset == 0
+ assert meta.timestamp == 9999999
+ assert meta.crc is None
+ assert meta.size == 16
+ assert repr(meta) == (
+ "DefaultRecordMetadata(offset=0, size={}, timestamp={})"
+ .format(meta.size, meta.timestamp)
+ )
+
+
+def test_default_batch_size_limit():
+ # First message can be added even if it's too big
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024)
+
+ meta = builder.append(
+ 0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
+ assert meta.size > 0
+ assert meta.crc is None
+ assert meta.offset == 0
+ assert meta.timestamp is not None
+ assert len(builder.build()) > 2000
+
+ builder = DefaultRecordBatchBuilder(
+ magic=2, compression_type=0, is_transactional=0,
+ producer_id=-1, producer_epoch=-1, base_sequence=-1,
+ batch_size=1024)
+ meta = builder.append(
+ 0, timestamp=None, key=None, value=b"M" * 700, headers=[])
+ assert meta is not None
+ meta = builder.append(
+ 1, timestamp=None, key=None, value=b"M" * 700, headers=[])
+ assert meta is None
+ meta = builder.append(
+ 2, timestamp=None, key=None, value=b"M" * 700, headers=[])
+ assert meta is None
+ assert len(builder.build()) < 1000
diff --git a/test/record/test_records.py b/test/record/test_records.py
index fc3eaca..7306bbc 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -2,6 +2,26 @@ import pytest
from kafka.record import MemoryRecords
from kafka.errors import CorruptRecordException
+# This is real data captured from a Kafka 0.11 broker
+record_batch_data_v2 = [
+ # First Batch value == "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x00\x00\x01\x02\x03'
+ b'\x18\xa2p\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff{\x06<\x00\x00\x01]'
+ b'\xff{\x06<\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00'
+ b'\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00',
+ # Second Batch value = "" and value = "". 2 records
+ b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x02\x02\xc8'
+ b'\\\xbd#\x00\x00\x00\x00\x00\x01\x00\x00\x01]\xff|\xddl\x00\x00\x01]\xff'
+ b'|\xde\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00'
+ b'\x00\x00\x02\x0c\x00\x00\x00\x01\x00\x00\x0e\x00\xd0\x02\x02\x01\x00'
+ b'\x00',
+ # Third batch value = "123"
+ b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00;\x00\x00\x00\x02\x02.\x0b'
+ b'\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]'
+ b'\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
+ b'\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00'
+]
+
record_batch_data_v1 = [
# First Message value == "123"
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00'
@@ -34,6 +54,32 @@ record_batch_data_v0 = [
]
+def test_memory_records_v2():
+ data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4
+ records = MemoryRecords(data_bytes)
+
+ assert records.size_in_bytes() == 222
+ assert records.valid_bytes() == 218
+
+ assert records.has_next() is True
+ batch = records.next_batch()
+ recs = list(batch)
+ assert len(recs) == 1
+ assert recs[0].value == b"123"
+ assert recs[0].key is None
+ assert recs[0].timestamp == 1503229838908
+ assert recs[0].timestamp_type == 0
+ assert recs[0].checksum is None
+ assert recs[0].headers == []
+
+ assert records.next_batch() is not None
+ assert records.next_batch() is not None
+
+ assert records.has_next() is False
+ assert records.next_batch() is None
+ assert records.next_batch() is None
+
+
def test_memory_records_v1():
data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4
records = MemoryRecords(data_bytes)
diff --git a/test/record/test_util.py b/test/record/test_util.py
new file mode 100644
index 0000000..bfe0fcc
--- /dev/null
+++ b/test/record/test_util.py
@@ -0,0 +1,95 @@
+import struct
+import pytest
+from kafka.record import util
+
+
+varint_data = [
+ (b"\x00", 0),
+ (b"\x01", -1),
+ (b"\x02", 1),
+ (b"\x7E", 63),
+ (b"\x7F", -64),
+ (b"\x80\x01", 64),
+ (b"\x81\x01", -65),
+ (b"\xFE\x7F", 8191),
+ (b"\xFF\x7F", -8192),
+ (b"\x80\x80\x01", 8192),
+ (b"\x81\x80\x01", -8193),
+ (b"\xFE\xFF\x7F", 1048575),
+ (b"\xFF\xFF\x7F", -1048576),
+ (b"\x80\x80\x80\x01", 1048576),
+ (b"\x81\x80\x80\x01", -1048577),
+ (b"\xFE\xFF\xFF\x7F", 134217727),
+ (b"\xFF\xFF\xFF\x7F", -134217728),
+ (b"\x80\x80\x80\x80\x01", 134217728),
+ (b"\x81\x80\x80\x80\x01", -134217729),
+ (b"\xFE\xFF\xFF\xFF\x7F", 17179869183),
+ (b"\xFF\xFF\xFF\xFF\x7F", -17179869184),
+ (b"\x80\x80\x80\x80\x80\x01", 17179869184),
+ (b"\x81\x80\x80\x80\x80\x01", -17179869185),
+ (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551),
+ (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552),
+ (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552),
+ (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969),
+ (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903),
+ (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904),
+ (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904),
+ (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", -4611686018427387905),
+]
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_encode_varint(encoded, decoded):
+ res = bytearray()
+ util.encode_varint(decoded, res.append)
+ assert res == encoded
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_decode_varint(encoded, decoded):
+    # We add a few extra bytes around the value just to check that the
+    # position is calculated correctly
+ value, pos = util.decode_varint(
+ bytearray(b"\x01\xf0" + encoded + b"\xff\x01"), 2)
+ assert value == decoded
+ assert pos - 2 == len(encoded)
+
+
+@pytest.mark.parametrize("encoded, decoded", varint_data)
+def test_size_of_varint(encoded, decoded):
+ assert util.size_of_varint(decoded) == len(encoded)
+
+
+def test_crc32c():
+ def make_crc(data):
+ crc = util.calc_crc32c(data)
+ return struct.pack(">I", crc)
+ assert make_crc(b"") == b"\x00\x00\x00\x00"
+ assert make_crc(b"a") == b"\xc1\xd0\x43\x30"
+
+ # Took from librdkafka testcase
+ long_text = b"""\
+ This software is provided 'as-is', without any express or implied
+ warranty. In no event will the author be held liable for any damages
+ arising from the use of this software.
+
+ Permission is granted to anyone to use this software for any purpose,
+ including commercial applications, and to alter it and redistribute it
+ freely, subject to the following restrictions:
+
+ 1. The origin of this software must not be misrepresented; you must not
+ claim that you wrote the original software. If you use this software
+ in a product, an acknowledgment in the product documentation would be
+ appreciated but is not required.
+ 2. Altered source versions must be plainly marked as such, and must not be
+ misrepresented as being the original software.
+ 3. This notice may not be removed or altered from any source distribution."""
+ assert make_crc(long_text) == b"\x7d\xcd\xe1\x13"
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 364a808..ef3f686 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -54,7 +54,7 @@ def _build_record_batch(msgs, compression=0):
magic=1, compression_type=0, batch_size=9999999)
for msg in msgs:
key, value, timestamp = msg
- builder.append(key=key, value=value, timestamp=timestamp)
+ builder.append(key=key, value=value, timestamp=timestamp, headers=[])
builder.close()
return builder.buffer()
diff --git a/test/test_producer.py b/test/test_producer.py
index 41bd52e..20dffc2 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -88,10 +88,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
retries=5,
max_block_ms=10000,
compression_type=compression)
- if producer.config['api_version'] >= (0, 10):
- magic = 1
- else:
- magic = 0
+ magic = producer._max_usable_produce_magic()
topic = random_string(5)
future = producer.send(
@@ -109,7 +106,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
else:
assert record.timestamp == -1 # NO_TIMESTAMP
- if magic == 1:
+ if magic >= 2:
+ assert record.checksum is None
+ elif magic == 1:
assert record.checksum == 1370034956
else:
assert record.checksum == 3296137851