summaryrefslogtreecommitdiff
path: root/kafka/consumer/base.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-03-30 16:55:41 -0700
committerDana Powers <dana.powers@rd.io>2015-03-30 16:55:41 -0700
commit1d252bfc20c8b1058dc93a495c3bdb0f4ccdf590 (patch)
tree34f65fca0bfea764a7387bc93bca36989f176304 /kafka/consumer/base.py
parentb6d032cc3f1b53a6d5b395f9b14de62f547c8f1c (diff)
downloadkafka-python-1d252bfc20c8b1058dc93a495c3bdb0f4ccdf590.tar.gz
Bulk fetch offset partitions in base consumer -- suggested by ecanzonieri
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r--kafka/consumer/base.py17
1 files changed, 9 insertions, 8 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 91ad82f..0bbf46c 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -83,12 +83,13 @@ class Consumer(object):
if not partitions:
partitions = self.client.get_partition_ids_for_topic(self.topic)
- for partition in partitions:
- (resp,) = self.client.send_offset_fetch_request(
- self.group,
- [OffsetFetchRequest(self.topic, partition)],
- fail_on_error=False
- )
+ responses = self.client.send_offset_fetch_request(
+ self.group,
+ [OffsetFetchRequest(self.topic, p) for p in partitions],
+ fail_on_error=False
+ )
+
+ for resp in responses:
try:
check_error(resp)
# API spec says server wont set an error here
@@ -98,12 +99,12 @@ class Consumer(object):
# -1 offset signals no commit is currently stored
if resp.offset == -1:
- self.offsets[partition] = 0
+ self.offsets[resp.partition] = 0
# Otherwise we committed the stored offset
# and need to fetch the next one
else:
- self.offsets[partition] = resp.offset
+ self.offsets[resp.partition] = resp.offset
def commit(self, partitions=None):
"""