summaryrefslogtreecommitdiff
path: root/kafka/producer/kafka.py
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-10-25 07:28:35 +0900
committerGitHub <noreply@github.com>2017-10-25 07:28:35 +0900
commit8b05ee8da50b4c7b832676f4e38f9d92a86639cc (patch)
tree91fe16e3c9aff44ca93633824b96da4b8ff19384 /kafka/producer/kafka.py
parent4213d53d4ccfd239addc1db07b5b3913b4c6547c (diff)
downloadkafka-python-8b05ee8da50b4c7b832676f4e38f9d92a86639cc.tar.gz
Add DefaultRecordBatch implementation aka V2 message format parser/builder. (#1185)
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--kafka/producer/kafka.py15
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0ffc29c..646e773 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -15,6 +15,7 @@ from ..client_async import KafkaClient, selectors
from ..codec import has_gzip, has_snappy, has_lz4
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
+from ..record.default_records import DefaultRecordBatchBuilder
from ..record.legacy_records import LegacyRecordBatchBuilder
from ..serializer import Serializer
from ..structs import TopicPartition
@@ -486,15 +487,21 @@ class KafkaProducer(object):
return self._wait_on_metadata(topic, max_wait)
def _max_usable_produce_magic(self):
- if self.config['api_version'] >= (0, 10):
+ if self.config['api_version'] >= (0, 11):
+ return 2
+ elif self.config['api_version'] >= (0, 10):
return 1
else:
return 0
- def _estimate_size_in_bytes(self, key, value):
+ def _estimate_size_in_bytes(self, key, value, headers=[]):
magic = self._max_usable_produce_magic()
- return LegacyRecordBatchBuilder.estimate_size_in_bytes(
- magic, self.config['compression_type'], key, value)
+ if magic == 2:
+ return DefaultRecordBatchBuilder.estimate_size_in_bytes(
+ key, value, headers)
+ else:
+ return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, self.config['compression_type'], key, value)
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.