author | Dana Powers <dana.powers@rd.io> | 2015-04-04 21:34:24 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-04-04 21:34:24 -0700 |
commit | c0fc334612f7a98ab98c0f970288ebe0023b42db (patch) | |
tree | bf9851a499aee0d36db029bb779ab7273eb05276 /kafka/consumer/kafka.py | |
parent | 811fd4cbb903064e3961c60a0b39c43b9473c322 (diff) | |
download | kafka-python-c0fc334612f7a98ab98c0f970288ebe0023b42db.tar.gz | |
Use kafka_bytestring when decoding message topics in KafkaConsumer.fetch_messages
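kafka_bytestring is kafka-python's small helper for normalizing topic names to bytes, so that the (topic, partition) tuples used as offset-dict keys compare consistently whether a topic arrives as text or as bytes. A minimal sketch of that behavior (an approximation for illustration, not necessarily the library's exact implementation):

```python
import six  # py2/py3 string-type shims, as used elsewhere in kafka-python


def kafka_bytestring(s):
    """Return the topic name as bytes, utf-8 encoding text if needed."""
    if isinstance(s, six.binary_type):
        return s                      # already bytes: pass through unchanged
    if isinstance(s, six.string_types):
        return s.encode('utf-8')      # text: encode to bytes
    raise TypeError(s)                # anything else is a caller error
```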
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 40 |
1 file changed, 22 insertions(+), 18 deletions(-)
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 79cee28..47a5b00 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -336,49 +336,53 @@ class KafkaConsumer(object):
             return
 
         for resp in responses:
-            topic_partition = (resp.topic, resp.partition)
+            topic = kafka_bytestring(resp.topic)
+            partition = resp.partition
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
-                logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
-                               '(Highwatermark: %d)',
-                               resp.topic, resp.partition,
-                               offsets[topic_partition], resp.highwaterMark)
+                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
+                               'offset %d (Highwatermark: %d)',
+                               topic, partition,
+                               self.offsets._fetch[(topic, partition)],
+                               resp.highwaterMark)
                 # Reset offset
-                self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+                self._offsets.fetch[(topic, partition)] = (
+                    self._reset_partition_offset((topic, partition))
+                )
                 continue
 
             except NotLeaderForPartitionError:
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 self._refresh_metadata_on_error()
                 continue
 
             except RequestTimedOutError:
                 logger.warning("RequestTimedOutError for %s - %d",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 continue
 
             # Track server highwater mark
-            self._offsets.highwater[topic_partition] = resp.highwaterMark
+            self._offsets.highwater[(topic, partition)] = resp.highwaterMark
 
             # Yield each message
             # Kafka-python could raise an exception during iteration
             # we are not catching -- user will need to address
             for (offset, message) in resp.messages:
                 # deserializer_class could raise an exception here
-                msg = KafkaMessage(resp.topic,
-                                   resp.partition,
-                                   offset, message.key,
-                                   self._config['deserializer_class'](message.value))
-
-                if offset < self._offsets.fetch[topic_partition]:
-                    logger.debug('Skipping message %s because its offset is less than the consumer offset',
-                                 msg)
+                val = self._config['deserializer_class'](message.value)
+                msg = KafkaMessage(topic, partition, offset, message.key, val)
+
+                # in some cases the server will return earlier messages
+                # than we requested. skip them per kafka spec
+                if offset < self._offsets.fetch[(topic, partition)]:
+                    logger.debug('message offset less than fetched offset '
+                                 'skipping: %s', msg)
                     continue
                 # Only increment fetch offset if we safely got the message and deserialized
-                self._offsets.fetch[topic_partition] = offset + 1
+                self._offsets.fetch[(topic, partition)] = offset + 1
 
                 # Then yield to user
                 yield msg
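The consumer's offset bookkeeping dicts (_offsets.fetch, _offsets.highwater) are keyed by (topic, partition) tuples, and under Python 3 a bytes topic and a str topic are different keys even when they spell the same name. The snippet below is a standalone illustration of that mismatch, with hypothetical topic and offset values rather than kafka-python code:

```python
# Offsets keyed the way KafkaConsumer keys them: bytes topic + int partition.
offsets = {(b'my-topic', 0): 42}

resp_topic = 'my-topic'                  # hypothetical topic decoded as text
print((resp_topic, 0) in offsets)        # False on Python 3: str key misses the bytes key

normalized = resp_topic.encode('utf-8')  # what kafka_bytestring effectively does
print((normalized, 0) in offsets)        # True: lookups and updates now agree
```

Without the normalization, a response whose topic decodes to the other string type would silently create a second set of keys, and offset tracking for the original keys would never advance.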