summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-05-28 06:39:42 -0700
committerDavid Arthur <mumrah@gmail.com>2013-05-28 06:39:42 -0700
commit904157ba2c33702fed48ce12ee54d80cfc71c606 (patch)
treedc13245cd18c0f3b12c0374ac5a7b07b391c2823
parent6327ba38bcc799b948a8f723ea2e6f078f8e90a8 (diff)
parent8fc0407498b7167244501607f1003b294c694858 (diff)
downloadkafka-python-904157ba2c33702fed48ce12ee54d80cfc71c606.tar.gz
Merge pull request #23 from mahendra/autocommit
Auto commit timer is not periodic
-rw-r--r--kafka/consumer.py25
-rw-r--r--kafka/util.py11
2 files changed, 23 insertions, 13 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f123113..5c39cb7 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -34,18 +34,20 @@ class SimpleConsumer(object):
self.client._load_metadata_for_topics(topic)
self.offsets = {}
- # Set up the auto-commit timer
- if auto_commit is True:
- if auto_commit_every_t is not None:
- self.commit_timer = ReentrantTimer(auto_commit_every_t, self.commit)
- self.commit_timer.start()
-
+ # Variables for handling offset commits
self.commit_lock = Lock()
+ self.commit_timer = None
self.count_since_commit = 0
self.auto_commit = auto_commit
self.auto_commit_every_n = auto_commit_every_n
self.auto_commit_every_t = auto_commit_every_t
-
+
+ # Set up the auto-commit timer
+ if auto_commit is True and auto_commit_every_t is not None:
+ self.commit_timer = ReentrantTimer(auto_commit_every_t,
+ self._timed_commit)
+ self.commit_timer.start()
+
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
return resp.offset
@@ -101,6 +103,15 @@ class SimpleConsumer(object):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ def _timed_commit(self):
+ """
+ Commit offsets as part of timer
+ """
+ self.commit()
+
+ # Once the commit is done, start the timer again
+ self.commit_timer.start()
+
def commit(self, partitions=[]):
"""
Commit offsets for this consumer
diff --git a/kafka/util.py b/kafka/util.py
index 5dc6bc2..8c02cb2 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -71,13 +71,12 @@ class ReentrantTimer(object):
self.fn = fn
def start(self):
- if self.timer is None:
- self.timer = Timer(self.t / 1000., self.fn)
- self.timer.start()
- else:
+ if self.timer is not None:
self.timer.cancel()
- self.timer = Timer(self.t / 1000., self.fn)
- self.timer.start()
+
+ self.timer = Timer(self.t / 1000., self.fn)
+ self.timer.start()
def stop(self):
self.timer.cancel()
+ self.timer = None