diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 26 |
1 files changed, 9 insertions, 17 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 1fa51a2..b87ed1c 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -84,7 +84,7 @@ class Consumer(object): # Set up the auto-commit timer if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, - self._timed_commit) + self.commit) self.commit_timer.start() def get_or_init_offset_callback(resp): @@ -109,15 +109,6 @@ class Consumer(object): for partition in partitions: self.offsets[partition] = 0 - def _timed_commit(self): - """ - Commit offsets as part of timer - """ - self.commit() - - # Once the commit is done, start the timer again - self.commit_timer.start() - def commit(self, partitions=None): """ Commit offsets for this consumer @@ -126,11 +117,17 @@ class Consumer(object): all of them """ - # short circuit if nothing happened + # short circuit if nothing happened. This check is kept outside + # to prevent un-necessarily acquiring a lock for checking the state if self.count_since_commit == 0: return with self.commit_lock: + # Do this check again, just in case the state has changed + # during the lock acquiring timeout + if self.count_since_commit == 0: + return + reqs = [] if not partitions: # commit all partitions partitions = self.offsets.keys() @@ -160,12 +157,7 @@ class Consumer(object): return if self.count_since_commit > self.auto_commit_every_n: - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - self.commit_timer.start() - else: - self.commit() + self.commit() def pending(self, partitions=None): """ |