summaryrefslogtreecommitdiff
path: root/kafka/producer/kafka.py
diff options
context:
space:
mode:
authorTaras Voinarovskiy <voyn1991@gmail.com>2017-08-25 09:38:27 +0000
committerTaras Voinarovskiy <voyn1991@gmail.com>2017-10-24 15:12:11 +0000
commit3af66bc542efff3f7010bec18b72d844e09488c4 (patch)
treea20623631b36230c9425b08ee95b85afbc9a9455 /kafka/producer/kafka.py
parente06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff)
downloadkafka-python-v2_records.tar.gz
Add DefaultRecordBatch implementation aka V2 message format parser/builder.v2_records
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--kafka/producer/kafka.py15
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0ffc29c..646e773 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -15,6 +15,7 @@ from ..client_async import KafkaClient, selectors
from ..codec import has_gzip, has_snappy, has_lz4
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
+from ..record.default_records import DefaultRecordBatchBuilder
from ..record.legacy_records import LegacyRecordBatchBuilder
from ..serializer import Serializer
from ..structs import TopicPartition
@@ -486,15 +487,21 @@ class KafkaProducer(object):
return self._wait_on_metadata(topic, max_wait)
def _max_usable_produce_magic(self):
- if self.config['api_version'] >= (0, 10):
+ if self.config['api_version'] >= (0, 11):
+ return 2
+ elif self.config['api_version'] >= (0, 10):
return 1
else:
return 0
- def _estimate_size_in_bytes(self, key, value):
+ def _estimate_size_in_bytes(self, key, value, headers=[]):
magic = self._max_usable_produce_magic()
- return LegacyRecordBatchBuilder.estimate_size_in_bytes(
- magic, self.config['compression_type'], key, value)
+ if magic == 2:
+ return DefaultRecordBatchBuilder.estimate_size_in_bytes(
+ key, value, headers)
+ else:
+ return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, self.config['compression_type'], key, value)
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.