Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 106
1 file changed, 27 insertions, 79 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 1800863..d3ee26e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -13,10 +13,10 @@
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import (
     OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
+from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition, OffsetAndTimestamp
@@ -304,7 +304,7 @@ class Fetcher(six.Iterator):
         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
-            InvalidMessageError: if message crc validation fails (check_crcs
+            CorruptRecordException: if message crc validation fails (check_crcs
                 must be set to True)
             RecordTooLargeError: if a message is larger than the currently
                 configured max_partition_fetch_bytes
@@ -449,57 +449,25 @@ class Fetcher(six.Iterator):
             self._next_partition_records = None
 
-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, records):
         try:
-            for offset, size, msg in messages:
-                if self.config['check_crcs'] and not msg.validate_crc():
-                    raise Errors.InvalidMessageError(msg)
-
-                if not msg.is_compressed():
-                    yield self._parse_record(tp, offset, msg.timestamp, msg)
-
-                else:
-                    # If relative offset is used, we need to decompress the entire message first
-                    # to compute the absolute offset.
-                    inner_mset = msg.decompress()
-
-                    # There should only ever be a single layer of compression
-                    if inner_mset[0][-1].is_compressed():
-                        log.warning('MessageSet at %s offset %d appears '
-                                    ' double-compressed. This should not'
-                                    ' happen -- check your producers!',
-                                    tp, offset)
-                        if self.config['skip_double_compressed_messages']:
-                            log.warning('Skipping double-compressed message at'
-                                        ' %s %d', tp, offset)
-                            continue
-
-                    if msg.magic > 0:
-                        last_offset, _, _ = inner_mset[-1]
-                        absolute_base_offset = offset - last_offset
-                    else:
-                        absolute_base_offset = -1
-
-                    for inner_offset, inner_size, inner_msg in inner_mset:
-                        if msg.magic > 0:
-                            # When magic value is greater than 0, the timestamp
-                            # of a compressed message depends on the
-                            # typestamp type of the wrapper message:
-
-                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
-                                inner_timestamp = inner_msg.timestamp
-
-                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
-                                inner_timestamp = msg.timestamp
-
-                            else:
-                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
-                        else:
-                            inner_timestamp = msg.timestamp
-
-                        if absolute_base_offset >= 0:
-                            inner_offset += absolute_base_offset
-                        yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
+            batch = records.next_batch()
+            while batch is not None:
+                for record in batch:
+                    key_size = len(record.key) if record.key is not None else -1
+                    value_size = len(record.value) if record.value is not None else -1
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, record.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, record.value)
+                    yield ConsumerRecord(
+                        tp.topic, tp.partition, record.offset, record.timestamp,
+                        record.timestamp_type, key, value, record.checksum,
+                        key_size, value_size)
+
+                batch = records.next_batch()
 
         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
@@ -508,21 +476,6 @@ class Fetcher(six.Iterator):
             log.exception('StopIteration raised unpacking messageset: %s', e)
             raise Exception('StopIteration raised unpacking messageset')
 
-        # If unpacking raises AssertionError, it means decompression unsupported
-        # See Issue 1033
-        except AssertionError as e:
-            log.exception('AssertionError raised unpacking messageset: %s', e)
-            raise
-
-    def _parse_record(self, tp, offset, timestamp, msg):
-        key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
-        value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
-        return ConsumerRecord(tp.topic, tp.partition, offset,
-                              timestamp, msg.timestamp_type,
-                              key, value, msg.crc,
-                              len(msg.key) if msg.key is not None else -1,
-                              len(msg.value) if msg.value is not None else -1)
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
@@ -784,7 +737,6 @@ class Fetcher(six.Iterator):
     def _parse_fetched_data(self, completed_fetch):
         tp = completed_fetch.topic_partition
-        partition = completed_fetch.partition_data
         fetch_offset = completed_fetch.fetched_offset
         num_bytes = 0
         records_count = 0
@@ -792,7 +744,6 @@ class Fetcher(six.Iterator):
         error_code, highwater = completed_fetch.partition_data[:2]
         error_type = Errors.for_code(error_code)
-        messages = completed_fetch.partition_data[-1]
 
         try:
             if not self._subscriptions.is_fetchable(tp):
@@ -816,21 +767,18 @@ class Fetcher(six.Iterator):
                     position)
                 return None
 
-            partial = None
-            if messages and isinstance(messages[-1][-1], PartialMessage):
-                partial = messages.pop()
-
-            if messages:
+            records = MemoryRecords(completed_fetch.partition_data[-1])
+            if records.has_next():
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
                           position)
-                unpacked = list(self._unpack_message_set(tp, messages))
+                unpacked = list(self._unpack_message_set(tp, records))
                 parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
-                last_offset, _, _ = messages[-1]
+                last_offset = unpacked[-1].offset
                 self._sensors.records_fetch_lag.record(highwater - last_offset)
-                num_bytes = sum(msg[1] for msg in messages)
-                records_count = len(messages)
-            elif partial:
+                num_bytes = records.valid_bytes()
+                records_count = len(unpacked)
+            elif records.size_in_bytes() > 0:
                 # we did not read a single message from a non-empty
                 # buffer because that message's size is larger than
                 # fetch size, in this case record this exception
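The heart of this change is the new _unpack_message_set: instead of walking (offset, size, message) tuples and hand-decompressing wrapper messages, the fetcher drains batches from a MemoryRecords view over the raw response buffer. A minimal sketch of that batch-draining pattern, assuming only the kafka.record.MemoryRecords methods that appear in this diff (next_batch() returning None when exhausted) and the per-record attributes the new code reads; iter_fetched_records and raw_bytes are hypothetical names for illustration, not part of the commit:

from kafka.record import MemoryRecords

def iter_fetched_records(raw_bytes):
    # raw_bytes: the raw message-set bytes from a fetch response, i.e. what
    # the fetcher above takes from completed_fetch.partition_data[-1].
    records = MemoryRecords(raw_bytes)
    batch = records.next_batch()
    while batch is not None:  # next_batch() returns None once exhausted
        for record in batch:
            # The same per-record attributes the new _unpack_message_set uses.
            yield (record.offset, record.timestamp, record.key, record.value)
        batch = records.next_batch()

Note the distinction the final hunk relies on: valid_bytes() counts only bytes belonging to complete batches, while size_in_bytes() covers the whole buffer, so has_next() being false with size_in_bytes() > 0 flags a partial batch larger than the fetch size (the RecordTooLargeError case from the docstring).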