summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-23 11:26:27 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-23 11:26:27 -0700
commit8a1f2e6c3a73131d3a32ee4c0012628a6913d1cd (patch)
treec9ac269074fe5da67d457ae755dedf6558f1617e /kafka/consumer.py
parent86e1ac7b96a41cf84e220fa25a11f138555d5c7e (diff)
downloadkafka-python-8a1f2e6c3a73131d3a32ee4c0012628a6913d1cd.tar.gz
Split out kafka version environments, default tox no longer runs any integration tests, make skipped integration also skip setupClass, implement rudimentary offset support in consumer.py
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py22
1 files changed, 11 insertions, 11 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 14b84fe..d855874 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -10,6 +10,7 @@ from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
OffsetRequest, OffsetCommitRequest,
+ OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)
@@ -105,17 +106,16 @@ class Consumer(object):
"partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))
- # Uncomment for 0.8.1
- #
- #for partition in partitions:
- # req = OffsetFetchRequest(topic, partition)
- # (offset,) = self.client.send_offset_fetch_request(group, [req],
- # callback=get_or_init_offset_callback,
- # fail_on_error=False)
- # self.offsets[partition] = offset
-
- for partition in partitions:
- self.offsets[partition] = 0
+ if auto_commit:
+ for partition in partitions:
+ req = OffsetFetchRequest(topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(group, [req],
+ callback=get_or_init_offset_callback,
+ fail_on_error=False)
+ self.offsets[partition] = offset
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
def commit(self, partitions=None):
"""