summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py25
1 files changed, 18 insertions, 7 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f123113..5c39cb7 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -34,18 +34,20 @@ class SimpleConsumer(object):
self.client._load_metadata_for_topics(topic)
self.offsets = {}
- # Set up the auto-commit timer
- if auto_commit is True:
- if auto_commit_every_t is not None:
- self.commit_timer = ReentrantTimer(auto_commit_every_t, self.commit)
- self.commit_timer.start()
-
+ # Variables for handling offset commits
self.commit_lock = Lock()
+ self.commit_timer = None
self.count_since_commit = 0
self.auto_commit = auto_commit
self.auto_commit_every_n = auto_commit_every_n
self.auto_commit_every_t = auto_commit_every_t
-
+
+ # Set up the auto-commit timer
+ if auto_commit is True and auto_commit_every_t is not None:
+ self.commit_timer = ReentrantTimer(auto_commit_every_t,
+ self._timed_commit)
+ self.commit_timer.start()
+
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
return resp.offset
@@ -101,6 +103,15 @@ class SimpleConsumer(object):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ def _timed_commit(self):
+ """
+ Commit offsets as part of timer
+ """
+ self.commit()
+
+ # Once the commit is done, start the timer again
+ self.commit_timer.start()
+
def commit(self, partitions=[]):
"""
Commit offsets for this consumer