summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py11
1 files changed, 5 insertions, 6 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fa1b8bc..42628e1 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -91,7 +91,7 @@ class Consumer(object):
self.offsets = {}
if not partitions:
- partitions = self.client.topic_partitions[topic]
+ partitions = self.client.get_partition_ids_for_topic(topic)
else:
assert all(isinstance(x, numbers.Integral) for x in partitions)
@@ -117,9 +117,9 @@ class Consumer(object):
def fetch_last_known_offsets(self, partitions=None):
if not partitions:
- partitions = self.client.topic_partitions[self.topic]
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
- def get_or_init_offset_callback(resp):
+ def get_or_init_offset(resp):
try:
kafka.common.check_error(resp)
return resp.offset
@@ -128,10 +128,9 @@ class Consumer(object):
for partition in partitions:
req = OffsetFetchRequest(self.topic, partition)
- (offset,) = self.client.send_offset_fetch_request(self.group, [req],
- callback=get_or_init_offset_callback,
+ (resp,) = self.client.send_offset_fetch_request(self.group, [req],
fail_on_error=False)
- self.offsets[partition] = offset
+ self.offsets[partition] = get_or_init_offset(resp)
self.fetch_offsets = self.offsets.copy()
def commit(self, partitions=None):