diff options
-rw-r--r-- | kafka/consumer/kafka.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 423ba63..79cee28 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -312,16 +312,16 @@ class KafkaConsumer(object): max_wait_time = self._config['fetch_wait_max_ms'] min_bytes = self._config['fetch_min_bytes'] - # Get current fetch offsets - offsets = self._offsets.fetch - if not offsets: - if not self._topics: - raise KafkaConfigurationError('No topics or partitions configured') + if not self._topics: + raise KafkaConfigurationError('No topics or partitions configured') + + if not self._offsets.fetch: raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages') - fetches = [] - for topic_partition, offset in six.iteritems(offsets): - fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes)) + fetches = [FetchRequest(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) + for (topic, partition) in self._topics] # client.send_fetch_request will collect topic/partition requests by leader # and send each group as a single FetchRequest to the correct broker |