diff options
-rw-r--r-- | kafka/consumer.py | 7 |
1 files changed, 3 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 51f446c..e5a5bf3 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -110,7 +110,7 @@ class Consumer(object): if not partitions: partitions = self.client.get_partition_ids_for_topic(self.topic) - def get_or_init_offset_callback(resp): + def get_or_init_offset(resp): try: kafka.common.check_error(resp) return resp.offset @@ -119,10 +119,9 @@ class Consumer(object): for partition in partitions: req = OffsetFetchRequest(self.topic, partition) - (offset,) = self.client.send_offset_fetch_request(self.group, [req], - callback=get_or_init_offset_callback, + (resp,) = self.client.send_offset_fetch_request(self.group, [req], fail_on_error=False) - self.offsets[partition] = offset + self.offsets[partition] = get_or_init_offset(resp) self.fetch_offsets = self.offsets.copy() def commit(self, partitions=None): |