author     Taras <voyn1991@gmail.com>    2017-10-10 00:13:16 +0300
committer  Taras <voyn1991@gmail.com>    2017-10-11 18:09:17 +0300
commit     fbea5f04bccd28f3aa15a1711548b131504591ac (patch)
tree       1c8a0efe687c2ace72fa146b4f03e15def8e3a95 /kafka/producer
parent     f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff)
Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
Diffstat (limited to 'kafka/producer')
-rw-r--r--   kafka/producer/buffer.py               126
-rw-r--r--   kafka/producer/kafka.py                 43
-rw-r--r--   kafka/producer/record_accumulator.py   100
-rw-r--r--   kafka/producer/sender.py                 1
4 files changed, 90 insertions(+), 180 deletions(-)
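
This commit replaces the producer's hand-rolled MessageSet/Message encoding with the batch builders under kafka.record. A minimal sketch of the new write path, using only the constructor and methods visible in the hunks below (positional arguments in the order the diff uses: message format version, compression attribute, batch size; append() returning (checksum, size)); treat it as illustrative rather than the library's documented public API:

    import time
    from kafka.record.memory_records import MemoryRecordsBuilder

    # 1 -> v1 message format (brokers >= 0.10); 0 -> CODEC_NONE
    builder = MemoryRecordsBuilder(1, 0, 16384)

    offset = builder.next_offset()          # 0 for the first record
    checksum, size = builder.append(int(time.time() * 1000), b"key", b"value")

    builder.close()                         # seal the batch; no further appends
    payload = builder.buffer()              # framed bytes handed to the Sender
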
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index d1eeaf1..19ea732 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -5,133 +5,9 @@ import io
import threading
import time
-from ..codec import (has_gzip, has_snappy, has_lz4,
- gzip_encode, snappy_encode,
- lz4_encode, lz4_encode_old_kafka)
-from .. import errors as Errors
from ..metrics.stats import Rate
-from ..protocol.types import Int32, Int64
-from ..protocol.message import MessageSet, Message
-
-
-class MessageSetBuffer(object):
- """Wrap a buffer for writing MessageSet batches.
-
- Arguments:
- buf (IO stream): a buffer for writing data. Typically BytesIO.
- batch_size (int): maximum number of bytes to write to the buffer.
-
- Keyword Arguments:
- compression_type ('gzip', 'snappy', None): compress messages before
- publishing. Default: None.
- """
- _COMPRESSORS = {
- 'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
- 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
- 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
- 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
- }
- def __init__(self, buf, batch_size, compression_type=None, message_version=0):
- if compression_type is not None:
- assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
-
- # Kafka 0.8/0.9 had a quirky lz4...
- if compression_type == 'lz4' and message_version == 0:
- compression_type = 'lz4-old-kafka'
-
- checker, encoder, attributes = self._COMPRESSORS[compression_type]
- assert checker(), 'Compression Libraries Not Found'
- self._compressor = encoder
- self._compression_attributes = attributes
- else:
- self._compressor = None
- self._compression_attributes = None
-
- self._message_version = message_version
- self._buffer = buf
- # Init MessageSetSize to 0 -- update on close
- self._buffer.seek(0)
- self._buffer.write(Int32.encode(0))
- self._batch_size = batch_size
- self._closed = False
- self._messages = 0
- self._bytes_written = 4 # Int32 header is 4 bytes
- self._final_size = None
-
- def append(self, offset, message):
- """Append a Message to the MessageSet.
-
- Arguments:
- offset (int): offset of the message
- message (Message or bytes): message struct or encoded bytes
-
- Returns: bytes written
- """
- if isinstance(message, Message):
- encoded = message.encode()
- else:
- encoded = bytes(message)
- msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
- self._buffer.write(msg)
- self._messages += 1
- self._bytes_written += len(msg)
- return len(msg)
-
- def has_room_for(self, key, value):
- if self._closed:
- return False
- if not self._messages:
- return True
- needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
- if key is not None:
- needed_bytes += len(key)
- if value is not None:
- needed_bytes += len(value)
- return self._buffer.tell() + needed_bytes < self._batch_size
-
- def is_full(self):
- if self._closed:
- return True
- return self._buffer.tell() >= self._batch_size
-
- def close(self):
- # This method may be called multiple times on the same batch
- # i.e., on retries
- # we need to make sure we only close it out once
- # otherwise compressed messages may be double-compressed
- # see Issue 718
- if not self._closed:
- if self._compressor:
- # TODO: avoid copies with bytearray / memoryview
- uncompressed_size = self._buffer.tell()
- self._buffer.seek(4)
- msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)),
- attributes=self._compression_attributes,
- magic=self._message_version)
- encoded = msg.encode()
- self._buffer.seek(4)
- self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
- self._buffer.write(Int32.encode(len(encoded)))
- self._buffer.write(encoded)
-
- # Update the message set size (less the 4 byte header),
- # and return with buffer ready for full read()
- self._final_size = self._buffer.tell()
- self._buffer.seek(0)
- self._buffer.write(Int32.encode(self._final_size - 4))
-
- self._buffer.seek(0)
- self._closed = True
-
- def size_in_bytes(self):
- return self._final_size or self._buffer.tell()
-
- def compression_rate(self):
- return self.size_in_bytes() / self._bytes_written
-
- def buffer(self):
- return self._buffer
+import kafka.errors as Errors
class SimpleBufferPool(object):
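
For reference, the deleted MessageSetBuffer implemented the legacy v0/v1 MessageSet framing by hand; that layout is now produced inside kafka.record.legacy_records. A rough sketch of the framing the removed append()/close() code emitted (Message encoding itself -- crc, magic, attributes, key, value -- is elided, and compressed sets additionally wrap the compressed payload in a single wrapper message at offset 0, as the old close() did):

    import struct

    def frame_message_set(encoded_messages):
        # encoded_messages: already-encoded Message structs, as bytes
        body = b""
        for offset, encoded in enumerate(encoded_messages):
            # per record: [int64 offset][int32 message size][message bytes]
            body += struct.pack(">qi", offset, len(encoded)) + encoded
        # the whole set is prefixed with an int32 size header (the size
        # excludes the header itself), patched in on close()
        return struct.pack(">i", len(body)) + body
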
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index de9dcd2..f2a480b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,9 +12,10 @@ from ..vendor import six
from .. import errors as Errors
from ..client_async import KafkaClient, selectors
+from ..codec import has_gzip, has_snappy, has_lz4
from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
-from ..protocol.message import Message, MessageSet
+from ..record.legacy_records import LegacyRecordBatchBuilder
from ..serializer import Serializer
from ..structs import TopicPartition
from .future import FutureRecordMetadata, FutureProduceResult
@@ -310,6 +311,13 @@ class KafkaProducer(object):
'sasl_plain_password': None,
}
+ _COMPRESSORS = {
+ 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
+ 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
+ 'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+ None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
+ }
+
def __init__(self, **configs):
log.debug("Starting the Kafka producer") # trace
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -355,7 +363,16 @@ class KafkaProducer(object):
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
- message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+ # Check compression_type for library support
+ ct = self.config['compression_type']
+ if ct not in self._COMPRESSORS:
+ raise ValueError("Not supported codec: {}".format(ct))
+ else:
+ checker, compression_attrs = self._COMPRESSORS[ct]
+ assert checker(), "Libraries for {} compression codec not found".format(ct)
+ self.config['compression_type'] = compression_attrs
+
+ message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
@@ -465,6 +482,17 @@ class KafkaProducer(object):
max_wait = self.config['max_block_ms'] / 1000.0
return self._wait_on_metadata(topic, max_wait)
+ def _max_usable_produce_magic(self):
+ if self.config['api_version'] >= (0, 10):
+ return 1
+ else:
+ return 0
+
+ def _estimate_size_in_bytes(self, key, value):
+ magic = self._max_usable_produce_magic()
+ return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+ magic, self.config['compression_type'], key, value)
+
def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
@@ -514,11 +542,7 @@ class KafkaProducer(object):
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
- message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
- if key_bytes is not None:
- message_size += len(key_bytes)
- if value_bytes is not None:
- message_size += len(value_bytes)
+ message_size = self._estimate_size_in_bytes(key, value)
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
@@ -527,11 +551,12 @@ class KafkaProducer(object):
log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes,
- self.config['max_block_ms'])
+ self.config['max_block_ms'],
+ estimated_size=message_size)
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
log.debug("Waking up the sender since %s is either full or"
- " getting a new batch", tp)
+ " getting a new batch", tp)
self._sender.wakeup()
return future
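
The codec handling that used to live in MessageSetBuffer now happens once in the producer constructor: the configured compression name is checked against the available codec libraries and converted to the integer attribute expected by LegacyRecordBatchBuilder, and per-record size estimation goes through the same class. A sketch mirroring the hunk above (argument order as shown in the diff; the resolve_codec helper is only for illustration):

    from kafka.codec import has_gzip, has_snappy, has_lz4
    from kafka.record.legacy_records import LegacyRecordBatchBuilder

    _COMPRESSORS = {
        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
    }

    def resolve_codec(name):
        if name not in _COMPRESSORS:
            raise ValueError("Not supported codec: {}".format(name))
        checker, attrs = _COMPRESSORS[name]
        assert checker(), "Libraries for {} compression codec not found".format(name)
        return attrs

    # replaces the old MessageSet.HEADER_SIZE + Message.HEADER_SIZE arithmetic
    magic = 1   # api_version >= (0, 10)
    estimated = LegacyRecordBatchBuilder.estimate_size_in_bytes(
        magic, resolve_codec('gzip'), b"key", b"value")
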
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index fa835f3..0c0ce27 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -7,10 +7,11 @@ import threading
import time
from .. import errors as Errors
-from ..protocol.message import Message, MessageSet
-from .buffer import MessageSetBuffer, SimpleBufferPool
+from .buffer import SimpleBufferPool
from .future import FutureRecordMetadata, FutureProduceResult
from ..structs import TopicPartition
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.record.legacy_records import LegacyRecordBatchBuilder
log = logging.getLogger(__name__)
@@ -35,9 +36,8 @@ class AtomicInteger(object):
return self._val
-class RecordBatch(object):
- def __init__(self, tp, records, message_version=0):
- self.record_count = 0
+class ProducerBatch(object):
+ def __init__(self, tp, records, buffer):
self.max_record_size = 0
now = time.time()
self.created = now
@@ -46,35 +46,33 @@ class RecordBatch(object):
self.last_attempt = now
self.last_append = now
self.records = records
- self.message_version = message_version
self.topic_partition = tp
self.produce_future = FutureProduceResult(tp)
self._retry = False
+ self._buffer = buffer # We only save it, we don't write to it
+
+ @property
+ def record_count(self):
+ return self.records.next_offset()
def try_append(self, timestamp_ms, key, value):
- if not self.records.has_room_for(key, value):
+ offset = self.records.next_offset()
+ checksum, record_size = self.records.append(timestamp_ms, key, value)
+ if record_size == 0:
return None
- if self.message_version == 0:
- msg = Message(value, key=key, magic=self.message_version)
- else:
- msg = Message(value, key=key, magic=self.message_version,
- timestamp=timestamp_ms)
- record_size = self.records.append(self.record_count, msg)
- checksum = msg.crc # crc is recalculated during records.append()
self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
- future = FutureRecordMetadata(self.produce_future, self.record_count,
+ future = FutureRecordMetadata(self.produce_future, offset,
timestamp_ms, checksum,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
- self.record_count += 1
return future
def done(self, base_offset=None, timestamp_ms=None, exception=None):
log.debug("Produced messages to topic-partition %s with base offset"
" %s and error %s.", self.topic_partition, base_offset,
- exception) # trace
+ exception) # trace
if self.produce_future.is_done:
log.warning('Batch is already closed -- ignoring batch.done()')
return
@@ -113,7 +111,7 @@ class RecordBatch(object):
self.records.close()
self.done(-1, None, Errors.KafkaTimeoutError(
"Batch for %s containing %s record(s) expired: %s" % (
- self.topic_partition, self.record_count, error)))
+ self.topic_partition, self.records.next_offset(), error)))
return True
return False
@@ -123,9 +121,12 @@ class RecordBatch(object):
def set_retry(self):
self._retry = True
+ def buffer(self):
+ return self._buffer
+
def __str__(self):
- return 'RecordBatch(topic_partition=%s, record_count=%d)' % (
- self.topic_partition, self.record_count)
+ return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
+ self.topic_partition, self.records.next_offset())
class RecordAccumulator(object):
@@ -148,8 +149,9 @@ class RecordAccumulator(object):
will block up to max_block_ms, raising an exception on timeout.
In the current implementation, this setting is an approximation.
Default: 33554432 (32MB)
- compression_type (str): The compression type for all data generated by
- the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+ compression_type (int): The compression type for all data generated by
+ the producer. Valid values are gzip(1), snappy(2), lz4(3), or
+ none(0).
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
@@ -174,28 +176,41 @@ class RecordAccumulator(object):
'metric_group_prefix': 'producer-metrics',
}
+ _COMPRESSORS = {
+ 'gzip': LegacyRecordBatchBuilder.CODEC_GZIP,
+ 'snappy': LegacyRecordBatchBuilder.CODEC_SNAPPY,
+ 'lz4': LegacyRecordBatchBuilder.CODEC_LZ4,
+ None: LegacyRecordBatchBuilder.CODEC_NONE
+ }
+
def __init__(self, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
+ # Convert compression type to INT presentation. Mostly for unit tests,
+ # as Producer should pass already converted values.
+ ct = self.config["compression_type"]
+ self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
+
self._closed = False
self._flushes_in_progress = AtomicInteger()
self._appends_in_progress = AtomicInteger()
- self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
+ self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
self._free = SimpleBufferPool(self.config['buffer_memory'],
self.config['batch_size'],
metrics=self.config['metrics'],
metric_group_prefix=self.config['metric_group_prefix'])
- self._incomplete = IncompleteRecordBatches()
+ self._incomplete = IncompleteProducerBatches()
# The following variables should only be accessed by the sender thread,
# so we don't need to protect them w/ locking.
self.muted = set()
self._drain_index = 0
- def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
+ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
+ estimated_size=0):
"""Add a record to the accumulator, return the append result.
The append result will contain the future metadata, and flag for
@@ -215,8 +230,8 @@ class RecordAccumulator(object):
"""
assert isinstance(tp, TopicPartition), 'not TopicPartition'
assert not self._closed, 'RecordAccumulator is closed'
- # We keep track of the number of appending thread to make sure we do not miss batches in
- # abortIncompleteBatches().
+ # We keep track of the number of appending thread to make sure we do
+ # not miss batches in abortIncompleteBatches().
self._appends_in_progress.increment()
try:
if tp not in self._tp_locks:
@@ -234,15 +249,7 @@ class RecordAccumulator(object):
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
- # we don't have an in-progress record batch try to allocate a new batch
- message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
- if key is not None:
- message_size += len(key)
- if value is not None:
- message_size += len(value)
- assert message_size <= self.config['buffer_memory'], 'message too big'
-
- size = max(self.config['batch_size'], message_size)
+ size = max(self.config['batch_size'], estimated_size)
log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
buf = self._free.allocate(size, max_time_to_block_ms)
with self._tp_locks[tp]:
@@ -260,10 +267,13 @@ class RecordAccumulator(object):
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
- records = MessageSetBuffer(buf, self.config['batch_size'],
- self.config['compression_type'],
- self.config['message_version'])
- batch = RecordBatch(tp, records, self.config['message_version'])
+ records = MemoryRecordsBuilder(
+ self.config['message_version'],
+ self.config['compression_type'],
+ self.config['batch_size']
+ )
+
+ batch = ProducerBatch(tp, records, buf)
future = batch.try_append(timestamp_ms, key, value)
if not future:
raise Exception()
@@ -285,7 +295,7 @@ class RecordAccumulator(object):
cluster (ClusterMetadata): current metadata for kafka cluster
Returns:
- list of RecordBatch that were expired
+ list of ProducerBatch that were expired
"""
expired_batches = []
to_remove = []
@@ -449,7 +459,7 @@ class RecordAccumulator(object):
max_size (int): maximum number of bytes to drain
Returns:
- dict: {node_id: list of RecordBatch} with total size less than the
+ dict: {node_id: list of ProducerBatch} with total size less than the
requested max_size.
"""
if not nodes:
@@ -505,7 +515,7 @@ class RecordAccumulator(object):
def deallocate(self, batch):
"""Deallocate the record batch."""
self._incomplete.remove(batch)
- self._free.deallocate(batch.records.buffer())
+ self._free.deallocate(batch.buffer())
def _flush_in_progress(self):
"""Are there any threads currently waiting on a flush?"""
@@ -571,8 +581,8 @@ class RecordAccumulator(object):
self._closed = True
-class IncompleteRecordBatches(object):
- """A threadsafe helper class to hold RecordBatches that haven't been ack'd yet"""
+class IncompleteProducerBatches(object):
+ """A threadsafe helper class to hold ProducerBatches that haven't been ack'd yet"""
def __init__(self):
self._incomplete = set()
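
The accumulator no longer asks has_room_for() before encoding; ProducerBatch.try_append() appends optimistically and treats a zero record_size from MemoryRecordsBuilder.append() as "batch full", at which point RecordAccumulator.append() allocates a new buffer of max(batch_size, estimated_size). A small sketch of that contract, under the same assumptions as the sketch near the top (the loop is capped only to keep the example obviously finite):

    import time
    from kafka.record.memory_records import MemoryRecordsBuilder

    records = MemoryRecordsBuilder(1, 0, 1024)   # magic=1, no compression, ~1 KiB batch

    for _ in range(1000):
        checksum, size = records.append(int(time.time() * 1000), None, b"x" * 100)
        if size == 0:
            # no room left: ProducerBatch.try_append() returns None here and
            # RecordAccumulator.append() rolls over to a fresh ProducerBatch
            break

    records.close()
    print(records.next_offset(), "records fit before the batch filled up")
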
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 679efb0..72a15bb 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -288,7 +288,6 @@ class Sender(threading.Thread):
topic = batch.topic_partition.topic
partition = batch.topic_partition.partition
- # TODO: bytearray / memoryview
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
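
With the builder owning the encoded bytes, the sender's job reduces to collecting each batch's buffer() output per topic-partition, roughly as below (simplified from the surrounding code; the helper name is hypothetical, the attribute and method names are the ones visible above):

    import collections

    def collect_payloads(batches):
        produce_records_by_partition = collections.defaultdict(dict)
        for batch in batches:
            tp = batch.topic_partition
            produce_records_by_partition[tp.topic][tp.partition] = batch.records.buffer()
        return produce_records_by_partition
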