Diffstat (limited to 'kafka/producer/kafka.py')
 kafka/producer/kafka.py | 45 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 34 insertions(+), 11 deletions(-)
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index de9dcd2..5638b61 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,9 +12,10 @@ from ..vendor import six
 
 from .. import errors as Errors
 from ..client_async import KafkaClient, selectors
+from ..codec import has_gzip, has_snappy, has_lz4
 from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
-from ..protocol.message import Message, MessageSet
+from ..record.legacy_records import LegacyRecordBatchBuilder
 from ..serializer import Serializer
 from ..structs import TopicPartition
 from .future import FutureRecordMetadata, FutureProduceResult
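
The import changes swap the old Message/MessageSet header constants for
LegacyRecordBatchBuilder and pull in per-codec availability probes from
kafka.codec. As a rough sketch (not the library's actual code), such a probe
amounts to an import check for the optional dependency:

    # Sketch only, assuming the probe is a plain function; kafka.codec
    # actually caches this result at import time.
    def has_snappy():
        try:
            import snappy  # optional python-snappy package
            return True
        except ImportError:
            return False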
@@ -310,6 +311,13 @@ class KafkaProducer(object):
         'sasl_plain_password': None,
     }
 
+    _COMPRESSORS = {
+        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
+        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
+        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
+    }
+
     def __init__(self, **configs):
         log.debug("Starting the Kafka producer") # trace
         self.config = copy.copy(self.DEFAULT_CONFIG)
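
_COMPRESSORS maps each compression_type value to a pair: a zero-argument
availability checker and the codec's attribute bits on LegacyRecordBatchBuilder.
An illustrative lookup (the session is hypothetical; names come from the diff):

    checker, attrs = KafkaProducer._COMPRESSORS['gzip']
    checker()  # True when gzip support is importable on this host
    attrs      # LegacyRecordBatchBuilder.CODEC_GZIP; __init__ stores this
               # as self.config['compression_attrs']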
@@ -355,7 +363,16 @@
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
 
-        message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+        # Check compression_type for library support
+        ct = self.config['compression_type']
+        if ct not in self._COMPRESSORS:
+            raise ValueError("Unsupported codec: {}".format(ct))
+        else:
+            checker, compression_attrs = self._COMPRESSORS[ct]
+            assert checker(), "Libraries for {} compression codec not found".format(ct)
+            self.config['compression_attrs'] = compression_attrs
+
+        message_version = self._max_usable_produce_magic()
         self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
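
From a caller's point of view the new check fails fast during construction:
an unknown compression_type raises ValueError before anything is sent, and a
known codec whose library is absent trips the assertion. Hypothetical
examples (broker address illustrative, api_version pinned):

    # Raises ValueError -- 'zstd' is not a _COMPRESSORS key in this version:
    KafkaProducer(bootstrap_servers='localhost:9092',
                  api_version=(0, 10), compression_type='zstd')

    # Raises AssertionError unless the optional python-snappy package is installed:
    KafkaProducer(bootstrap_servers='localhost:9092',
                  api_version=(0, 10), compression_type='snappy')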
@@ -465,6 +482,17 @@
         max_wait = self.config['max_block_ms'] / 1000.0
         return self._wait_on_metadata(topic, max_wait)
 
+    def _max_usable_produce_magic(self):
+        if self.config['api_version'] >= (0, 10):
+            return 1
+        else:
+            return 0
+
+    def _estimate_size_in_bytes(self, key, value):
+        magic = self._max_usable_produce_magic()
+        return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+            magic, self.config['compression_type'], key, value)
+
     def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.
 
@@ -514,24 +542,19 @@
             partition = self._partition(topic, partition, key, value,
                                         key_bytes, value_bytes)
 
-            message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-            if key_bytes is not None:
-                message_size += len(key_bytes)
-            if value_bytes is not None:
-                message_size += len(value_bytes)
+            message_size = self._estimate_size_in_bytes(key, value)
             self._ensure_valid_record_size(message_size)
 
             tp = TopicPartition(topic, partition)
-            if timestamp_ms is None:
-                timestamp_ms = int(time.time() * 1000)
             log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
             result = self._accumulator.append(tp, timestamp_ms,
                                               key_bytes, value_bytes,
-                                              self.config['max_block_ms'])
+                                              self.config['max_block_ms'],
+                                              estimated_size=message_size)
             future, batch_is_full, new_batch_created = result
             if batch_is_full or new_batch_created:
                 log.debug("Waking up the sender since %s is either full or"
-                " getting a new batch", tp)
+                          " getting a new batch", tp)
                 self._sender.wakeup()
 
             return future
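
Note that send() no longer stamps a default timestamp_ms; a None timestamp is
now passed through to the accumulator, which presumably applies the wall-clock
default when the record is appended. Callers are unaffected (hypothetical
usage):

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send('my-topic', key=b'k', value=b'v')  # timestamp_ms=None
    metadata = future.get(timeout=10)  # resolves once the batch is delivered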