summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-04-04 21:04:58 -0700
committerDana Powers <dana.powers@rd.io>2015-04-04 21:04:58 -0700
commit811fd4cbb903064e3961c60a0b39c43b9473c322 (patch)
tree3ebaad2a913a38deb8457b89ffb57e36fa6cc52c /kafka/consumer/kafka.py
parent87b2ca8e60832170a3c4ab3e391509ce40cb6faa (diff)
downloadkafka-python-811fd4cbb903064e3961c60a0b39c43b9473c322.tar.gz
Use list comprehension on _topics in KafkaConsumer.fetch_messages
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 423ba63..79cee28 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -312,16 +312,16 @@ class KafkaConsumer(object):
max_wait_time = self._config['fetch_wait_max_ms']
min_bytes = self._config['fetch_min_bytes']
- # Get current fetch offsets
- offsets = self._offsets.fetch
- if not offsets:
- if not self._topics:
- raise KafkaConfigurationError('No topics or partitions configured')
+ if not self._topics:
+ raise KafkaConfigurationError('No topics or partitions configured')
+
+ if not self._offsets.fetch:
raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
- fetches = []
- for topic_partition, offset in six.iteritems(offsets):
- fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
+ fetches = [FetchRequest(topic, partition,
+ self._offsets.fetch[(topic, partition)],
+ max_bytes)
+ for (topic, partition) in self._topics]
# client.send_fetch_request will collect topic/partition requests by leader
# and send each group as a single FetchRequest to the correct broker