Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--    kafka/producer/kafka.py    45
1 file changed, 34 insertions(+), 11 deletions(-)
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index de9dcd2..5638b61 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,9 +12,10 @@
 from ..vendor import six
 from .. import errors as Errors
 from ..client_async import KafkaClient, selectors
+from ..codec import has_gzip, has_snappy, has_lz4
 from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
-from ..protocol.message import Message, MessageSet
+from ..record.legacy_records import LegacyRecordBatchBuilder
 from ..serializer import Serializer
 from ..structs import TopicPartition
 from .future import FutureRecordMetadata, FutureProduceResult
@@ -310,6 +311,13 @@ class KafkaProducer(object):
         'sasl_plain_password': None,
     }
 
+    _COMPRESSORS = {
+        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
+        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
+        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
+    }
+
     def __init__(self, **configs):
         log.debug("Starting the Kafka producer") # trace
         self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -355,7 +363,16 @@ class KafkaProducer(object):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
 
-        message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+        # Check compression_type for library support
+        ct = self.config['compression_type']
+        if ct not in self._COMPRESSORS:
+            raise ValueError("Not supported codec: {}".format(ct))
+        else:
+            checker, compression_attrs = self._COMPRESSORS[ct]
+            assert checker(), "Libraries for {} compression codec not found".format(ct)
+            self.config['compression_attrs'] = compression_attrs
+
+        message_version = self._max_usable_produce_magic()
         self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
@@ -465,6 +482,17 @@ class KafkaProducer(object):
             max_wait = self.config['max_block_ms'] / 1000.0
             return self._wait_on_metadata(topic, max_wait)
 
+    def _max_usable_produce_magic(self):
+        if self.config['api_version'] >= (0, 10):
+            return 1
+        else:
+            return 0
+
+    def _estimate_size_in_bytes(self, key, value):
+        magic = self._max_usable_produce_magic()
+        return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+            magic, self.config['compression_type'], key, value)
+
     def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.
 
@@ -514,24 +542,19 @@ class KafkaProducer(object):
             partition = self._partition(topic, partition, key, value,
                                         key_bytes, value_bytes)
 
-        message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-        if key_bytes is not None:
-            message_size += len(key_bytes)
-        if value_bytes is not None:
-            message_size += len(value_bytes)
+        message_size = self._estimate_size_in_bytes(key, value)
         self._ensure_valid_record_size(message_size)
 
         tp = TopicPartition(topic, partition)
-        if timestamp_ms is None:
-            timestamp_ms = int(time.time() * 1000)
         log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
         result = self._accumulator.append(tp, timestamp_ms,
                                           key_bytes, value_bytes,
-                                          self.config['max_block_ms'])
+                                          self.config['max_block_ms'],
+                                          estimated_size=message_size)
         future, batch_is_full, new_batch_created = result
         if batch_is_full or new_batch_created:
             log.debug("Waking up the sender since %s is either full or"
-                      " getting a new batch", tp)
+                          " getting a new batch", tp)
             self._sender.wakeup()
 
         return future
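The heart of the first hunk is the new _COMPRESSORS table, which pairs each user-facing codec name with an availability check and the attribute bits written into the record batch, so a misspelled codec or a missing compression library now fails fast in __init__ rather than at send time. Below is a minimal, self-contained sketch of that lookup pattern; the checker functions are stand-ins for kafka-python's real ones in kafka.codec, and resolve_compression is a hypothetical helper, not part of the library:

# Sketch of the codec-resolution pattern introduced by this patch.
# The CODEC_* values mirror the legacy record attribute bits.
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4 = 0x00, 0x01, 0x02, 0x03

def _has_gzip():
    return True  # gzip ships with the Python stdlib

def _has_snappy():
    try:
        import snappy  # noqa: F401 -- python-snappy is an optional dependency
        return True
    except ImportError:
        return False

_COMPRESSORS = {
    'gzip': (_has_gzip, CODEC_GZIP),
    'snappy': (_has_snappy, CODEC_SNAPPY),
    None: (lambda: True, CODEC_NONE),
}

def resolve_compression(compression_type):
    """Map a codec name to its attribute bits, failing fast otherwise."""
    if compression_type not in _COMPRESSORS:
        raise ValueError("Not supported codec: {}".format(compression_type))
    checker, attrs = _COMPRESSORS[compression_type]
    if not checker():
        raise RuntimeError(
            "Libraries for {} compression codec not found".format(compression_type))
    return attrs

attrs = resolve_compression('gzip')   # -> 0x01; resolve_compression('zstd') raises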
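The change send() picks up is size estimation: the fixed MessageSet.HEADER_SIZE + Message.HEADER_SIZE arithmetic is replaced by LegacyRecordBatchBuilder.estimate_size_in_bytes, which accounts for the message format chosen by _max_usable_produce_magic (v1 records, used against 0.10+ brokers, carry an 8-byte timestamp that v0 records lack). The estimate is threaded into RecordAccumulator.append as estimated_size, presumably so the accumulator can reserve buffer space without recomputing it; client-side timestamp defaulting is dropped at the same time. A back-of-the-envelope sketch of the uncompressed case follows; the overhead constants come from the v0/v1 legacy wire format, not from this patch:

# Rough per-record size estimate for uncompressed legacy records;
# a sketch of what LegacyRecordBatchBuilder.estimate_size_in_bytes computes.
LOG_OVERHEAD = 12        # 8-byte offset + 4-byte message size
RECORD_OVERHEAD = {
    0: 14,               # crc(4) + magic(1) + attrs(1) + key len(4) + value len(4)
    1: 22,               # v1 adds an 8-byte timestamp
}

def max_usable_produce_magic(api_version):
    # Brokers >= 0.10 understand v1 (timestamped) messages.
    return 1 if api_version >= (0, 10) else 0

def estimate_size_in_bytes(magic, key, value):
    size = LOG_OVERHEAD + RECORD_OVERHEAD[magic]
    if key is not None:
        size += len(key)
    if value is not None:
        size += len(value)
    return size

# A 3-byte key and 5-byte value against a 0.10 broker: 12 + 22 + 3 + 5 = 42.
magic = max_usable_produce_magic((0, 10, 1))
assert estimate_size_in_bytes(magic, b'key', b'value') == 42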