diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-10 23:32:01 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-14 23:05:45 -0700 |
commit | a8c86d379adabeecfe9ba8dd47f7280b0fc3199c (patch) | |
tree | 87ec92613f03e070ff8f9fe556f3a83f7f1556b9 | |
parent | 916c25726f6238c5af92728aa8df8d8fddd809a7 (diff) | |
download | kafka-python-a8c86d379adabeecfe9ba8dd47f7280b0fc3199c.tar.gz |
Drop recursion in _unpack_message_set
-rw-r--r-- | kafka/consumer/fetcher.py | 53 |
1 files changed, 36 insertions, 17 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 5f3eb1d..7437567 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -352,33 +352,52 @@ class Fetcher(six.Iterator): position) return dict(drained) - def _unpack_message_set(self, tp, messages, relative_offset=0): + def _unpack_message_set(self, tp, messages): try: for offset, size, msg in messages: if self.config['check_crcs'] and not msg.validate_crc(): raise Errors.InvalidMessageError(msg) elif msg.is_compressed(): - mset = msg.decompress() - # new format uses relative offsets for compressed messages + # If relative offset is used, we need to decompress the entire message first to compute + # the absolute offset. + inner_mset = msg.decompress() if msg.magic > 0: - last_offset, _, _ = mset[-1] - relative = offset - last_offset + last_offset, _, _ = inner_mset[-1] + absolute_base_offset = offset - last_offset else: - relative = 0 - for record in self._unpack_message_set(tp, mset, relative): - yield record + absolute_base_offset = -1 + + for inner_offset, inner_size, inner_msg in inner_mset: + if msg.magic > 0: + # When magic value is greater than 0, the timestamp + # of a compressed message depends on the + # typestamp type of the wrapper message: + + if msg.timestamp_type == 0: # CREATE_TIME (0) + inner_timestamp = inner_msg.timestamp + + elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) + inner_timestamp = msg.timestamp + + else: + raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type)) + else: + inner_timestamp = msg.timestamp + + if absolute_base_offset >= 0: + inner_offset += absolute_base_offset + + key, value = self._deserialize(inner_msg) + yield ConsumerRecord(tp.topic, tp.partition, inner_offset, + inner_timestamp, msg.timestamp_type, + key, value) + else: - # Message v1 adds timestamp - if msg.magic > 0: - timestamp = msg.timestamp - timestamp_type = msg.timestamp_type - else: - timestamp = timestamp_type = None key, value = self._deserialize(msg) - yield ConsumerRecord(tp.topic, tp.partition, - offset + relative_offset, - timestamp, timestamp_type, + yield ConsumerRecord(tp.topic, tp.partition, offset, + msg.timestamp, msg.timestamp_type, key, value) + # If unpacking raises StopIteration, it is erroneously # caught by the generator. We want all exceptions to be raised # back to the user. See Issue 545 |