author     Taras <voyn1991@gmail.com>    2017-10-10 00:13:16 +0300
committer  Taras <voyn1991@gmail.com>    2017-10-11 18:09:17 +0300
commit     fbea5f04bccd28f3aa15a1711548b131504591ac (patch)
tree       1c8a0efe687c2ace72fa146b4f03e15def8e3a95 /kafka/producer
parent     f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff)
Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
Diffstat (limited to 'kafka/producer')
 kafka/producer/buffer.py             | 126
 kafka/producer/kafka.py              |  43
 kafka/producer/record_accumulator.py | 100
 kafka/producer/sender.py             |   1
4 files changed, 90 insertions, 180 deletions
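The heart of the change: ProducerBatch no longer assembles MessageSet/Message structs itself, it delegates record encoding, offset tracking, and checksums to a MemoryRecordsBuilder from the new kafka.record package. The sketch below is a rough, standalone illustration built only from the builder calls that appear in this diff (the constructor arguments, append(), next_offset(), is_full(), close(), buffer()); the literal magic/codec/batch-size values and the (checksum, size) return shape are taken from this commit and are not guaranteed for other versions.

    # Rough sketch (not part of the commit): drive MemoryRecordsBuilder the way
    # ProducerBatch.try_append() does after this refactor. Values are illustrative.
    from kafka.record.memory_records import MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(1, 0, 16384)   # magic=1, codec=CODEC_NONE, batch_size
    assert builder.next_offset() == 0             # the builder now tracks offsets itself

    checksum, size = builder.append(1507582396000, b"key", b"value")
    if size == 0:
        # A zero size means the builder refused the append (no room); the
        # accumulator responds by allocating a new batch, which replaces the
        # old MessageSetBuffer.has_room_for() check.
        pass

    full = builder.is_full()    # the accumulator polls this to decide whether to wake the sender
    builder.close()             # seal (and compress, if a codec was configured)
    payload = builder.buffer()  # raw bytes the Sender puts into the produce request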
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index d1eeaf1..19ea732 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -5,133 +5,9 @@ import io
 import threading
 import time
 
-from ..codec import (has_gzip, has_snappy, has_lz4,
-                     gzip_encode, snappy_encode,
-                     lz4_encode, lz4_encode_old_kafka)
-from .. import errors as Errors
 from ..metrics.stats import Rate
 
-from ..protocol.types import Int32, Int64
-from ..protocol.message import MessageSet, Message
-
-
-class MessageSetBuffer(object):
-    """Wrap a buffer for writing MessageSet batches.
-
-    Arguments:
-        buf (IO stream): a buffer for writing data. Typically BytesIO.
-        batch_size (int): maximum number of bytes to write to the buffer.
-
-    Keyword Arguments:
-        compression_type ('gzip', 'snappy', None): compress messages before
-            publishing. Default: None.
-    """
-    _COMPRESSORS = {
-        'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
-        'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
-        'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
-        'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
-    }
-    def __init__(self, buf, batch_size, compression_type=None, message_version=0):
-        if compression_type is not None:
-            assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
-
-            # Kafka 0.8/0.9 had a quirky lz4...
-            if compression_type == 'lz4' and message_version == 0:
-                compression_type = 'lz4-old-kafka'
-
-            checker, encoder, attributes = self._COMPRESSORS[compression_type]
-            assert checker(), 'Compression Libraries Not Found'
-            self._compressor = encoder
-            self._compression_attributes = attributes
-        else:
-            self._compressor = None
-            self._compression_attributes = None
-
-        self._message_version = message_version
-        self._buffer = buf
-        # Init MessageSetSize to 0 -- update on close
-        self._buffer.seek(0)
-        self._buffer.write(Int32.encode(0))
-        self._batch_size = batch_size
-        self._closed = False
-        self._messages = 0
-        self._bytes_written = 4  # Int32 header is 4 bytes
-        self._final_size = None
-
-    def append(self, offset, message):
-        """Append a Message to the MessageSet.
-
-        Arguments:
-            offset (int): offset of the message
-            message (Message or bytes): message struct or encoded bytes
-
-        Returns: bytes written
-        """
-        if isinstance(message, Message):
-            encoded = message.encode()
-        else:
-            encoded = bytes(message)
-        msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
-        self._buffer.write(msg)
-        self._messages += 1
-        self._bytes_written += len(msg)
-        return len(msg)
-
-    def has_room_for(self, key, value):
-        if self._closed:
-            return False
-        if not self._messages:
-            return True
-        needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-        if key is not None:
-            needed_bytes += len(key)
-        if value is not None:
-            needed_bytes += len(value)
-        return self._buffer.tell() + needed_bytes < self._batch_size
-
-    def is_full(self):
-        if self._closed:
-            return True
-        return self._buffer.tell() >= self._batch_size
-
-    def close(self):
-        # This method may be called multiple times on the same batch
-        # i.e., on retries
-        # we need to make sure we only close it out once
-        # otherwise compressed messages may be double-compressed
-        # see Issue 718
-        if not self._closed:
-            if self._compressor:
-                # TODO: avoid copies with bytearray / memoryview
-                uncompressed_size = self._buffer.tell()
-                self._buffer.seek(4)
-                msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)),
-                              attributes=self._compression_attributes,
-                              magic=self._message_version)
-                encoded = msg.encode()
-                self._buffer.seek(4)
-                self._buffer.write(Int64.encode(0))  # offset 0 for wrapper msg
-                self._buffer.write(Int32.encode(len(encoded)))
-                self._buffer.write(encoded)
-
-            # Update the message set size (less the 4 byte header),
-            # and return with buffer ready for full read()
-            self._final_size = self._buffer.tell()
-            self._buffer.seek(0)
-            self._buffer.write(Int32.encode(self._final_size - 4))
-
-            self._buffer.seek(0)
-            self._closed = True
-
-    def size_in_bytes(self):
-        return self._final_size or self._buffer.tell()
-
-    def compression_rate(self):
-        return self.size_in_bytes() / self._bytes_written
-
-    def buffer(self):
-        return self._buffer
+import kafka.errors as Errors
 
 
 class SimpleBufferPool(object):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index de9dcd2..f2a480b 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,9 +12,10 @@ from ..vendor import six
 
 from .. import errors as Errors
 from ..client_async import KafkaClient, selectors
+from ..codec import has_gzip, has_snappy, has_lz4
 from ..metrics import MetricConfig, Metrics
 from ..partitioner.default import DefaultPartitioner
-from ..protocol.message import Message, MessageSet
+from ..record.legacy_records import LegacyRecordBatchBuilder
 from ..serializer import Serializer
 from ..structs import TopicPartition
 from .future import FutureRecordMetadata, FutureProduceResult
@@ -310,6 +311,13 @@ class KafkaProducer(object):
         'sasl_plain_password': None,
     }
 
+    _COMPRESSORS = {
+        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
+        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
+        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
+    }
+
     def __init__(self, **configs):
         log.debug("Starting the Kafka producer")  # trace
         self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -355,7 +363,16 @@ class KafkaProducer(object):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
 
-        message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+        # Check compression_type for library support
+        ct = self.config['compression_type']
+        if ct not in self._COMPRESSORS:
+            raise ValueError("Not supported codec: {}".format(ct))
+        else:
+            checker, compression_attrs = self._COMPRESSORS[ct]
+            assert checker(), "Libraries for {} compression codec not found".format(ct)
+            self.config['compression_type'] = compression_attrs
+
+        message_version = self._max_usable_produce_magic()
         self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
@@ -465,6 +482,17 @@ class KafkaProducer(object):
         max_wait = self.config['max_block_ms'] / 1000.0
         return self._wait_on_metadata(topic, max_wait)
 
+    def _max_usable_produce_magic(self):
+        if self.config['api_version'] >= (0, 10):
+            return 1
+        else:
+            return 0
+
+    def _estimate_size_in_bytes(self, key, value):
+        magic = self._max_usable_produce_magic()
+        return LegacyRecordBatchBuilder.estimate_size_in_bytes(
+            magic, self.config['compression_type'], key, value)
+
     def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.
 
@@ -514,11 +542,7 @@ class KafkaProducer(object):
         partition = self._partition(topic, partition, key, value,
                                     key_bytes, value_bytes)
 
-        message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-        if key_bytes is not None:
-            message_size += len(key_bytes)
-        if value_bytes is not None:
-            message_size += len(value_bytes)
+        message_size = self._estimate_size_in_bytes(key, value)
         self._ensure_valid_record_size(message_size)
 
         tp = TopicPartition(topic, partition)
@@ -527,11 +551,12 @@ class KafkaProducer(object):
         log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
         result = self._accumulator.append(tp, timestamp_ms,
                                           key_bytes, value_bytes,
-                                          self.config['max_block_ms'])
+                                          self.config['max_block_ms'],
+                                          estimated_size=message_size)
         future, batch_is_full, new_batch_created = result
         if batch_is_full or new_batch_created:
             log.debug("Waking up the sender since %s is either full or"
-                        " getting a new batch", tp)
+                      " getting a new batch", tp)
             self._sender.wakeup()
 
         return future
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index fa835f3..0c0ce27 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -7,10 +7,11 @@ import threading
 import time
 
 from .. import errors as Errors
-from ..protocol.message import Message, MessageSet
-from .buffer import MessageSetBuffer, SimpleBufferPool
+from .buffer import SimpleBufferPool
 from .future import FutureRecordMetadata, FutureProduceResult
 from ..structs import TopicPartition
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.record.legacy_records import LegacyRecordBatchBuilder
 
 
 log = logging.getLogger(__name__)
@@ -35,9 +36,8 @@ class AtomicInteger(object):
         return self._val
 
 
-class RecordBatch(object):
-    def __init__(self, tp, records, message_version=0):
-        self.record_count = 0
+class ProducerBatch(object):
+    def __init__(self, tp, records, buffer):
         self.max_record_size = 0
         now = time.time()
         self.created = now
@@ -46,35 +46,33 @@ class AtomicInteger(object):
         self.last_attempt = now
         self.last_append = now
         self.records = records
-        self.message_version = message_version
         self.topic_partition = tp
         self.produce_future = FutureProduceResult(tp)
         self._retry = False
+        self._buffer = buffer  # We only save it, we don't write to it
+
+    @property
+    def record_count(self):
+        return self.records.next_offset()
 
     def try_append(self, timestamp_ms, key, value):
-        if not self.records.has_room_for(key, value):
+        offset = self.records.next_offset()
+        checksum, record_size = self.records.append(timestamp_ms, key, value)
+        if record_size == 0:
             return None
 
-        if self.message_version == 0:
-            msg = Message(value, key=key, magic=self.message_version)
-        else:
-            msg = Message(value, key=key, magic=self.message_version,
-                          timestamp=timestamp_ms)
-        record_size = self.records.append(self.record_count, msg)
-        checksum = msg.crc  # crc is recalculated during records.append()
         self.max_record_size = max(self.max_record_size, record_size)
         self.last_append = time.time()
-        future = FutureRecordMetadata(self.produce_future, self.record_count,
+        future = FutureRecordMetadata(self.produce_future, offset,
                                       timestamp_ms, checksum,
                                       len(key) if key is not None else -1,
                                       len(value) if value is not None else -1)
-        self.record_count += 1
         return future
 
     def done(self, base_offset=None, timestamp_ms=None, exception=None):
         log.debug("Produced messages to topic-partition %s with base offset"
                   " %s and error %s.", self.topic_partition, base_offset,
-                  exception) # trace
+                  exception)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
@@ -113,7 +111,7 @@ class RecordBatch(object):
             self.records.close()
             self.done(-1, None, Errors.KafkaTimeoutError(
                 "Batch for %s containing %s record(s) expired: %s" % (
-                    self.topic_partition, self.record_count, error)))
+                    self.topic_partition, self.records.next_offset(), error)))
             return True
         return False
 
@@ -123,9 +121,12 @@ class RecordBatch(object):
     def set_retry(self):
         self._retry = True
 
+    def buffer(self):
+        return self._buffer
+
     def __str__(self):
-        return 'RecordBatch(topic_partition=%s, record_count=%d)' % (
-            self.topic_partition, self.record_count)
+        return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
+            self.topic_partition, self.records.next_offset())
 
 
 class RecordAccumulator(object):
@@ -148,8 +149,9 @@ class RecordAccumulator(object):
             will block up to max_block_ms, raising an exception on timeout.
             In the current implementation, this setting is an approximation.
             Default: 33554432 (32MB)
-        compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+        compression_type (int): The compression type for all data generated by
+            the producer. Valid values are gzip(1), snappy(2), lz4(3), or
+            none(0).
             Compression is of full batches of data, so the efficacy of batching
             will also impact the compression ratio (more batching means better
             compression). Default: None.
@@ -174,28 +176,41 @@ class RecordAccumulator(object):
         'metric_group_prefix': 'producer-metrics',
     }
 
+    _COMPRESSORS = {
+        'gzip': LegacyRecordBatchBuilder.CODEC_GZIP,
+        'snappy': LegacyRecordBatchBuilder.CODEC_SNAPPY,
+        'lz4': LegacyRecordBatchBuilder.CODEC_LZ4,
+        None: LegacyRecordBatchBuilder.CODEC_NONE
+    }
+
     def __init__(self, **configs):
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
                 self.config[key] = configs.pop(key)
 
+        # Convert compression type to INT presentation. Mostly for unit tests,
+        # as Producer should pass already converted values.
+        ct = self.config["compression_type"]
+        self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
+
         self._closed = False
         self._flushes_in_progress = AtomicInteger()
         self._appends_in_progress = AtomicInteger()
-        self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
+        self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
         self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
         self._free = SimpleBufferPool(self.config['buffer_memory'],
                                       self.config['batch_size'],
                                       metrics=self.config['metrics'],
                                       metric_group_prefix=self.config['metric_group_prefix'])
-        self._incomplete = IncompleteRecordBatches()
+        self._incomplete = IncompleteProducerBatches()
         # The following variables should only be accessed by the sender thread,
         # so we don't need to protect them w/ locking.
         self.muted = set()
         self._drain_index = 0
 
-    def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
+    def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
+               estimated_size=0):
         """Add a record to the accumulator, return the append result.
 
         The append result will contain the future metadata, and flag for
@@ -215,8 +230,8 @@ class RecordAccumulator(object):
         """
         assert isinstance(tp, TopicPartition), 'not TopicPartition'
         assert not self._closed, 'RecordAccumulator is closed'
-        # We keep track of the number of appending thread to make sure we do not miss batches in
-        # abortIncompleteBatches().
+        # We keep track of the number of appending thread to make sure we do
+        # not miss batches in abortIncompleteBatches().
         self._appends_in_progress.increment()
         try:
             if tp not in self._tp_locks:
@@ -234,15 +249,7 @@ class RecordAccumulator(object):
                     batch_is_full = len(dq) > 1 or last.records.is_full()
                     return future, batch_is_full, False
 
-            # we don't have an in-progress record batch try to allocate a new batch
-            message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-            if key is not None:
-                message_size += len(key)
-            if value is not None:
-                message_size += len(value)
-            assert message_size <= self.config['buffer_memory'], 'message too big'
-
-            size = max(self.config['batch_size'], message_size)
+            size = max(self.config['batch_size'], estimated_size)
             log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace
             buf = self._free.allocate(size, max_time_to_block_ms)
             with self._tp_locks[tp]:
@@ -260,10 +267,13 @@ class RecordAccumulator(object):
                     batch_is_full = len(dq) > 1 or last.records.is_full()
                     return future, batch_is_full, False
 
-                records = MessageSetBuffer(buf, self.config['batch_size'],
-                                           self.config['compression_type'],
-                                           self.config['message_version'])
-                batch = RecordBatch(tp, records, self.config['message_version'])
+                records = MemoryRecordsBuilder(
+                    self.config['message_version'],
+                    self.config['compression_type'],
+                    self.config['batch_size']
+                )
+
+                batch = ProducerBatch(tp, records, buf)
                 future = batch.try_append(timestamp_ms, key, value)
                 if not future:
                     raise Exception()
@@ -285,7 +295,7 @@ class RecordAccumulator(object):
             cluster (ClusterMetadata): current metadata for kafka cluster
 
         Returns:
-            list of RecordBatch that were expired
+            list of ProducerBatch that were expired
         """
         expired_batches = []
         to_remove = []
@@ -449,7 +459,7 @@ class RecordAccumulator(object):
             max_size (int): maximum number of bytes to drain
 
         Returns:
-            dict: {node_id: list of RecordBatch} with total size less than the
+            dict: {node_id: list of ProducerBatch} with total size less than the
                 requested max_size.
         """
         if not nodes:
@@ -505,7 +515,7 @@ class RecordAccumulator(object):
     def deallocate(self, batch):
         """Deallocate the record batch."""
         self._incomplete.remove(batch)
-        self._free.deallocate(batch.records.buffer())
+        self._free.deallocate(batch.buffer())
 
     def _flush_in_progress(self):
         """Are there any threads currently waiting on a flush?"""
@@ -571,8 +581,8 @@ class RecordAccumulator(object):
         self._closed = True
 
 
-class IncompleteRecordBatches(object):
-    """A threadsafe helper class to hold RecordBatches that haven't been ack'd yet"""
+class IncompleteProducerBatches(object):
+    """A threadsafe helper class to hold ProducerBatches that haven't been ack'd yet"""
 
     def __init__(self):
         self._incomplete = set()
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 679efb0..72a15bb 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -288,7 +288,6 @@ class Sender(threading.Thread):
             topic = batch.topic_partition.topic
             partition = batch.topic_partition.partition
 
-            # TODO: bytearray / memoryview
             buf = batch.records.buffer()
             produce_records_by_partition[topic][partition] = buf
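At the producer level, the size of a pending record is no longer computed from MessageSet.HEADER_SIZE and Message.HEADER_SIZE; KafkaProducer asks LegacyRecordBatchBuilder.estimate_size_in_bytes() and passes the estimate down to RecordAccumulator.append(). The producer also resolves the string compression_type into the builders' integer codec constants once, up front. A standalone sketch of that resolution follows; resolve_codec() is a hypothetical helper, while the table mirrors the _COMPRESSORS mapping added to kafka/producer/kafka.py in this commit.

    # Sketch only: mirrors the codec validation/conversion that KafkaProducer.__init__
    # performs after this commit. resolve_codec() is a hypothetical helper name.
    from kafka.codec import has_gzip, has_snappy, has_lz4
    from kafka.record.legacy_records import LegacyRecordBatchBuilder

    _COMPRESSORS = {
        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
    }

    def resolve_codec(compression_type):
        # Reject unknown codecs, verify the compression library is importable,
        # and return the integer constant the record builders expect.
        if compression_type not in _COMPRESSORS:
            raise ValueError("Not supported codec: {}".format(compression_type))
        checker, codec = _COMPRESSORS[compression_type]
        assert checker(), "Libraries for {} compression codec not found".format(compression_type)
        return codec

    codec = resolve_codec('gzip')                  # gzip support comes from the stdlib
    estimated = LegacyRecordBatchBuilder.estimate_size_in_bytes(
        1, codec, b"some-key", b"some-value")      # magic=1 on brokers >= 0.10

RecordAccumulator keeps a string-to-codec table of its own (without the availability checkers) only so that unit tests can construct it with string names; per the comment added in this commit, the producer is expected to pass the already-converted integer.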