summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 02:36:31 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:04:10 -0700
commit11fc9bc2e61b34bddbf6d54228709e075b2615a1 (patch)
tree9f0e13c3fbcb606deccfa363ad24daa01fa7498d
parent3bfe593e2fc47c4ab4b90edb07d205ed07489322 (diff)
downloadkafka-python-11fc9bc2e61b34bddbf6d54228709e075b2615a1.tar.gz
Dont need to use callbacks for offset fetch requests
-rw-r--r--kafka/consumer.py7
1 files changed, 3 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 51f446c..e5a5bf3 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -110,7 +110,7 @@ class Consumer(object):
if not partitions:
partitions = self.client.get_partition_ids_for_topic(self.topic)
- def get_or_init_offset_callback(resp):
+ def get_or_init_offset(resp):
try:
kafka.common.check_error(resp)
return resp.offset
@@ -119,10 +119,9 @@ class Consumer(object):
for partition in partitions:
req = OffsetFetchRequest(self.topic, partition)
- (offset,) = self.client.send_offset_fetch_request(self.group, [req],
- callback=get_or_init_offset_callback,
+ (resp,) = self.client.send_offset_fetch_request(self.group, [req],
fail_on_error=False)
- self.offsets[partition] = offset
+ self.offsets[partition] = get_or_init_offset(resp)
self.fetch_offsets = self.offsets.copy()
def commit(self, partitions=None):