summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer.py7
1 files changed, 3 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 51f446c..e5a5bf3 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -110,7 +110,7 @@ class Consumer(object):
if not partitions:
partitions = self.client.get_partition_ids_for_topic(self.topic)
- def get_or_init_offset_callback(resp):
+ def get_or_init_offset(resp):
try:
kafka.common.check_error(resp)
return resp.offset
@@ -119,10 +119,9 @@ class Consumer(object):
for partition in partitions:
req = OffsetFetchRequest(self.topic, partition)
- (offset,) = self.client.send_offset_fetch_request(self.group, [req],
- callback=get_or_init_offset_callback,
+ (resp,) = self.client.send_offset_fetch_request(self.group, [req],
fail_on_error=False)
- self.offsets[partition] = offset
+ self.offsets[partition] = get_or_init_offset(resp)
self.fetch_offsets = self.offsets.copy()
def commit(self, partitions=None):