diff options
author | Dana Powers <dana.powers@rd.io> | 2015-04-04 21:04:58 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-04-04 21:04:58 -0700 |
commit | 811fd4cbb903064e3961c60a0b39c43b9473c322 (patch) | |
tree | 3ebaad2a913a38deb8457b89ffb57e36fa6cc52c /kafka/consumer/kafka.py | |
parent | 87b2ca8e60832170a3c4ab3e391509ce40cb6faa (diff) | |
download | kafka-python-811fd4cbb903064e3961c60a0b39c43b9473c322.tar.gz |
Use list comprehension on _topics in KafkaConsumer.fetch_messages
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 423ba63..79cee28 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -312,16 +312,16 @@ class KafkaConsumer(object): max_wait_time = self._config['fetch_wait_max_ms'] min_bytes = self._config['fetch_min_bytes'] - # Get current fetch offsets - offsets = self._offsets.fetch - if not offsets: - if not self._topics: - raise KafkaConfigurationError('No topics or partitions configured') + if not self._topics: + raise KafkaConfigurationError('No topics or partitions configured') + + if not self._offsets.fetch: raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages') - fetches = [] - for topic_partition, offset in six.iteritems(offsets): - fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes)) + fetches = [FetchRequest(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) + for (topic, partition) in self._topics] # client.send_fetch_request will collect topic/partition requests by leader # and send each group as a single FetchRequest to the correct broker |