Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--  kafka/consumer/fetcher.py | 106
1 file changed, 27 insertions(+), 79 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 1800863..d3ee26e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -13,10 +13,10 @@ import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import (
OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
)
+from kafka.record import MemoryRecords
from kafka.serializer import Deserializer
from kafka.structs import TopicPartition, OffsetAndTimestamp
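
For orientation, the hunks below consume the new MemoryRecords wrapper through next_batch(). A minimal sketch of that pattern, assuming raw_bytes is one partition's fetch payload and handle() is a placeholder:

    from kafka.record import MemoryRecords

    records = MemoryRecords(raw_bytes)   # raw_bytes: placeholder bytes from a fetch response
    batch = records.next_batch()         # returns None once the buffer is exhausted
    while batch is not None:
        for record in batch:             # each record carries offset, timestamp, key, value
            handle(record)               # handle(): placeholder
        batch = records.next_batch()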
@@ -304,7 +304,7 @@ class Fetcher(six.Iterator):
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
- InvalidMessageError: if message crc validation fails (check_crcs
+ CorruptRecordException: if message crc validation fails (check_crcs
must be set to True)
RecordTooLargeError: if a message is larger than the currently
configured max_partition_fetch_bytes
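
The docstring now names CorruptRecordException, matching the renamed exception in kafka.errors. A hedged usage sketch; consumer and process() are placeholders, not part of this diff:

    import kafka.errors as Errors

    try:
        for message in consumer:         # consumer: placeholder KafkaConsumer
            process(message)             # process(): placeholder
    except Errors.CorruptRecordException:
        # raised during fetch when check_crcs=True and a record batch
        # fails CRC validation
        raise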
@@ -449,57 +449,25 @@ class Fetcher(six.Iterator):
self._next_partition_records = None
- def _unpack_message_set(self, tp, messages):
+ def _unpack_message_set(self, tp, records):
try:
- for offset, size, msg in messages:
- if self.config['check_crcs'] and not msg.validate_crc():
- raise Errors.InvalidMessageError(msg)
-
- if not msg.is_compressed():
- yield self._parse_record(tp, offset, msg.timestamp, msg)
-
- else:
- # If relative offset is used, we need to decompress the entire message first
- # to compute the absolute offset.
- inner_mset = msg.decompress()
-
- # There should only ever be a single layer of compression
- if inner_mset[0][-1].is_compressed():
- log.warning('MessageSet at %s offset %d appears '
- ' double-compressed. This should not'
- ' happen -- check your producers!',
- tp, offset)
- if self.config['skip_double_compressed_messages']:
- log.warning('Skipping double-compressed message at'
- ' %s %d', tp, offset)
- continue
-
- if msg.magic > 0:
- last_offset, _, _ = inner_mset[-1]
- absolute_base_offset = offset - last_offset
- else:
- absolute_base_offset = -1
-
- for inner_offset, inner_size, inner_msg in inner_mset:
- if msg.magic > 0:
- # When magic value is greater than 0, the timestamp
- # of a compressed message depends on the
-                            # timestamp type of the wrapper message:
-
- if msg.timestamp_type == 0: # CREATE_TIME (0)
- inner_timestamp = inner_msg.timestamp
-
- elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
- inner_timestamp = msg.timestamp
-
- else:
- raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
- else:
- inner_timestamp = msg.timestamp
-
- if absolute_base_offset >= 0:
- inner_offset += absolute_base_offset
- yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
+ batch = records.next_batch()
+ while batch is not None:
+ for record in batch:
+ key_size = len(record.key) if record.key is not None else -1
+ value_size = len(record.value) if record.value is not None else -1
+ key = self._deserialize(
+ self.config['key_deserializer'],
+ tp.topic, record.key)
+ value = self._deserialize(
+ self.config['value_deserializer'],
+ tp.topic, record.value)
+ yield ConsumerRecord(
+ tp.topic, tp.partition, record.offset, record.timestamp,
+ record.timestamp_type, key, value, record.checksum,
+ key_size, value_size)
+
+ batch = records.next_batch()
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
@@ -508,21 +476,6 @@ class Fetcher(six.Iterator):
log.exception('StopIteration raised unpacking messageset: %s', e)
raise Exception('StopIteration raised unpacking messageset')
- # If unpacking raises AssertionError, it means decompression unsupported
- # See Issue 1033
- except AssertionError as e:
- log.exception('AssertionError raised unpacking messageset: %s', e)
- raise
-
- def _parse_record(self, tp, offset, timestamp, msg):
- key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
- value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
- return ConsumerRecord(tp.topic, tp.partition, offset,
- timestamp, msg.timestamp_type,
- key, value, msg.crc,
- len(msg.key) if msg.key is not None else -1,
- len(msg.value) if msg.value is not None else -1)
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
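
The guard above that re-raises StopIteration exists because, under pre-PEP 479 semantics, a StopIteration escaping user code (e.g. a deserializer) inside a generator silently ends iteration instead of propagating. A self-contained illustration; unpack() is hypothetical:

    def unpack():
        yield 1
        raise StopIteration('raised by a broken deserializer')  # hypothetical failure

    print(list(unpack()))  # pre-PEP 479 (py2, py3 < 3.7): prints [1], the error vanishes
                           # Python 3.7+: RuntimeError('generator raised StopIteration')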
@@ -784,7 +737,6 @@ class Fetcher(six.Iterator):
def _parse_fetched_data(self, completed_fetch):
tp = completed_fetch.topic_partition
- partition = completed_fetch.partition_data
fetch_offset = completed_fetch.fetched_offset
num_bytes = 0
records_count = 0
@@ -792,7 +744,6 @@ class Fetcher(six.Iterator):
error_code, highwater = completed_fetch.partition_data[:2]
error_type = Errors.for_code(error_code)
- messages = completed_fetch.partition_data[-1]
try:
if not self._subscriptions.is_fetchable(tp):
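
Errors.for_code, used just above, resolves a broker error code to its exception class; a quick sketch, assuming the standard kafka.errors registry:

    import kafka.errors as Errors

    error_type = Errors.for_code(0)      # code 0 maps to Errors.NoError
    assert error_type is Errors.NoError  # non-zero codes map to broker error subclasses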
@@ -816,21 +767,18 @@ class Fetcher(six.Iterator):
position)
return None
- partial = None
- if messages and isinstance(messages[-1][-1], PartialMessage):
- partial = messages.pop()
-
- if messages:
+ records = MemoryRecords(completed_fetch.partition_data[-1])
+ if records.has_next():
log.debug("Adding fetched record for partition %s with"
" offset %d to buffered record list", tp,
position)
- unpacked = list(self._unpack_message_set(tp, messages))
+ unpacked = list(self._unpack_message_set(tp, records))
parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
- last_offset, _, _ = messages[-1]
+ last_offset = unpacked[-1].offset
self._sensors.records_fetch_lag.record(highwater - last_offset)
- num_bytes = sum(msg[1] for msg in messages)
- records_count = len(messages)
- elif partial:
+ num_bytes = records.valid_bytes()
+ records_count = len(unpacked)
+ elif records.size_in_bytes() > 0:
# we did not read a single message from a non-empty
# buffer because that message's size is larger than
# fetch size, in this case record this exception
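
A note on the two size methods this hunk leans on: valid_bytes() counts only complete record batches, while size_in_bytes() covers the whole buffer, including a trailing truncated batch. That difference is how the elif detects a first record too large for max_partition_fetch_bytes. A hedged sketch; buf is a placeholder payload:

    from kafka.record import MemoryRecords

    records = MemoryRecords(buf)         # buf: placeholder partition payload
    if records.has_next():
        pass                             # normal path: at least one complete batch to unpack
    elif records.size_in_bytes() > 0:
        # non-empty buffer with no complete batch: the first record exceeded
        # max_partition_fetch_bytes, so the fetcher raises RecordTooLargeError
        pass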