summary | refs | log | tree | commit | diff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--  kafka/consumer/fetcher.py  24
1 file changed, 20 insertions(+), 4 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 3a5e37e..bf59775 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
ConsumerRecord = collections.namedtuple("ConsumerRecord",
- ["topic", "partition", "offset", "key", "value"])
+ ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
class NoOffsetForPartitionError(Errors.KafkaError):
@@ -351,17 +351,33 @@ class Fetcher(six.Iterator):
position)
return dict(drained)
- def _unpack_message_set(self, tp, messages):
+ def _unpack_message_set(self, tp, messages, relative_offset=0):
try:
for offset, size, msg in messages:
if self.config['check_crcs'] and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
elif msg.is_compressed():
- for record in self._unpack_message_set(tp, msg.decompress()):
+ mset = msg.decompress()
+ # new format uses relative offsets for compressed messages
+ if msg.magic > 0:
+ last_offset, _, _ = mset[-1]
+ relative = offset - last_offset
+ else:
+ relative = 0
+ for record in self._unpack_message_set(tp, mset, relative):
yield record
else:
+ # Message v1 adds timestamp
+ if msg.magic > 0:
+ timestamp = msg.timestamp
+ timestamp_type = msg.timestamp_type
+ else:
+ timestamp = timestamp_type = None
key, value = self._deserialize(msg)
- yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ yield ConsumerRecord(tp.topic, tp.partition,
+ offset + relative_offset,
+ timestamp, timestamp_type,
+ key, value)
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
# back to the user. See Issue 545