summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-06-25 05:56:08 -0700
committerDavid Arthur <mumrah@gmail.com>2013-06-25 05:56:08 -0700
commit64b757868a75f47752ec26f7b5d49f84058cea40 (patch)
tree79f237efd502c10d618b40c68e7d56963c9130aa
parent883eed1f8ce1af37c621ad6ec89dc993694fd29b (diff)
parent119d4114f9ddb909b2147e75006da6ac13ad3ed8 (diff)
downloadkafka-python-64b757868a75f47752ec26f7b5d49f84058cea40.tar.gz
Merge pull request #31 from mahendra/lazythread
Optimize auto-commit process
-rw-r--r--kafka/consumer.py26
-rw-r--r--kafka/util.py40
2 files changed, 40 insertions, 26 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c9f12e1..f8855dc 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -58,7 +58,7 @@ class SimpleConsumer(object):
# Set up the auto-commit timer
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
- self._timed_commit)
+ self.commit)
self.commit_timer.start()
def get_or_init_offset_callback(resp):
@@ -150,15 +150,6 @@ class SimpleConsumer(object):
return total
- def _timed_commit(self):
- """
- Commit offsets as part of timer
- """
- self.commit()
-
- # Once the commit is done, start the timer again
- self.commit_timer.start()
-
def commit(self, partitions=[]):
"""
Commit offsets for this consumer
@@ -167,11 +158,17 @@ class SimpleConsumer(object):
all of them
"""
- # short circuit if nothing happened
+ # short circuit if nothing happened. This check is kept outside
+ # to prevent un-necessarily acquiring a lock for checking the state
if self.count_since_commit == 0:
return
with self.commit_lock:
+ # Do this check again, just in case the state has changed
+ # during the lock acquiring timeout
+ if self.count_since_commit == 0:
+ return
+
reqs = []
if len(partitions) == 0: # commit all partitions
partitions = self.offsets.keys()
@@ -201,12 +198,7 @@ class SimpleConsumer(object):
return
if self.count_since_commit > self.auto_commit_every_n:
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
- self.commit_timer.start()
- else:
- self.commit()
+ self.commit()
def __iter__(self):
"""
diff --git a/kafka/util.py b/kafka/util.py
index 10bf838..11178f5 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,7 +1,7 @@
from collections import defaultdict
from itertools import groupby
import struct
-from threading import Timer
+from threading import Thread, Event
def write_int_string(s):
@@ -81,19 +81,41 @@ class ReentrantTimer(object):
t: timer interval in milliseconds
fn: a callable to invoke
+ args: tuple of args to be passed to function
+ kwargs: keyword arguments to be passed to function
"""
- def __init__(self, t, fn):
- self.timer = None
- self.t = t
+ def __init__(self, t, fn, *args, **kwargs):
+
+ if t <= 0:
+ raise ValueError('Invalid timeout value')
+
+ if not callable(fn):
+ raise ValueError('fn must be callable')
+
+ self.thread = None
+ self.t = t / 1000.0
self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
+ self.active = None
+
+ def _timer(self, active):
+ while not active.wait(self.t):
+ self.fn(*self.args, **self.kwargs)
def start(self):
- if self.timer is not None:
- self.timer.cancel()
+ if self.thread is not None:
+ self.stop()
- self.timer = Timer(self.t / 1000., self.fn)
- self.timer.start()
+ self.active = Event()
+ self.thread = Thread(target=self._timer, args=(self.active,))
+ self.thread.daemon = True # So the app exits when main thread exits
+ self.thread.start()
def stop(self):
- self.timer.cancel()
+ if self.thread is None:
+ return
+
+ self.active.set()
+ self.thread.join(self.t + 1)
self.timer = None