-rw-r--r-- | kafka/consumer/fetcher.py | 99
-rw-r--r-- | kafka/errors.py | 7
-rw-r--r-- | kafka/producer/buffer.py | 126
-rw-r--r-- | kafka/producer/kafka.py | 43
-rw-r--r-- | kafka/producer/record_accumulator.py | 100
-rw-r--r-- | kafka/producer/sender.py | 1
-rw-r--r-- | kafka/protocol/fetch.py | 11
-rw-r--r-- | kafka/protocol/legacy.py | 6
-rw-r--r-- | kafka/protocol/message.py | 12
-rw-r--r-- | kafka/protocol/produce.py | 7
-rw-r--r-- | kafka/record/__init__.py | 3
-rw-r--r-- | kafka/record/abc.py | 119
-rw-r--r-- | kafka/record/legacy_records.py | 485
-rw-r--r-- | kafka/record/memory_records.py | 176
-rw-r--r-- | kafka/record/util.py | 8
-rw-r--r-- | test/record/test_legacy_records.py | 85
-rw-r--r-- | test/record/test_records.py | 108
-rw-r--r-- | test/test_buffer.py | 72
-rw-r--r-- | test/test_consumer_integration.py | 6
-rw-r--r-- | test/test_protocol.py | 5
-rw-r--r-- | test/test_sender.py | 18
21 files changed, 1142 insertions, 355 deletions
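
This changeset replaces the old MessageSet/Message encoding with the new kafka.record module (MemoryRecords for reading, MemoryRecordsBuilder and LegacyRecordBatchBuilder for writing). A minimal round-trip sketch of the new API, assuming the module layout added below — signatures are taken from this diff, while the timestamp and batch_size values are arbitrary illustration values:

    from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

    # Producer side: accumulate a v1 (magic=1), uncompressed batch.
    builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=16384)
    checksum, size = builder.append(timestamp=1503648000942, key=None, value=b"123")
    builder.close()               # finalize (and compress, if configured) exactly once
    raw_bytes = builder.buffer()  # bytes ready to ship in a ProduceRequest

    # Consumer side: wrap the raw bytes and walk batches/records, mirroring
    # what Fetcher._unpack_message_set does after this change.
    records = MemoryRecords(raw_bytes)
    while records.has_next():
        batch = records.next_batch()
        for record in batch:
            print(record.offset, record.timestamp, record.key, record.value)
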
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c4fa546..54a771a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -13,10 +13,10 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest -from kafka.protocol.message import PartialMessage from kafka.protocol.offset import ( OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET ) +from kafka.record import MemoryRecords from kafka.serializer import Deserializer from kafka.structs import TopicPartition, OffsetAndTimestamp @@ -295,7 +295,7 @@ class Fetcher(six.Iterator): Raises: OffsetOutOfRangeError: if no subscription offset_reset_strategy - InvalidMessageError: if message crc validation fails (check_crcs + CorruptRecordException: if message crc validation fails (check_crcs must be set to True) RecordTooLargeError: if a message is larger than the currently configured max_partition_fetch_bytes @@ -440,57 +440,25 @@ class Fetcher(six.Iterator): self._next_partition_records = None - def _unpack_message_set(self, tp, messages): + def _unpack_message_set(self, tp, records): try: - for offset, size, msg in messages: - if self.config['check_crcs'] and not msg.validate_crc(): - raise Errors.InvalidMessageError(msg) - - if not msg.is_compressed(): - yield self._parse_record(tp, offset, msg.timestamp, msg) - - else: - # If relative offset is used, we need to decompress the entire message first - # to compute the absolute offset. - inner_mset = msg.decompress() - - # There should only ever be a single layer of compression - if inner_mset[0][-1].is_compressed(): - log.warning('MessageSet at %s offset %d appears ' - ' double-compressed. This should not' - ' happen -- check your producers!', - tp, offset) - if self.config['skip_double_compressed_messages']: - log.warning('Skipping double-compressed message at' - ' %s %d', tp, offset) - continue - - if msg.magic > 0: - last_offset, _, _ = inner_mset[-1] - absolute_base_offset = offset - last_offset - else: - absolute_base_offset = -1 - - for inner_offset, inner_size, inner_msg in inner_mset: - if msg.magic > 0: - # When magic value is greater than 0, the timestamp - # of a compressed message depends on the - # typestamp type of the wrapper message: - - if msg.timestamp_type == 0: # CREATE_TIME (0) - inner_timestamp = inner_msg.timestamp - - elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) - inner_timestamp = msg.timestamp - - else: - raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type)) - else: - inner_timestamp = msg.timestamp - - if absolute_base_offset >= 0: - inner_offset += absolute_base_offset - yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg) + batch = records.next_batch() + while batch is not None: + for record in batch: + key_size = len(record.key) if record.key is not None else -1 + value_size = len(record.value) if record.value is not None else -1 + key = self._deserialize( + self.config['key_deserializer'], + tp.topic, record.key) + value = self._deserialize( + self.config['value_deserializer'], + tp.topic, record.value) + yield ConsumerRecord( + tp.topic, tp.partition, record.offset, record.timestamp, + record.timestamp_type, key, value, record.checksum, + key_size, value_size) + + batch = records.next_batch() # If unpacking raises StopIteration, it is erroneously # caught by the generator. 
We want all exceptions to be raised @@ -505,15 +473,6 @@ class Fetcher(six.Iterator): log.exception('AssertionError raised unpacking messageset: %s', e) raise - def _parse_record(self, tp, offset, timestamp, msg): - key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key) - value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value) - return ConsumerRecord(tp.topic, tp.partition, offset, - timestamp, msg.timestamp_type, - key, value, msg.crc, - len(msg.key) if msg.key is not None else -1, - len(msg.value) if msg.value is not None else -1) - def __iter__(self): # pylint: disable=non-iterator-returned return self @@ -783,7 +742,7 @@ class Fetcher(six.Iterator): error_code, highwater = completed_fetch.partition_data[:2] error_type = Errors.for_code(error_code) - messages = completed_fetch.partition_data[-1] + records = MemoryRecords(partition_data[-1]) try: if not self._subscriptions.is_fetchable(tp): @@ -807,21 +766,17 @@ class Fetcher(six.Iterator): position) return None - partial = None - if messages and isinstance(messages[-1][-1], PartialMessage): - partial = messages.pop() - - if messages: + if records.has_next(): log.debug("Adding fetched record for partition %s with" " offset %d to buffered record list", tp, position) - unpacked = list(self._unpack_message_set(tp, messages)) + unpacked = list(self._unpack_message_set(tp, records)) parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) - last_offset, _, _ = messages[-1] + last_offset = unpacked[-1].offset self._sensors.records_fetch_lag.record(highwater - last_offset) - num_bytes = sum(msg[1] for msg in messages) - records_count = len(messages) - elif partial: + num_bytes = records.valid_bytes() + records_count = len(unpacked) + elif records.size_in_bytes() > 0: # we did not read a single message from a non-empty # buffer because that message's size is larger than # fetch size, in this case record this exception diff --git a/kafka/errors.py b/kafka/errors.py index c72455a..4a409db 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -101,12 +101,15 @@ class OffsetOutOfRangeError(BrokerResponseError): ' maintained by the server for the given topic/partition.') -class InvalidMessageError(BrokerResponseError): +class CorruptRecordException(BrokerResponseError): errno = 2 - message = 'INVALID_MESSAGE' + message = 'CORRUPT_MESSAGE' description = ('This message has failed its CRC checksum, exceeds the' ' valid size, or is otherwise corrupt.') +# Backward compatibility +InvalidMessageError = CorruptRecordException + class UnknownTopicOrPartitionError(BrokerResponseError): errno = 3 diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index d1eeaf1..19ea732 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -5,133 +5,9 @@ import io import threading import time -from ..codec import (has_gzip, has_snappy, has_lz4, - gzip_encode, snappy_encode, - lz4_encode, lz4_encode_old_kafka) -from .. import errors as Errors from ..metrics.stats import Rate -from ..protocol.types import Int32, Int64 -from ..protocol.message import MessageSet, Message - - -class MessageSetBuffer(object): - """Wrap a buffer for writing MessageSet batches. - - Arguments: - buf (IO stream): a buffer for writing data. Typically BytesIO. - batch_size (int): maximum number of bytes to write to the buffer. - - Keyword Arguments: - compression_type ('gzip', 'snappy', None): compress messages before - publishing. Default: None. 
- """ - _COMPRESSORS = { - 'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP), - 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY), - 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4), - 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4), - } - def __init__(self, buf, batch_size, compression_type=None, message_version=0): - if compression_type is not None: - assert compression_type in self._COMPRESSORS, 'Unrecognized compression type' - - # Kafka 0.8/0.9 had a quirky lz4... - if compression_type == 'lz4' and message_version == 0: - compression_type = 'lz4-old-kafka' - - checker, encoder, attributes = self._COMPRESSORS[compression_type] - assert checker(), 'Compression Libraries Not Found' - self._compressor = encoder - self._compression_attributes = attributes - else: - self._compressor = None - self._compression_attributes = None - - self._message_version = message_version - self._buffer = buf - # Init MessageSetSize to 0 -- update on close - self._buffer.seek(0) - self._buffer.write(Int32.encode(0)) - self._batch_size = batch_size - self._closed = False - self._messages = 0 - self._bytes_written = 4 # Int32 header is 4 bytes - self._final_size = None - - def append(self, offset, message): - """Append a Message to the MessageSet. - - Arguments: - offset (int): offset of the message - message (Message or bytes): message struct or encoded bytes - - Returns: bytes written - """ - if isinstance(message, Message): - encoded = message.encode() - else: - encoded = bytes(message) - msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded - self._buffer.write(msg) - self._messages += 1 - self._bytes_written += len(msg) - return len(msg) - - def has_room_for(self, key, value): - if self._closed: - return False - if not self._messages: - return True - needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE - if key is not None: - needed_bytes += len(key) - if value is not None: - needed_bytes += len(value) - return self._buffer.tell() + needed_bytes < self._batch_size - - def is_full(self): - if self._closed: - return True - return self._buffer.tell() >= self._batch_size - - def close(self): - # This method may be called multiple times on the same batch - # i.e., on retries - # we need to make sure we only close it out once - # otherwise compressed messages may be double-compressed - # see Issue 718 - if not self._closed: - if self._compressor: - # TODO: avoid copies with bytearray / memoryview - uncompressed_size = self._buffer.tell() - self._buffer.seek(4) - msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)), - attributes=self._compression_attributes, - magic=self._message_version) - encoded = msg.encode() - self._buffer.seek(4) - self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg - self._buffer.write(Int32.encode(len(encoded))) - self._buffer.write(encoded) - - # Update the message set size (less the 4 byte header), - # and return with buffer ready for full read() - self._final_size = self._buffer.tell() - self._buffer.seek(0) - self._buffer.write(Int32.encode(self._final_size - 4)) - - self._buffer.seek(0) - self._closed = True - - def size_in_bytes(self): - return self._final_size or self._buffer.tell() - - def compression_rate(self): - return self.size_in_bytes() / self._bytes_written - - def buffer(self): - return self._buffer +import kafka.errors as Errors class SimpleBufferPool(object): diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index de9dcd2..f2a480b 100644 --- a/kafka/producer/kafka.py +++ 
b/kafka/producer/kafka.py @@ -12,9 +12,10 @@ from ..vendor import six from .. import errors as Errors from ..client_async import KafkaClient, selectors +from ..codec import has_gzip, has_snappy, has_lz4 from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner -from ..protocol.message import Message, MessageSet +from ..record.legacy_records import LegacyRecordBatchBuilder from ..serializer import Serializer from ..structs import TopicPartition from .future import FutureRecordMetadata, FutureProduceResult @@ -310,6 +311,13 @@ class KafkaProducer(object): 'sasl_plain_password': None, } + _COMPRESSORS = { + 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP), + 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY), + 'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4), + None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE), + } + def __init__(self, **configs): log.debug("Starting the Kafka producer") # trace self.config = copy.copy(self.DEFAULT_CONFIG) @@ -355,7 +363,16 @@ class KafkaProducer(object): if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' - message_version = 1 if self.config['api_version'] >= (0, 10) else 0 + # Check compression_type for library support + ct = self.config['compression_type'] + if ct not in self._COMPRESSORS: + raise ValueError("Not supported codec: {}".format(ct)) + else: + checker, compression_attrs = self._COMPRESSORS[ct] + assert checker(), "Libraries for {} compression codec not found".format(ct) + self.config['compression_type'] = compression_attrs + + message_version = self._max_usable_produce_magic() self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config) self._metadata = client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) @@ -465,6 +482,17 @@ class KafkaProducer(object): max_wait = self.config['max_block_ms'] / 1000.0 return self._wait_on_metadata(topic, max_wait) + def _max_usable_produce_magic(self): + if self.config['api_version'] >= (0, 10): + return 1 + else: + return 0 + + def _estimate_size_in_bytes(self, key, value): + magic = self._max_usable_produce_magic() + return LegacyRecordBatchBuilder.estimate_size_in_bytes( + magic, self.config['compression_type'], key, value) + def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): """Publish a message to a topic. 
@@ -514,11 +542,7 @@ class KafkaProducer(object): partition = self._partition(topic, partition, key, value, key_bytes, value_bytes) - message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE - if key_bytes is not None: - message_size += len(key_bytes) - if value_bytes is not None: - message_size += len(value_bytes) + message_size = self._estimate_size_in_bytes(key, value) self._ensure_valid_record_size(message_size) tp = TopicPartition(topic, partition) @@ -527,11 +551,12 @@ class KafkaProducer(object): log.debug("Sending (key=%r value=%r) to %s", key, value, tp) result = self._accumulator.append(tp, timestamp_ms, key_bytes, value_bytes, - self.config['max_block_ms']) + self.config['max_block_ms'], + estimated_size=message_size) future, batch_is_full, new_batch_created = result if batch_is_full or new_batch_created: log.debug("Waking up the sender since %s is either full or" - " getting a new batch", tp) + " getting a new batch", tp) self._sender.wakeup() return future diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index fa835f3..0c0ce27 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -7,10 +7,11 @@ import threading import time from .. import errors as Errors -from ..protocol.message import Message, MessageSet -from .buffer import MessageSetBuffer, SimpleBufferPool +from .buffer import SimpleBufferPool from .future import FutureRecordMetadata, FutureProduceResult from ..structs import TopicPartition +from kafka.record.memory_records import MemoryRecordsBuilder +from kafka.record.legacy_records import LegacyRecordBatchBuilder log = logging.getLogger(__name__) @@ -35,9 +36,8 @@ class AtomicInteger(object): return self._val -class RecordBatch(object): - def __init__(self, tp, records, message_version=0): - self.record_count = 0 +class ProducerBatch(object): + def __init__(self, tp, records, buffer): self.max_record_size = 0 now = time.time() self.created = now @@ -46,35 +46,33 @@ class RecordBatch(object): self.last_attempt = now self.last_append = now self.records = records - self.message_version = message_version self.topic_partition = tp self.produce_future = FutureProduceResult(tp) self._retry = False + self._buffer = buffer # We only save it, we don't write to it + + @property + def record_count(self): + return self.records.next_offset() def try_append(self, timestamp_ms, key, value): - if not self.records.has_room_for(key, value): + offset = self.records.next_offset() + checksum, record_size = self.records.append(timestamp_ms, key, value) + if record_size == 0: return None - if self.message_version == 0: - msg = Message(value, key=key, magic=self.message_version) - else: - msg = Message(value, key=key, magic=self.message_version, - timestamp=timestamp_ms) - record_size = self.records.append(self.record_count, msg) - checksum = msg.crc # crc is recalculated during records.append() self.max_record_size = max(self.max_record_size, record_size) self.last_append = time.time() - future = FutureRecordMetadata(self.produce_future, self.record_count, + future = FutureRecordMetadata(self.produce_future, offset, timestamp_ms, checksum, len(key) if key is not None else -1, len(value) if value is not None else -1) - self.record_count += 1 return future def done(self, base_offset=None, timestamp_ms=None, exception=None): log.debug("Produced messages to topic-partition %s with base offset" " %s and error %s.", self.topic_partition, base_offset, - exception) # trace + exception) # trace if 
self.produce_future.is_done: log.warning('Batch is already closed -- ignoring batch.done()') return @@ -113,7 +111,7 @@ class RecordBatch(object): self.records.close() self.done(-1, None, Errors.KafkaTimeoutError( "Batch for %s containing %s record(s) expired: %s" % ( - self.topic_partition, self.record_count, error))) + self.topic_partition, self.records.next_offset(), error))) return True return False @@ -123,9 +121,12 @@ class RecordBatch(object): def set_retry(self): self._retry = True + def buffer(self): + return self._buffer + def __str__(self): - return 'RecordBatch(topic_partition=%s, record_count=%d)' % ( - self.topic_partition, self.record_count) + return 'ProducerBatch(topic_partition=%s, record_count=%d)' % ( + self.topic_partition, self.records.next_offset()) class RecordAccumulator(object): @@ -148,8 +149,9 @@ class RecordAccumulator(object): will block up to max_block_ms, raising an exception on timeout. In the current implementation, this setting is an approximation. Default: 33554432 (32MB) - compression_type (str): The compression type for all data generated by - the producer. Valid values are 'gzip', 'snappy', 'lz4', or None. + compression_type (int): The compression type for all data generated by + the producer. Valid values are gzip(1), snappy(2), lz4(3), or + none(0). Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None. @@ -174,28 +176,41 @@ class RecordAccumulator(object): 'metric_group_prefix': 'producer-metrics', } + _COMPRESSORS = { + 'gzip': LegacyRecordBatchBuilder.CODEC_GZIP, + 'snappy': LegacyRecordBatchBuilder.CODEC_SNAPPY, + 'lz4': LegacyRecordBatchBuilder.CODEC_LZ4, + None: LegacyRecordBatchBuilder.CODEC_NONE + } + def __init__(self, **configs): self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: self.config[key] = configs.pop(key) + # Convert compression type to INT presentation. Mostly for unit tests, + # as Producer should pass already converted values. + ct = self.config["compression_type"] + self.config["compression_type"] = self._COMPRESSORS.get(ct, ct) + self._closed = False self._flushes_in_progress = AtomicInteger() self._appends_in_progress = AtomicInteger() - self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch] + self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch] self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries self._free = SimpleBufferPool(self.config['buffer_memory'], self.config['batch_size'], metrics=self.config['metrics'], metric_group_prefix=self.config['metric_group_prefix']) - self._incomplete = IncompleteRecordBatches() + self._incomplete = IncompleteProducerBatches() # The following variables should only be accessed by the sender thread, # so we don't need to protect them w/ locking. self.muted = set() self._drain_index = 0 - def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms): + def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms, + estimated_size=0): """Add a record to the accumulator, return the append result. 
The append result will contain the future metadata, and flag for @@ -215,8 +230,8 @@ class RecordAccumulator(object): """ assert isinstance(tp, TopicPartition), 'not TopicPartition' assert not self._closed, 'RecordAccumulator is closed' - # We keep track of the number of appending thread to make sure we do not miss batches in - # abortIncompleteBatches(). + # We keep track of the number of appending thread to make sure we do + # not miss batches in abortIncompleteBatches(). self._appends_in_progress.increment() try: if tp not in self._tp_locks: @@ -234,15 +249,7 @@ class RecordAccumulator(object): batch_is_full = len(dq) > 1 or last.records.is_full() return future, batch_is_full, False - # we don't have an in-progress record batch try to allocate a new batch - message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE - if key is not None: - message_size += len(key) - if value is not None: - message_size += len(value) - assert message_size <= self.config['buffer_memory'], 'message too big' - - size = max(self.config['batch_size'], message_size) + size = max(self.config['batch_size'], estimated_size) log.debug("Allocating a new %d byte message buffer for %s", size, tp) # trace buf = self._free.allocate(size, max_time_to_block_ms) with self._tp_locks[tp]: @@ -260,10 +267,13 @@ class RecordAccumulator(object): batch_is_full = len(dq) > 1 or last.records.is_full() return future, batch_is_full, False - records = MessageSetBuffer(buf, self.config['batch_size'], - self.config['compression_type'], - self.config['message_version']) - batch = RecordBatch(tp, records, self.config['message_version']) + records = MemoryRecordsBuilder( + self.config['message_version'], + self.config['compression_type'], + self.config['batch_size'] + ) + + batch = ProducerBatch(tp, records, buf) future = batch.try_append(timestamp_ms, key, value) if not future: raise Exception() @@ -285,7 +295,7 @@ class RecordAccumulator(object): cluster (ClusterMetadata): current metadata for kafka cluster Returns: - list of RecordBatch that were expired + list of ProducerBatch that were expired """ expired_batches = [] to_remove = [] @@ -449,7 +459,7 @@ class RecordAccumulator(object): max_size (int): maximum number of bytes to drain Returns: - dict: {node_id: list of RecordBatch} with total size less than the + dict: {node_id: list of ProducerBatch} with total size less than the requested max_size. 
""" if not nodes: @@ -505,7 +515,7 @@ class RecordAccumulator(object): def deallocate(self, batch): """Deallocate the record batch.""" self._incomplete.remove(batch) - self._free.deallocate(batch.records.buffer()) + self._free.deallocate(batch.buffer()) def _flush_in_progress(self): """Are there any threads currently waiting on a flush?""" @@ -571,8 +581,8 @@ class RecordAccumulator(object): self._closed = True -class IncompleteRecordBatches(object): - """A threadsafe helper class to hold RecordBatches that haven't been ack'd yet""" +class IncompleteProducerBatches(object): + """A threadsafe helper class to hold ProducerBatches that haven't been ack'd yet""" def __init__(self): self._incomplete = set() diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 679efb0..72a15bb 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -288,7 +288,6 @@ class Sender(threading.Thread): topic = batch.topic_partition.topic partition = batch.topic_partition.partition - # TODO: bytearray / memoryview buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index 359f197..0b03845 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -1,8 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .message import MessageSet -from .types import Array, Int8, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes class FetchResponse_v0(Response): @@ -15,7 +14,7 @@ class FetchResponse_v0(Response): ('partition', Int32), ('error_code', Int16), ('highwater_offset', Int64), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) @@ -30,7 +29,7 @@ class FetchResponse_v1(Response): ('partition', Int32), ('error_code', Int16), ('highwater_offset', Int64), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) @@ -61,7 +60,7 @@ class FetchResponse_v4(Response): ('aborted_transactions', Array( ('producer_id', Int64), ('first_offset', Int64))), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) @@ -81,7 +80,7 @@ class FetchResponse_v5(Response): ('aborted_transactions', Array( ('producer_id', Int64), ('first_offset', Int64))), - ('message_set', MessageSet))))) + ('message_set', Bytes))))) ) diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 37145b7..b8f84e7 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -19,6 +19,7 @@ from kafka.structs import ConsumerMetadataResponse from kafka.util import ( crc32, read_short_string, relative_unpack, write_int_string, group_by_topic_and_partition) +from kafka.protocol.message import MessageSet log = logging.getLogger(__name__) @@ -144,7 +145,7 @@ class KafkaProtocol(object): magic=msg.magic, attributes=msg.attributes ) partition_msgs.append((0, m.encode())) - topic_msgs.append((partition, partition_msgs)) + topic_msgs.append((partition, MessageSet.encode(partition_msgs, prepend_size=False))) topics.append((topic, topic_msgs)) @@ -215,7 +216,8 @@ class KafkaProtocol(object): ] @classmethod - def decode_message_set(cls, messages): + def decode_message_set(cls, raw_data): + messages = MessageSet.decode(raw_data, bytes_to_read=len(raw_data)) for offset, _, message in messages: if isinstance(message, kafka.protocol.message.Message) and message.is_compressed(): inner_messages = message.decompress() diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 70d5b36..f5a51a9 100644 
--- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -154,12 +154,13 @@ class MessageSet(AbstractType): HEADER_SIZE = 12 # offset + message_size @classmethod - def encode(cls, items): + def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally if isinstance(items, (io.BytesIO, KafkaBytes)): size = Int32.decode(items) - # rewind and return all the bytes - items.seek(items.tell() - 4) + if prepend_size: + # rewind and return all the bytes + items.seek(items.tell() - 4) return items.read(size + 4) encoded_values = [] @@ -167,7 +168,10 @@ class MessageSet(AbstractType): encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) - return Bytes.encode(encoded) + if prepend_size: + return Bytes.encode(encoded) + else: + return encoded @classmethod def decode(cls, data, bytes_to_read=None): diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index da1f308..34ff949 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -1,8 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .message import MessageSet -from .types import Int16, Int32, Int64, String, Array, Schema +from .types import Int16, Int32, Int64, String, Array, Schema, Bytes class ProduceResponse_v0(Response): @@ -64,7 +63,7 @@ class ProduceRequest_v0(Request): ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('messages', MessageSet))))) + ('messages', Bytes))))) ) def expect_response(self): @@ -109,7 +108,7 @@ class ProduceRequest_v3(Request): ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('messages', MessageSet))))) + ('messages', Bytes))))) ) def expect_response(self): diff --git a/kafka/record/__init__.py b/kafka/record/__init__.py new file mode 100644 index 0000000..4c75acb --- /dev/null +++ b/kafka/record/__init__.py @@ -0,0 +1,3 @@ +from .memory_records import MemoryRecords + +__all__ = ["MemoryRecords"] diff --git a/kafka/record/abc.py b/kafka/record/abc.py new file mode 100644 index 0000000..4f14d76 --- /dev/null +++ b/kafka/record/abc.py @@ -0,0 +1,119 @@ +from __future__ import absolute_import +import abc + + +class ABCRecord(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractproperty + def offset(self): + """ Absolute offset of record + """ + + @abc.abstractproperty + def timestamp(self): + """ Epoch milliseconds + """ + + @abc.abstractproperty + def timestamp_type(self): + """ CREATE_TIME(0) or APPEND_TIME(1) + """ + + @abc.abstractproperty + def key(self): + """ Bytes key or None + """ + + @abc.abstractproperty + def value(self): + """ Bytes value or None + """ + + @abc.abstractproperty + def checksum(self): + """ Prior to v2 format CRC was contained in every message. This will + be the checksum for v0 and v1 and None for v2 and above. + """ + + @abc.abstractproperty + def headers(self): + """ If supported by version list of key-value tuples, or empty list if + not supported by format. + """ + + +class ABCRecordBatchBuilder(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def append(self, offset, timestamp, key, value, headers): + """ Writes record to internal buffer. 
+ + Arguments: + offset (int): Relative offset of record, starting from 0 + timestamp (int): Timestamp in milliseconds since beginning of the + epoch (midnight Jan 1, 1970 (UTC)) + key (bytes or None): Key of the record + value (bytes or None): Value of the record + headers (List[Tuple[str, bytes]]): Headers of the record. Header + keys can not be ``None``. + + Returns: + (bytes, int): Checksum of the written record (or None for v2 and + above) and size of the written record. + """ + + @abc.abstractmethod + def size_in_bytes(self, offset, timestamp, key, value, headers): + """ Return the expected size change on buffer (uncompressed) if we add + this message. This will account for varint size changes and give a + reliable size. + """ + + @abc.abstractmethod + def build(self): + """ Close for append, compress if needed, write size and header and + return a ready to send bytes object. + + Return: + io.BytesIO: finished batch, ready to send. + """ + + +class ABCRecordBatch(object): + """ For v2 incapsulates a RecordBatch, for v0/v1 a single (maybe + compressed) message. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __iter__(self): + """ Return iterator over records (ABCRecord instances). Will decompress + if needed. + """ + + +class ABCRecords(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __init__(self, buffer): + """ Initialize with bytes-like object conforming to the buffer + interface (ie. bytes, bytearray, memoryview etc.). + """ + + @abc.abstractmethod + def size_in_bytes(self): + """ Returns the size of buffer. + """ + + @abc.abstractmethod + def next_batch(self): + """ Return next batch of records (ABCRecordBatch instances). + """ + + @abc.abstractmethod + def has_next(self): + """ True if there are more batches to read, False otherwise. + """ diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py new file mode 100644 index 0000000..3d9822d --- /dev/null +++ b/kafka/record/legacy_records.py @@ -0,0 +1,485 @@ +# See: +# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/\ +# apache/kafka/common/record/LegacyRecord.java + +# Builder and reader implementation for V0 and V1 record versions. As of Kafka +# 0.11.0.0 those were replaced with V2, thus the Legacy naming. + +# The schema is given below (see +# https://kafka.apache.org/protocol#protocol_message_sets for more details): + +# MessageSet => [Offset MessageSize Message] +# Offset => int64 +# MessageSize => int32 + +# v0 +# Message => Crc MagicByte Attributes Key Value +# Crc => int32 +# MagicByte => int8 +# Attributes => int8 +# Key => bytes +# Value => bytes + +# v1 (supported since 0.10.0) +# Message => Crc MagicByte Attributes Key Value +# Crc => int32 +# MagicByte => int8 +# Attributes => int8 +# Timestamp => int64 +# Key => bytes +# Value => bytes + +# The message attribute bits are given below: +# * Unused (4-7) +# * Timestamp Type (3) (added in V1) +# * Compression Type (0-2) + +# Note that when compression is enabled (see attributes above), the whole +# array of MessageSet's is compressed and places into a message as the `value`. +# Only the parent message is marked with `compression` bits in attributes. + +# The CRC covers the data from the Magic byte to the end of the message. 
+ + +import struct +import time + +from .abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder +from .util import calc_crc32 + +from kafka.codec import ( + gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka, + gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka +) +from kafka.errors import CorruptRecordException + + +class LegacyRecordBase(object): + + HEADER_STRUCT_V0 = struct.Struct( + ">q" # BaseOffset => Int64 + "i" # Length => Int32 + "I" # CRC => Int32 + "b" # Magic => Int8 + "b" # Attributes => Int8 + ) + HEADER_STRUCT_V1 = struct.Struct( + ">q" # BaseOffset => Int64 + "i" # Length => Int32 + "I" # CRC => Int32 + "b" # Magic => Int8 + "b" # Attributes => Int8 + "q" # timestamp => Int64 + ) + + LOG_OVERHEAD = CRC_OFFSET = struct.calcsize( + ">q" # Offset + "i" # Size + ) + MAGIC_OFFSET = LOG_OVERHEAD + struct.calcsize( + ">I" # CRC + ) + # Those are used for fast size calculations + RECORD_OVERHEAD_V0 = struct.calcsize( + ">I" # CRC + "b" # magic + "b" # attributes + "i" # Key length + "i" # Value length + ) + RECORD_OVERHEAD_V1 = struct.calcsize( + ">I" # CRC + "b" # magic + "b" # attributes + "q" # timestamp + "i" # Key length + "i" # Value length + ) + + KEY_OFFSET_V0 = HEADER_STRUCT_V0.size + KEY_OFFSET_V1 = HEADER_STRUCT_V1.size + KEY_LENGTH = VALUE_LENGTH = struct.calcsize(">i") # Bytes length is Int32 + + CODEC_MASK = 0x07 + CODEC_NONE = 0x00 + CODEC_GZIP = 0x01 + CODEC_SNAPPY = 0x02 + CODEC_LZ4 = 0x03 + TIMESTAMP_TYPE_MASK = 0x08 + + LOG_APPEND_TIME = 1 + CREATE_TIME = 0 + + +class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase): + + def __init__(self, buffer, magic): + self._buffer = memoryview(buffer) + self._magic = magic + + offset, length, crc, magic_, attrs, timestamp = self._read_header(0) + assert length == len(buffer) - self.LOG_OVERHEAD + assert magic == magic_ + + self._offset = offset + self._crc = crc + self._timestamp = timestamp + self._attributes = attrs + self._decompressed = False + + @property + def timestamp_type(self): + """0 for CreateTime; 1 for LogAppendTime; None if unsupported. 
+ + Value is determined by broker; produced messages should always set to 0 + Requires Kafka >= 0.10 / message version >= 1 + """ + if self._magic == 0: + return None + elif self._attributes & self.TIMESTAMP_TYPE_MASK: + return 1 + else: + return 0 + + @property + def compression_type(self): + return self._attributes & self.CODEC_MASK + + def validate_crc(self): + crc = calc_crc32(self._buffer[self.MAGIC_OFFSET:]) + return self._crc == crc + + def _decompress(self, key_offset): + # Copy of `_read_key_value`, but uses memoryview + pos = key_offset + key_size = struct.unpack_from(">i", self._buffer, pos)[0] + pos += self.KEY_LENGTH + if key_size != -1: + pos += key_size + value_size = struct.unpack_from(">i", self._buffer, pos)[0] + pos += self.VALUE_LENGTH + if value_size == -1: + raise CorruptRecordException("Value of compressed message is None") + else: + data = self._buffer[pos:pos + value_size] + + compression_type = self.compression_type + if compression_type == self.CODEC_GZIP: + uncompressed = gzip_decode(data) + elif compression_type == self.CODEC_SNAPPY: + uncompressed = snappy_decode(data.tobytes()) + elif compression_type == self.CODEC_LZ4: + if self._magic == 0: + uncompressed = lz4_decode_old_kafka(data.tobytes()) + else: + uncompressed = lz4_decode(data.tobytes()) + return uncompressed + + def _read_header(self, pos): + if self._magic == 0: + offset, length, crc, magic_read, attrs = \ + self.HEADER_STRUCT_V0.unpack_from(self._buffer, pos) + timestamp = None + else: + offset, length, crc, magic_read, attrs, timestamp = \ + self.HEADER_STRUCT_V1.unpack_from(self._buffer, pos) + return offset, length, crc, magic_read, attrs, timestamp + + def _read_all_headers(self): + pos = 0 + msgs = [] + buffer_len = len(self._buffer) + while pos < buffer_len: + header = self._read_header(pos) + msgs.append((header, pos)) + pos += self.LOG_OVERHEAD + header[1] # length + return msgs + + def _read_key_value(self, pos): + key_size = struct.unpack_from(">i", self._buffer, pos)[0] + pos += self.KEY_LENGTH + if key_size == -1: + key = None + else: + key = self._buffer[pos:pos + key_size].tobytes() + pos += key_size + + value_size = struct.unpack_from(">i", self._buffer, pos)[0] + pos += self.VALUE_LENGTH + if value_size == -1: + value = None + else: + value = self._buffer[pos:pos + value_size].tobytes() + return key, value + + def __iter__(self): + if self._magic == 1: + key_offset = self.KEY_OFFSET_V1 + else: + key_offset = self.KEY_OFFSET_V0 + timestamp_type = self.timestamp_type + + if self.compression_type: + # In case we will call iter again + if not self._decompressed: + self._buffer = memoryview(self._decompress(key_offset)) + self._decompressed = True + + # If relative offset is used, we need to decompress the entire + # message first to compute the absolute offset. + headers = self._read_all_headers() + if self._magic > 0: + msg_header, _ = headers[-1] + absolute_base_offset = self._offset - msg_header[0] + else: + absolute_base_offset = -1 + + for header, msg_pos in headers: + offset, _, crc, _, attrs, timestamp = header + # There should only ever be a single layer of compression + assert not attrs & self.CODEC_MASK, ( + 'MessageSet at offset %d appears double-compressed. This ' + 'should not happen -- check your producers!' 
% offset) + + # When magic value is greater than 0, the timestamp + # of a compressed message depends on the + # typestamp type of the wrapper message: + if timestamp_type == self.LOG_APPEND_TIME: + timestamp = self._timestamp + + if absolute_base_offset >= 0: + offset += absolute_base_offset + + key, value = self._read_key_value(msg_pos + key_offset) + yield LegacyRecord( + offset, timestamp, timestamp_type, + key, value, crc) + else: + key, value = self._read_key_value(key_offset) + yield LegacyRecord( + self._offset, self._timestamp, timestamp_type, + key, value, self._crc) + + +class LegacyRecord(ABCRecord): + + __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", + "_crc") + + def __init__(self, offset, timestamp, timestamp_type, key, value, crc): + self._offset = offset + self._timestamp = timestamp + self._timestamp_type = timestamp_type + self._key = key + self._value = value + self._crc = crc + + @property + def offset(self): + return self._offset + + @property + def timestamp(self): + """ Epoch milliseconds + """ + return self._timestamp + + @property + def timestamp_type(self): + """ CREATE_TIME(0) or APPEND_TIME(1) + """ + return self._timestamp_type + + @property + def key(self): + """ Bytes key or None + """ + return self._key + + @property + def value(self): + """ Bytes value or None + """ + return self._value + + @property + def headers(self): + return [] + + @property + def checksum(self): + return self._crc + + def __repr__(self): + return ( + "LegacyRecord(offset={!r}, timestamp={!r}, timestamp_type={!r}," + " key={!r}, value={!r}, crc={!r})".format( + self._offset, self._timestamp, self._timestamp_type, + self._key, self._value, self._crc) + ) + + +class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase): + + def __init__(self, magic, compression_type, batch_size): + self._magic = magic + self._compression_type = compression_type + self._batch_size = batch_size + self._buffer = bytearray() + + def append(self, offset, timestamp, key, value): + """ Append message to batch. + """ + # Check types + if type(offset) != int: + raise TypeError(offset) + if timestamp is None: + timestamp = int(time.time() * 1000) + elif type(timestamp) != int: + raise TypeError(timestamp) + if not (key is None or + isinstance(key, (bytes, bytearray, memoryview))): + raise TypeError( + "Not supported type for key: {}".format(type(key))) + if not (value is None or + isinstance(value, (bytes, bytearray, memoryview))): + raise TypeError( + "Not supported type for value: {}".format(type(value))) + + # Check if we have room for another message + pos = len(self._buffer) + size = self.size_in_bytes(offset, timestamp, key, value) + # We always allow at least one record to be appended + if offset != 0 and pos + size >= self._batch_size: + return None, 0 + + # Allocate proper buffer length + self._buffer.extend(bytearray(size)) + + # Encode message + crc = self._encode_msg(pos, offset, timestamp, key, value) + + return crc, size + + def _encode_msg(self, start_pos, offset, timestamp, key, value, + attributes=0): + """ Encode msg data into the `msg_buffer`, which should be allocated + to at least the size of this message. 
+ """ + magic = self._magic + buf = self._buffer + pos = start_pos + + # Write key and value + pos += self.KEY_OFFSET_V0 if magic == 0 else self.KEY_OFFSET_V1 + + if key is None: + struct.pack_into(">i", buf, pos, -1) + pos += self.KEY_LENGTH + else: + key_size = len(key) + struct.pack_into(">i", buf, pos, key_size) + pos += self.KEY_LENGTH + buf[pos: pos + key_size] = key + pos += key_size + + if value is None: + struct.pack_into(">i", buf, pos, -1) + pos += self.VALUE_LENGTH + else: + value_size = len(value) + struct.pack_into(">i", buf, pos, value_size) + pos += self.VALUE_LENGTH + buf[pos: pos + value_size] = value + pos += value_size + length = (pos - start_pos) - self.LOG_OVERHEAD + + # Write msg header. Note, that Crc will be updated later + if magic == 0: + self.HEADER_STRUCT_V0.pack_into( + buf, start_pos, + offset, length, 0, magic, attributes) + else: + self.HEADER_STRUCT_V1.pack_into( + buf, start_pos, + offset, length, 0, magic, attributes, timestamp) + + # Calculate CRC for msg + crc_data = memoryview(buf)[start_pos + self.MAGIC_OFFSET:] + crc = calc_crc32(crc_data) + struct.pack_into(">I", buf, start_pos + self.CRC_OFFSET, crc) + return crc + + def _maybe_compress(self): + if self._compression_type: + if self._compression_type == self.CODEC_GZIP: + compressed = gzip_encode(bytes(self._buffer)) + elif self._compression_type == self.CODEC_SNAPPY: + compressed = snappy_encode(self._buffer) + elif self._compression_type == self.CODEC_LZ4: + if self._magic == 0: + compressed = lz4_encode_old_kafka(bytes(self._buffer)) + else: + compressed = lz4_encode(bytes(self._buffer)) + size = self.size_in_bytes( + 0, timestamp=0, key=None, value=compressed) + # We will try to reuse the same buffer if we have enough space + if size > len(self._buffer): + self._buffer = bytearray(size) + else: + del self._buffer[size:] + self._encode_msg( + start_pos=0, + offset=0, timestamp=0, key=None, value=compressed, + attributes=self._compression_type) + return True + return False + + def build(self): + """Compress batch to be ready for send""" + self._maybe_compress() + return self._buffer + + def size(self): + """ Return current size of data written to buffer + """ + return len(self._buffer) + + # Size calculations. Just copied Java's implementation + + def size_in_bytes(self, offset, timestamp, key, value, headers=None): + """ Actual size of message to add + """ + assert not headers, "Headers not supported in v0/v1" + magic = self._magic + return self.LOG_OVERHEAD + self.record_size(magic, key, value) + + @classmethod + def record_size(cls, magic, key, value): + message_size = cls.record_overhead(magic) + if key is not None: + message_size += len(key) + if value is not None: + message_size += len(value) + return message_size + + @classmethod + def record_overhead(cls, magic): + assert magic in [0, 1], "Not supported magic" + if magic == 0: + return cls.RECORD_OVERHEAD_V0 + else: + return cls.RECORD_OVERHEAD_V1 + + @classmethod + def estimate_size_in_bytes(cls, magic, compression_type, key, value): + """ Upper bound estimate of record size. 
+ """ + assert magic in [0, 1], "Not supported magic" + # In case of compression we may need another overhead for inner msg + if compression_type: + return ( + cls.LOG_OVERHEAD + cls.record_overhead(magic) + + cls.record_size(magic, key, value) + ) + return cls.LOG_OVERHEAD + cls.record_size(magic, key, value) diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py new file mode 100644 index 0000000..c6a28be --- /dev/null +++ b/kafka/record/memory_records.py @@ -0,0 +1,176 @@ +# This class takes advantage of the fact that all formats v0, v1 and v2 of +# messages storage has the same byte offsets for Length and Magic fields. +# Lets look closely at what leading bytes all versions have: +# +# V0 and V1 (Offset is MessageSet part, other bytes are Message ones): +# Offset => Int64 +# BytesLength => Int32 +# CRC => Int32 +# Magic => Int8 +# ... +# +# V2: +# BaseOffset => Int64 +# Length => Int32 +# PartitionLeaderEpoch => Int32 +# Magic => Int8 +# ... +# +# So we can iterate over batches just by knowing offsets of Length. Magic is +# used to construct the correct class for Batch itself. + +import struct + +from kafka.errors import CorruptRecordException +from .abc import ABCRecords +from .legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder + + +class MemoryRecords(ABCRecords): + + LENGTH_OFFSET = struct.calcsize(">q") + LOG_OVERHEAD = struct.calcsize(">qi") + MAGIC_OFFSET = struct.calcsize(">qii") + + # Minimum space requirements for Record V0 + MIN_SLICE = LOG_OVERHEAD + LegacyRecordBatch.RECORD_OVERHEAD_V0 + + def __init__(self, bytes_data): + self._buffer = bytes_data + self._pos = 0 + # We keep one slice ahead so `has_next` will return very fast + self._next_slice = None + self._remaining_bytes = None + self._cache_next() + + def size_in_bytes(self): + return len(self._buffer) + + def valid_bytes(self): + # We need to read the whole buffer to get the valid_bytes. + # NOTE: in Fetcher we do the call after iteration, so should be fast + if self._remaining_bytes is None: + next_slice = self._next_slice + pos = self._pos + while self._remaining_bytes is None: + self._cache_next() + # Reset previous iterator position + self._next_slice = next_slice + self._pos = pos + return len(self._buffer) - self._remaining_bytes + + # NOTE: we cache offsets here as kwargs for a bit more speed, as cPython + # will use LOAD_FAST opcode in this case + def _cache_next(self, len_offset=LENGTH_OFFSET, log_overhead=LOG_OVERHEAD): + buffer = self._buffer + buffer_len = len(buffer) + pos = self._pos + remaining = buffer_len - pos + if remaining < log_overhead: + # Will be re-checked in Fetcher for remaining bytes. 
+ self._remaining_bytes = remaining + self._next_slice = None + return + + length, = struct.unpack_from( + ">i", buffer, pos + len_offset) + + slice_end = pos + log_overhead + length + if slice_end > buffer_len: + # Will be re-checked in Fetcher for remaining bytes + self._remaining_bytes = remaining + self._next_slice = None + return + + self._next_slice = memoryview(buffer)[pos: slice_end] + self._pos = slice_end + + def has_next(self): + return self._next_slice is not None + + # NOTE: same cache for LOAD_FAST as above + def next_batch(self, _min_slice=MIN_SLICE, + _magic_offset=MAGIC_OFFSET): + next_slice = self._next_slice + if next_slice is None: + return None + if len(next_slice) < _min_slice: + raise CorruptRecordException( + "Record size is less than the minimum record overhead " + "({})".format(_min_slice - self.LOG_OVERHEAD)) + self._cache_next() + magic, = struct.unpack_from(">b", next_slice, _magic_offset) + if magic <= 1: + return LegacyRecordBatch(next_slice, magic) + else: # pragma: no cover + raise NotImplementedError("Record V2 still not implemented") + + +class MemoryRecordsBuilder(object): + + def __init__(self, magic, compression_type, batch_size): + assert magic in [0, 1], "Not supported magic" + assert compression_type in [0, 1, 2, 3], "Not valid compression type" + self._builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=compression_type, + batch_size=batch_size) + self._batch_size = batch_size + self._buffer = None + + self._next_offset = 0 + self._closed = False + self._bytes_written = 0 + + def append(self, timestamp, key, value): + """ Append a message to the buffer. + + Returns: + (int, int): checksum and bytes written + """ + if self._closed: + return None, 0 + + offset = self._next_offset + checksum, actual_size = self._builder.append( + offset, timestamp, key, value) + # Return of 0 size means there's no space to add a new message + if actual_size == 0: + return None, 0 + + self._next_offset += 1 + return checksum, actual_size + + def close(self): + # This method may be called multiple times on the same batch + # i.e., on retries + # we need to make sure we only close it out once + # otherwise compressed messages may be double-compressed + # see Issue 718 + if not self._closed: + self._bytes_written = self._builder.size() + self._buffer = bytes(self._builder.build()) + self._builder = None + self._closed = True + + def size_in_bytes(self): + if not self._closed: + return self._builder.size() + else: + return len(self._buffer) + + def compression_rate(self): + assert self._closed + return self.size_in_bytes() / self._bytes_written + + def is_full(self): + if self._closed: + return True + else: + return self._builder.size() >= self._batch_size + + def next_offset(self): + return self._next_offset + + def buffer(self): + assert self._closed + return self._buffer diff --git a/kafka/record/util.py b/kafka/record/util.py new file mode 100644 index 0000000..098d6f4 --- /dev/null +++ b/kafka/record/util.py @@ -0,0 +1,8 @@ +import binascii + + +def calc_crc32(memview): + """ Calculate simple CRC-32 checksum over a memoryview of data + """ + crc = binascii.crc32(memview) & 0xffffffff + return crc diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py new file mode 100644 index 0000000..2d76695 --- /dev/null +++ b/test/record/test_legacy_records.py @@ -0,0 +1,85 @@ +import pytest +from kafka.record.legacy_records import ( + LegacyRecordBatch, LegacyRecordBatchBuilder +) +from kafka.protocol.message import Message + + 
+@pytest.mark.parametrize("magic", [0, 1]) +def test_read_write_serde_v0_v1_no_compression(magic): + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=0, batch_size=9999999) + builder.append( + 0, timestamp=9999999, key=b"test", value=b"Super") + buffer = builder.build() + + batch = LegacyRecordBatch(bytes(buffer), magic) + msgs = list(batch) + assert len(msgs) == 1 + msg = msgs[0] + + assert msg.offset == 0 + assert msg.timestamp == (9999999 if magic else None) + assert msg.timestamp_type == (0 if magic else None) + assert msg.key == b"test" + assert msg.value == b"Super" + assert msg.checksum == (-2095076219 if magic else 278251978) & 0xffffffff + + +@pytest.mark.parametrize("compression_type", [ + Message.CODEC_GZIP, + Message.CODEC_SNAPPY, + Message.CODEC_LZ4 +]) +@pytest.mark.parametrize("magic", [0, 1]) +def test_read_write_serde_v0_v1_with_compression(compression_type, magic): + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=compression_type, batch_size=9999999) + for offset in range(10): + builder.append( + offset, timestamp=9999999, key=b"test", value=b"Super") + buffer = builder.build() + + batch = LegacyRecordBatch(bytes(buffer), magic) + msgs = list(batch) + + expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff + for offset, msg in enumerate(msgs): + assert msg.offset == offset + assert msg.timestamp == (9999999 if magic else None) + assert msg.timestamp_type == (0 if magic else None) + assert msg.key == b"test" + assert msg.value == b"Super" + assert msg.checksum == expected_checksum + + +@pytest.mark.parametrize("magic", [0, 1]) +def test_written_bytes_equals_size_in_bytes(magic): + key = b"test" + value = b"Super" + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=0, batch_size=9999999) + + size_in_bytes = builder.size_in_bytes( + 0, timestamp=9999999, key=key, value=value) + + pos = builder.size() + builder.append(0, timestamp=9999999, key=key, value=value) + + assert builder.size() - pos == size_in_bytes + + +@pytest.mark.parametrize("magic", [0, 1]) +def test_estimate_size_in_bytes_bigger_than_batch(magic): + key = b"Super Key" + value = b"1" * 100 + estimate_size = LegacyRecordBatchBuilder.estimate_size_in_bytes( + magic, compression_type=0, key=key, value=value) + + builder = LegacyRecordBatchBuilder( + magic=magic, compression_type=0, batch_size=9999999) + builder.append( + 0, timestamp=9999999, key=key, value=value) + buf = builder.build() + assert len(buf) <= estimate_size, \ + "Estimate should always be upper bound" diff --git a/test/record/test_records.py b/test/record/test_records.py new file mode 100644 index 0000000..fc3eaca --- /dev/null +++ b/test/record/test_records.py @@ -0,0 +1,108 @@ +import pytest +from kafka.record import MemoryRecords +from kafka.errors import CorruptRecordException + +record_batch_data_v1 = [ + # First Message value == "123" + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19G\x86(\xc2\x01\x00\x00' + b'\x00\x01^\x18g\xab\xae\xff\xff\xff\xff\x00\x00\x00\x03123', + # Second Message value == "" + b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x16\xef\x98\xc9 \x01\x00' + b'\x00\x00\x01^\x18g\xaf\xc0\xff\xff\xff\xff\x00\x00\x00\x00', + # Third Message value == "" + b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x16_\xaf\xfb^\x01\x00\x00' + b'\x00\x01^\x18g\xb0r\xff\xff\xff\xff\x00\x00\x00\x00', + # Fourth Message value = "123" + b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x19\xa8\x12W \x01\x00\x00' + b'\x00\x01^\x18g\xb8\x03\xff\xff\xff\xff\x00\x00\x00\x03123' 
+] + +# This is real live data from Kafka 10 broker +record_batch_data_v0 = [ + # First Message value == "123" + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00' + b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123', + # Second Message value == "" + b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x0eyWH\xe0\x00\x00\xff' + b'\xff\xff\xff\x00\x00\x00\x00', + # Third Message value == "" + b'\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x0eyWH\xe0\x00\x00\xff' + b'\xff\xff\xff\x00\x00\x00\x00', + # Fourth Message value = "123" + b'\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x11\xfe\xb0\x1d\xbf\x00' + b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123' +] + + +def test_memory_records_v1(): + data_bytes = b"".join(record_batch_data_v1) + b"\x00" * 4 + records = MemoryRecords(data_bytes) + + assert records.size_in_bytes() == 146 + assert records.valid_bytes() == 142 + + assert records.has_next() is True + batch = records.next_batch() + recs = list(batch) + assert len(recs) == 1 + assert recs[0].value == b"123" + assert recs[0].key is None + assert recs[0].timestamp == 1503648000942 + assert recs[0].timestamp_type == 0 + assert recs[0].checksum == 1199974594 & 0xffffffff + + assert records.next_batch() is not None + assert records.next_batch() is not None + assert records.next_batch() is not None + + assert records.has_next() is False + assert records.next_batch() is None + assert records.next_batch() is None + + +def test_memory_records_v0(): + data_bytes = b"".join(record_batch_data_v0) + records = MemoryRecords(data_bytes + b"\x00" * 4) + + assert records.size_in_bytes() == 114 + assert records.valid_bytes() == 110 + + records = MemoryRecords(data_bytes) + + assert records.has_next() is True + batch = records.next_batch() + recs = list(batch) + assert len(recs) == 1 + assert recs[0].value == b"123" + assert recs[0].key is None + assert recs[0].timestamp is None + assert recs[0].timestamp_type is None + assert recs[0].checksum == -22012481 & 0xffffffff + + assert records.next_batch() is not None + assert records.next_batch() is not None + assert records.next_batch() is not None + + assert records.has_next() is False + assert records.next_batch() is None + assert records.next_batch() is None + + +def test_memory_records_corrupt(): + records = MemoryRecords(b"") + assert records.size_in_bytes() == 0 + assert records.valid_bytes() == 0 + assert records.has_next() is False + + records = MemoryRecords(b"\x00\x00\x00") + assert records.size_in_bytes() == 3 + assert records.valid_bytes() == 0 + assert records.has_next() is False + + records = MemoryRecords( + b"\x00\x00\x00\x00\x00\x00\x00\x03" # Offset=3 + b"\x00\x00\x00\x03" # Length=3 + b"\xfe\xb0\x1d", # Some random bytes + ) + with pytest.raises(CorruptRecordException): + records.next_batch() diff --git a/test/test_buffer.py b/test/test_buffer.py deleted file mode 100644 index db6cbb3..0000000 --- a/test/test_buffer.py +++ /dev/null @@ -1,72 +0,0 @@ -# pylint: skip-file -from __future__ import absolute_import - -import io -import platform - -import pytest - -from kafka.producer.buffer import MessageSetBuffer -from kafka.protocol.message import Message, MessageSet - - -def test_buffer_close(): - records = MessageSetBuffer(io.BytesIO(), 100000) - orig_msg = Message(b'foobar') - records.append(1234, orig_msg) - records.close() - - msgset = MessageSet.decode(records.buffer()) - assert len(msgset) == 1 - (offset, size, msg) = msgset[0] - assert offset == 1234 - assert msg == orig_msg - - # Closing again should work fine - records.close() - - msgset 
= MessageSet.decode(records.buffer()) - assert len(msgset) == 1 - (offset, size, msg) = msgset[0] - assert offset == 1234 - assert msg == orig_msg - - -@pytest.mark.parametrize('compression', [ - 'gzip', - 'snappy', - pytest.mark.skipif(platform.python_implementation() == 'PyPy', - reason='python-lz4 crashes on older versions of pypy')('lz4'), -]) -def test_compressed_buffer_close(compression): - records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression) - orig_msg = Message(b'foobar') - records.append(1234, orig_msg) - records.close() - - msgset = MessageSet.decode(records.buffer()) - assert len(msgset) == 1 - (offset, size, msg) = msgset[0] - assert offset == 0 - assert msg.is_compressed() - - msgset = msg.decompress() - (offset, size, msg) = msgset[0] - assert not msg.is_compressed() - assert offset == 1234 - assert msg == orig_msg - - # Closing again should work fine - records.close() - - msgset = MessageSet.decode(records.buffer()) - assert len(msgset) == 1 - (offset, size, msg) = msgset[0] - assert offset == 0 - assert msg.is_compressed() - - msgset = msg.decompress() - (offset, size, msg) = msgset[0] - assert not msg.is_compressed() - assert offset == 1234 - assert msg == orig_msg diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 17e7401..d1843b3 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -26,6 +26,8 @@ from test.testutil import ( class TestConsumerIntegration(KafkaIntegrationTestCase): + maxDiff = None + @classmethod def setUpClass(cls): if not os.environ.get('KAFKA_VERSION'): @@ -648,10 +650,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): kafka_producer = self.kafka_producer() early_msg = kafka_producer.send( self.topic, partition=0, value=b"first", - timestamp_ms=early_time).get() + timestamp_ms=early_time).get(1) late_msg = kafka_producer.send( self.topic, partition=0, value=b"last", - timestamp_ms=late_time).get() + timestamp_ms=late_time).get(1) consumer = self.kafka_consumer() offsets = consumer.offsets_for_times({tp: early_time}) diff --git a/test/test_protocol.py b/test/test_protocol.py index 0203614..d963650 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -260,13 +260,14 @@ def test_decode_fetch_response_partial(): struct.pack('>i', 8), # Length of value b'ar', # Value (truncated) ]) - resp = FetchResponse[0].decode(io.BytesIO(encoded)) assert len(resp.topics) == 1 topic, partitions = resp.topics[0] assert topic == 'foobar' assert len(partitions) == 2 - m1 = partitions[0][3] + + m1 = MessageSet.decode( + partitions[0][3], bytes_to_read=len(partitions[0][3])) assert len(m1) == 2 assert m1[1] == (None, None, PartialMessage()) diff --git a/test/test_sender.py b/test/test_sender.py index f37e194..2a68def 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -1,20 +1,17 @@ # pylint: skip-file from __future__ import absolute_import -import io - import pytest +import io from kafka.client_async import KafkaClient from kafka.cluster import ClusterMetadata -import kafka.errors as Errors -from kafka.future import Future from kafka.metrics import Metrics -from kafka.producer.buffer import MessageSetBuffer from kafka.protocol.produce import ProduceRequest -from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch +from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch from kafka.producer.sender import Sender -from kafka.structs import TopicPartition, OffsetAndMetadata +from 
kafka.record.memory_records import MemoryRecordsBuilder +from kafka.structs import TopicPartition @pytest.fixture @@ -47,7 +44,10 @@ def sender(client, accumulator, metrics): def test_produce_request(sender, mocker, api_version, produce_version): sender.config['api_version'] = api_version tp = TopicPartition('foo', 0) - records = MessageSetBuffer(io.BytesIO(), 100000) - batch = RecordBatch(tp, records) + buffer = io.BytesIO() + records = MemoryRecordsBuilder( + magic=1, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records, buffer) + records.close() produce_request = sender._produce_request(0, 0, 0, [batch]) assert isinstance(produce_request, ProduceRequest[produce_version]) |
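
On the producer send path, the per-message size check now delegates to LegacyRecordBatchBuilder.estimate_size_in_bytes instead of summing MessageSet/Message header constants. A small sketch of that call, assuming the class as shown in this diff (the key/value bytes here are made-up illustration values):

    from kafka.record.legacy_records import LegacyRecordBatchBuilder

    # magic=1 corresponds to brokers >= 0.10; the CODEC_* attributes
    # (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4) live on the builder class.
    estimated = LegacyRecordBatchBuilder.estimate_size_in_bytes(
        magic=1, compression_type=LegacyRecordBatchBuilder.CODEC_GZIP,
        key=b"some-key", value=b"some-value")
    # KafkaProducer.send() feeds an estimate like this to
    # _ensure_valid_record_size() before appending to the accumulator.
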