summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py26
1 files changed, 9 insertions, 17 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 1fa51a2..b87ed1c 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -84,7 +84,7 @@ class Consumer(object):
# Set up the auto-commit timer
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
- self._timed_commit)
+ self.commit)
self.commit_timer.start()
def get_or_init_offset_callback(resp):
@@ -109,15 +109,6 @@ class Consumer(object):
for partition in partitions:
self.offsets[partition] = 0
- def _timed_commit(self):
- """
- Commit offsets as part of timer
- """
- self.commit()
-
- # Once the commit is done, start the timer again
- self.commit_timer.start()
-
def commit(self, partitions=None):
"""
Commit offsets for this consumer
@@ -126,11 +117,17 @@ class Consumer(object):
all of them
"""
- # short circuit if nothing happened
+ # short circuit if nothing happened. This check is kept outside
+ # to prevent un-necessarily acquiring a lock for checking the state
if self.count_since_commit == 0:
return
with self.commit_lock:
+ # Do this check again, just in case the state has changed
+ # during the lock acquiring timeout
+ if self.count_since_commit == 0:
+ return
+
reqs = []
if not partitions: # commit all partitions
partitions = self.offsets.keys()
@@ -160,12 +157,7 @@ class Consumer(object):
return
if self.count_since_commit > self.auto_commit_every_n:
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
- self.commit_timer.start()
- else:
- self.commit()
+ self.commit()
def pending(self, partitions=None):
"""