diff options
-rw-r--r-- | kafka/consumer/base.py | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 91ad82f..0bbf46c 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -83,12 +83,13 @@ class Consumer(object): if not partitions: partitions = self.client.get_partition_ids_for_topic(self.topic) - for partition in partitions: - (resp,) = self.client.send_offset_fetch_request( - self.group, - [OffsetFetchRequest(self.topic, partition)], - fail_on_error=False - ) + responses = self.client.send_offset_fetch_request( + self.group, + [OffsetFetchRequest(self.topic, p) for p in partitions], + fail_on_error=False + ) + + for resp in responses: try: check_error(resp) # API spec says server wont set an error here @@ -98,12 +99,12 @@ class Consumer(object): # -1 offset signals no commit is currently stored if resp.offset == -1: - self.offsets[partition] = 0 + self.offsets[resp.partition] = 0 # Otherwise we committed the stored offset # and need to fetch the next one else: - self.offsets[partition] = resp.offset + self.offsets[resp.partition] = resp.offset def commit(self, partitions=None): """ |