summaryrefslogtreecommitdiff
path: root/kafka/producer/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--kafka/producer/kafka.py15
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0ffc29c..646e773 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -15,6 +15,7 @@ from ..client_async import KafkaClient, selectors
from ..codec import has_gzip, has_snappy, has_lz4
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
+from ..record.default_records import DefaultRecordBatchBuilder
from ..record.legacy_records import LegacyRecordBatchBuilder
from ..serializer import Serializer
from ..structs import TopicPartition
@@ -486,15 +487,21 @@ class KafkaProducer(object):
return self._wait_on_metadata(topic, max_wait)
def _max_usable_produce_magic(self):
- if self.config['api_version'] >= (0, 10):
+ if self.config['api_version'] >= (0, 11):
+ return 2
+ elif self.config['api_version'] >= (0, 10):
return 1
else:
return 0
- def _estimate_size_in_bytes(self, key, value):
+ def _estimate_size_in_bytes(self, key, value, headers=[]):
magic = self._max_usable_produce_magic()
- return LegacyRecordBatchBuilder.estimate_size_in_bytes(
- magic, self.config['compression_type'], key, value)
+ if magic == 2:
+ return DefaultRecordBatchBuilder.estimate_size_in_bytes(
+ key, value, headers)
+ else:
+ return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, self.config['compression_type'], key, value)
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.