summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--  kafka/consumer/fetcher.py  34
1 files changed, 15 insertions, 19 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8a48575..fc03d7a 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -299,15 +299,7 @@ class Fetcher(object):
" and update consumed position to %s", tp, next_offset)
self._subscriptions.assignment[tp].consumed = next_offset
- # TODO: handle compressed messages
- for offset, size, msg in messages:
- if msg.attributes:
- raise Errors.KafkaError('Compressed messages not supported yet')
- elif self.config['check_crcs'] and not msg.validate_crc():
- raise Errors.InvalidMessageError(msg)
-
- key, value = self._deserialize(msg)
- record = ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ for record in self._unpack_message_set(tp, messages):
drained[tp].append(record)
else:
# these records aren't next in line based on the last consumed
@@ -316,6 +308,17 @@ class Fetcher(object):
tp, fetch_offset)
return dict(drained)
+    def _unpack_message_set(self, tp, messages):
+        """Generate ConsumerRecords from a raw message set.
+
+        Recursively unpacks compressed messages: a compressed message
+        wraps an inner message set, which is decompressed and unpacked
+        with another pass through this generator.
+
+        Arguments:
+            tp: TopicPartition the messages were fetched from; used to
+                populate the topic/partition fields of each record.
+            messages: iterable of (offset, size, msg) tuples.
+
+        Raises:
+            InvalidMessageError: if config['check_crcs'] is enabled and
+                a message fails CRC validation.
+        """
+        for offset, size, msg in messages:
+            # Validate CRC first so corrupt messages are rejected
+            # before any decompression / deserialization work.
+            if self.config['check_crcs'] and not msg.validate_crc():
+                raise Errors.InvalidMessageError(msg)
+            elif msg.is_compressed():
+                # Compressed wrapper: recurse into the inner message set.
+                for record in self._unpack_message_set(tp, msg.decompress()):
+                    yield record
+            else:
+                key, value = self._deserialize(msg)
+                yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+
def __iter__(self):
"""Iterate over fetched_records"""
if self._subscriptions.needs_partition_assignment:
@@ -349,16 +352,9 @@ class Fetcher(object):
self._subscriptions.assignment[tp].fetched = consumed
elif fetch_offset == consumed:
- # TODO: handle compressed messages
- for offset, size, msg in messages:
- if msg.attributes:
- raise Errors.KafkaError('Compressed messages not supported yet')
- elif self.config['check_crcs'] and not msg.validate_crc():
- raise Errors.InvalidMessageError(msg)
-
- self._subscriptions.assignment[tp].consumed = offset + 1
- key, value = self._deserialize(msg)
- yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ for msg in self._unpack_message_set(tp, messages):
+ self._subscriptions.assignment[tp].consumed = msg.offset + 1
+ yield msg
else:
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request