diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 14b84fe..d855874 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -10,6 +10,7 @@ from Queue import Empty, Queue from kafka.common import ( ErrorMapping, FetchRequest, OffsetRequest, OffsetCommitRequest, + OffsetFetchRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData ) @@ -105,17 +106,16 @@ class Consumer(object): "partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) - # Uncomment for 0.8.1 - # - #for partition in partitions: - # req = OffsetFetchRequest(topic, partition) - # (offset,) = self.client.send_offset_fetch_request(group, [req], - # callback=get_or_init_offset_callback, - # fail_on_error=False) - # self.offsets[partition] = offset - - for partition in partitions: - self.offsets[partition] = 0 + if auto_commit: + for partition in partitions: + req = OffsetFetchRequest(topic, partition) + (offset,) = self.client.send_offset_fetch_request(group, [req], + callback=get_or_init_offset_callback, + fail_on_error=False) + self.offsets[partition] = offset + else: + for partition in partitions: + self.offsets[partition] = 0 def commit(self, partitions=None): """ |