diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-12 13:42:24 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-12 13:42:24 +0530 |
commit | b0c87eed088936c9ec571f789d7818bf445d47b3 (patch) | |
tree | 19fc42b1aa94305c8fb257c8fb694d4778dec03f /kafka/util.py | |
parent | a4601d3a1bf6792e0d57e600f48e891ef2be1528 (diff) | |
download | kafka-python-b0c87eed088936c9ec571f789d7818bf445d47b3.tar.gz |
Optimize auto-commit thread
The previous commit optimized the commit thread such that the timer
started only when there were messages to be consumed. This commit
goes a step further and ensures the following:
* Only one timer thread is created
* The main app does not block on exit (waiting for timer thread to finish)
This is ensured by having a single thread blocking on an event and
keeps calling a function. We use events instead of time.sleep() so
as to prevent the python interpreter from running every 50ms checking
if the timer has expired (logic copied from threading.Timer)
Diffstat (limited to 'kafka/util.py')
-rw-r--r-- | kafka/util.py | 43 |
1 files changed, 31 insertions, 12 deletions
diff --git a/kafka/util.py b/kafka/util.py index b3112d5..11db747 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,7 +1,7 @@ from collections import defaultdict from itertools import groupby import struct -from threading import Timer +from threading import Thread, Event def write_int_string(s): @@ -81,22 +81,41 @@ class ReentrantTimer(object): t: timer interval in milliseconds fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function """ - def __init__(self, t, fn): - self.timer = None - self.t = t + def __init__(self, t, fn, *args, **kwargs): + + if t <= 0: + raise ValueError('Invalid timeout value') + + if not callable(fn): + raise ValueError('fn must be callable') + + self.thread = None + self.t = t / 1000.0 self.fn = fn - self.is_active = False + self.args = args + self.kwargs = kwargs + self.active = None + + def _timer(self, active): + while not active.wait(self.t): + self.fn(*self.args, **self.kwargs) def start(self): - if self.timer is not None: - self.timer.cancel() + if self.thread is not None: + self.stop() - self.timer = Timer(self.t / 1000., self.fn) - self.is_active = True - self.timer.start() + self.active = Event() + self.thread = Thread(target=self._timer, args=(self.active)) + self.thread.daemon = True # So the app exits when main thread exits + self.thread.start() def stop(self): - self.timer.cancel() + if self.thread is None: + return + + self.active.set() + self.thread.join(self.t + 1) self.timer = None - self.is_active = False |