diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 3a5e37e..bf59775 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
 
 
 ConsumerRecord = collections.namedtuple("ConsumerRecord",
-    ["topic", "partition", "offset", "key", "value"])
+    ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
 
 
 class NoOffsetForPartitionError(Errors.KafkaError):
@@ -351,17 +351,33 @@ class Fetcher(six.Iterator):
                           position)
         return dict(drained)
 
-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, messages, relative_offset=0):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    for record in self._unpack_message_set(tp, msg.decompress()):
+                    mset = msg.decompress()
+                    # new format uses relative offsets for compressed messages
+                    if msg.magic > 0:
+                        last_offset, _, _ = mset[-1]
+                        relative = offset - last_offset
+                    else:
+                        relative = 0
+                    for record in self._unpack_message_set(tp, mset, relative):
                         yield record
                 else:
+                    # Message v1 adds timestamp
+                    if msg.magic > 0:
+                        timestamp = msg.timestamp
+                        timestamp_type = msg.timestamp_type
+                    else:
+                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+                    yield ConsumerRecord(tp.topic, tp.partition,
+                                         offset + relative_offset,
+                                         timestamp, timestamp_type,
+                                         key, value)
         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
         # back to the user. See Issue 545