diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 18:33:07 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 18:33:07 +0530 |
commit | 2ce321771825c83d51c52801d6736052289faf83 (patch) | |
tree | 9bb8e5041ac054545f36478e7870dd5e7be10bee | |
parent | 65c8eb1f9f3a309f924cf469abb16af98bbe5d6d (diff) | |
parent | 64b757868a75f47752ec26f7b5d49f84058cea40 (diff) | |
download | kafka-python-2ce321771825c83d51c52801d6736052289faf83.tar.gz |
Merge branch 'master' into partition
Conflicts:
kafka/consumer.py
-rw-r--r-- | kafka/consumer.py | 26 | ||||
-rw-r--r-- | kafka/util.py | 40 |
2 files changed, 40 insertions, 26 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 1fa51a2..b87ed1c 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -84,7 +84,7 @@ class Consumer(object): # Set up the auto-commit timer if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, - self._timed_commit) + self.commit) self.commit_timer.start() def get_or_init_offset_callback(resp): @@ -109,15 +109,6 @@ class Consumer(object): for partition in partitions: self.offsets[partition] = 0 - def _timed_commit(self): - """ - Commit offsets as part of timer - """ - self.commit() - - # Once the commit is done, start the timer again - self.commit_timer.start() - def commit(self, partitions=None): """ Commit offsets for this consumer @@ -126,11 +117,17 @@ class Consumer(object): all of them """ - # short circuit if nothing happened + # short circuit if nothing happened. This check is kept outside + # to prevent un-necessarily acquiring a lock for checking the state if self.count_since_commit == 0: return with self.commit_lock: + # Do this check again, just in case the state has changed + # during the lock acquiring timeout + if self.count_since_commit == 0: + return + reqs = [] if not partitions: # commit all partitions partitions = self.offsets.keys() @@ -160,12 +157,7 @@ class Consumer(object): return if self.count_since_commit > self.auto_commit_every_n: - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - self.commit_timer.start() - else: - self.commit() + self.commit() def pending(self, partitions=None): """ diff --git a/kafka/util.py b/kafka/util.py index 10bf838..11178f5 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,7 +1,7 @@ from collections import defaultdict from itertools import groupby import struct -from threading import Timer +from threading import Thread, Event def write_int_string(s): @@ -81,19 +81,41 @@ class ReentrantTimer(object): t: timer interval in milliseconds fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function """ - def __init__(self, t, fn): - self.timer = None - self.t = t + def __init__(self, t, fn, *args, **kwargs): + + if t <= 0: + raise ValueError('Invalid timeout value') + + if not callable(fn): + raise ValueError('fn must be callable') + + self.thread = None + self.t = t / 1000.0 self.fn = fn + self.args = args + self.kwargs = kwargs + self.active = None + + def _timer(self, active): + while not active.wait(self.t): + self.fn(*self.args, **self.kwargs) def start(self): - if self.timer is not None: - self.timer.cancel() + if self.thread is not None: + self.stop() - self.timer = Timer(self.t / 1000., self.fn) - self.timer.start() + self.active = Event() + self.thread = Thread(target=self._timer, args=(self.active,)) + self.thread.daemon = True # So the app exits when main thread exits + self.thread.start() def stop(self): - self.timer.cancel() + if self.thread is None: + return + + self.active.set() + self.thread.join(self.t + 1) self.timer = None |