diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-25 07:28:35 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-25 07:28:35 +0900 |
commit | 8b05ee8da50b4c7b832676f4e38f9d92a86639cc (patch) | |
tree | 91fe16e3c9aff44ca93633824b96da4b8ff19384 /kafka/producer | |
parent | 4213d53d4ccfd239addc1db07b5b3913b4c6547c (diff) | |
download | kafka-python-8b05ee8da50b4c7b832676f4e38f9d92a86639cc.tar.gz |
Add DefaultRecordBatch implementation aka V2 message format parser/builder. (#1185)
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'kafka/producer')
-rw-r--r-- | kafka/producer/kafka.py | 15 | ||||
-rw-r--r-- | kafka/producer/sender.py | 9 |
2 files changed, 18 insertions, 6 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0ffc29c..646e773 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -15,6 +15,7 @@ from ..client_async import KafkaClient, selectors from ..codec import has_gzip, has_snappy, has_lz4 from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner +from ..record.default_records import DefaultRecordBatchBuilder from ..record.legacy_records import LegacyRecordBatchBuilder from ..serializer import Serializer from ..structs import TopicPartition @@ -486,15 +487,21 @@ class KafkaProducer(object): return self._wait_on_metadata(topic, max_wait) def _max_usable_produce_magic(self): - if self.config['api_version'] >= (0, 10): + if self.config['api_version'] >= (0, 11): + return 2 + elif self.config['api_version'] >= (0, 10): return 1 else: return 0 - def _estimate_size_in_bytes(self, key, value): + def _estimate_size_in_bytes(self, key, value, headers=[]): magic = self._max_usable_produce_magic() - return LegacyRecordBatchBuilder.estimate_size_in_bytes( - magic, self.config['compression_type'], key, value) + if magic == 2: + return DefaultRecordBatchBuilder.estimate_size_in_bytes( + key, value, headers) + else: + return LegacyRecordBatchBuilder.estimate_size_in_bytes( + magic, self.config['compression_type'], key, value) def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): """Publish a message to a topic. diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 72a15bb..ffc67f8 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -291,7 +291,11 @@ class Sender(threading.Thread): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - if self.config['api_version'] >= (0, 10): + kwargs = {} + if self.config['api_version'] >= (0, 11): + version = 3 + kwargs = dict(transactional_id=None) + elif self.config['api_version'] >= (0, 10): version = 2 elif self.config['api_version'] == (0, 9): version = 1 @@ -302,7 +306,8 @@ class Sender(threading.Thread): timeout=timeout, topics=[(topic, list(partition_info.items())) for topic, partition_info - in six.iteritems(produce_records_by_partition)] + in six.iteritems(produce_records_by_partition)], + **kwargs ) def wakeup(self): |