summaryrefslogtreecommitdiff
path: root/kafka/util.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-12 13:42:24 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-12 13:42:24 +0530
commitb0c87eed088936c9ec571f789d7818bf445d47b3 (patch)
tree19fc42b1aa94305c8fb257c8fb694d4778dec03f /kafka/util.py
parenta4601d3a1bf6792e0d57e600f48e891ef2be1528 (diff)
downloadkafka-python-b0c87eed088936c9ec571f789d7818bf445d47b3.tar.gz
Optimize auto-commit thread
The previous commit optimized the commit thread such that the timer started only when there were messages to be consumed. This commit goes a step further and ensures the following: * Only one timer thread is created * The main app does not block on exit (waiting for timer thread to finish) This is ensured by having a single thread blocking on an event and keeps calling a function. We use events instead of time.sleep() so as to prevent the python interpreter from running every 50ms checking if the timer has expired (logic copied from threading.Timer)
Diffstat (limited to 'kafka/util.py')
-rw-r--r--kafka/util.py43
1 files changed, 31 insertions, 12 deletions
diff --git a/kafka/util.py b/kafka/util.py
index b3112d5..11db747 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,7 +1,7 @@
from collections import defaultdict
from itertools import groupby
import struct
-from threading import Timer
+from threading import Thread, Event
def write_int_string(s):
@@ -81,22 +81,41 @@ class ReentrantTimer(object):
t: timer interval in milliseconds
fn: a callable to invoke
+ args: tuple of args to be passed to function
+ kwargs: keyword arguments to be passed to function
"""
- def __init__(self, t, fn):
- self.timer = None
- self.t = t
+ def __init__(self, t, fn, *args, **kwargs):
+
+ if t <= 0:
+ raise ValueError('Invalid timeout value')
+
+ if not callable(fn):
+ raise ValueError('fn must be callable')
+
+ self.thread = None
+ self.t = t / 1000.0
self.fn = fn
- self.is_active = False
+ self.args = args
+ self.kwargs = kwargs
+ self.active = None
+
+ def _timer(self, active):
+ while not active.wait(self.t):
+ self.fn(*self.args, **self.kwargs)
def start(self):
- if self.timer is not None:
- self.timer.cancel()
+ if self.thread is not None:
+ self.stop()
- self.timer = Timer(self.t / 1000., self.fn)
- self.is_active = True
- self.timer.start()
+ self.active = Event()
+ self.thread = Thread(target=self._timer, args=(self.active))
+ self.thread.daemon = True # So the app exits when main thread exits
+ self.thread.start()
def stop(self):
- self.timer.cancel()
+ if self.thread is None:
+ return
+
+ self.active.set()
+ self.thread.join(self.t + 1)
self.timer = None
- self.is_active = False