summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py22
1 files changed, 11 insertions, 11 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 14b84fe..d855874 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -10,6 +10,7 @@ from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
OffsetRequest, OffsetCommitRequest,
+ OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)
@@ -105,17 +106,16 @@ class Consumer(object):
"partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))
- # Uncomment for 0.8.1
- #
- #for partition in partitions:
- # req = OffsetFetchRequest(topic, partition)
- # (offset,) = self.client.send_offset_fetch_request(group, [req],
- # callback=get_or_init_offset_callback,
- # fail_on_error=False)
- # self.offsets[partition] = offset
-
- for partition in partitions:
- self.offsets[partition] = 0
+ if auto_commit:
+ for partition in partitions:
+ req = OffsetFetchRequest(topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(group, [req],
+ callback=get_or_init_offset_callback,
+ fail_on_error=False)
+ self.offsets[partition] = offset
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
def commit(self, partitions=None):
"""