diff options
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 60 |
1 file changed, 32 insertions, 28 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 6f5bcdd..47a5b00 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -194,10 +194,10 @@ class KafkaConsumer(object): elif isinstance(arg, tuple): topic = kafka_bytestring(arg[0]) partition = arg[1] + self._consume_topic_partition(topic, partition) if len(arg) == 3: offset = arg[2] self._offsets.fetch[(topic, partition)] = offset - self._consume_topic_partition(topic, partition) # { topic: partitions, ... } dict elif isinstance(arg, dict): @@ -224,7 +224,7 @@ class KafkaConsumer(object): topic = kafka_bytestring(key[0]) partition = key[1] self._consume_topic_partition(topic, partition) - self._offsets.fetch[key] = value + self._offsets.fetch[(topic, partition)] = value else: raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg)) @@ -312,16 +312,16 @@ class KafkaConsumer(object): max_wait_time = self._config['fetch_wait_max_ms'] min_bytes = self._config['fetch_min_bytes'] - # Get current fetch offsets - offsets = self._offsets.fetch - if not offsets: - if not self._topics: - raise KafkaConfigurationError('No topics or partitions configured') + if not self._topics: + raise KafkaConfigurationError('No topics or partitions configured') + + if not self._offsets.fetch: raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages') - fetches = [] - for topic_partition, offset in six.iteritems(offsets): - fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes)) + fetches = [FetchRequest(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) + for (topic, partition) in self._topics] # client.send_fetch_request will collect topic/partition requests by leader # and send each group as a single FetchRequest to the correct broker @@ -336,49 +336,53 @@ class KafkaConsumer(object): return for resp in responses: - topic_partition = (resp.topic, resp.partition) + topic = kafka_bytestring(resp.topic) + partition = 
resp.partition try: check_error(resp) except OffsetOutOfRangeError: - logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d ' - '(Highwatermark: %d)', - resp.topic, resp.partition, - offsets[topic_partition], resp.highwaterMark) + logger.warning('OffsetOutOfRange: topic %s, partition %d, ' + 'offset %d (Highwatermark: %d)', + topic, partition, + self._offsets.fetch[(topic, partition)], + resp.highwaterMark) # Reset offset - self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition) + self._offsets.fetch[(topic, partition)] = ( + self._reset_partition_offset((topic, partition)) + ) continue except NotLeaderForPartitionError: logger.warning("NotLeaderForPartitionError for %s - %d. " "Metadata may be out of date", - resp.topic, resp.partition) + topic, partition) self._refresh_metadata_on_error() continue except RequestTimedOutError: logger.warning("RequestTimedOutError for %s - %d", - resp.topic, resp.partition) + topic, partition) continue # Track server highwater mark - self._offsets.highwater[topic_partition] = resp.highwaterMark + self._offsets.highwater[(topic, partition)] = resp.highwaterMark # Yield each message # Kafka-python could raise an exception during iteration # we are not catching -- user will need to address for (offset, message) in resp.messages: # deserializer_class could raise an exception here - msg = KafkaMessage(resp.topic, - resp.partition, - offset, message.key, - self._config['deserializer_class'](message.value)) - - if offset < self._offsets.fetch[topic_partition]: - logger.debug('Skipping message %s because its offset is less than the consumer offset', - msg) + val = self._config['deserializer_class'](message.value) + msg = KafkaMessage(topic, partition, offset, message.key, val) + + # in some cases the server will return earlier messages + # than we requested. 
skip them per kafka spec + if offset < self._offsets.fetch[(topic, partition)]: + logger.debug('message offset less than fetched offset ' + 'skipping: %s', msg) continue # Only increment fetch offset if we safely got the message and deserialized - self._offsets.fetch[topic_partition] = offset + 1 + self._offsets.fetch[(topic, partition)] = offset + 1 # Then yield to user yield msg |