Diffstat (limited to 'lib/git/async')
-rw-r--r--  lib/git/async/channel.py |  1
-rw-r--r--  lib/git/async/pool.py    | 30
-rw-r--r--  lib/git/async/task.py    |  3
-rw-r--r--  lib/git/async/thread.py  |  9
-rw-r--r--  lib/git/async/util.py    | 50
5 files changed, 52 insertions, 41 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index c05f7383..58c35f96 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -75,7 +75,6 @@ class WChannel(Channel):
an error"""
# yes, close it a little too early, better than having anyone put
# additional items
- # print "closing channel", self
self._closed = True
self._queue.set_writable(False)
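
The hunk above keeps a design note worth spelling out: the closed flag flips before the queue is made unwritable, so late writers fail fast instead of enqueueing into a channel that is shutting down. A minimal sketch of that ordering, assuming a queue offering put() and set_writable() like the AsyncQueue further below; MiniChannel and its write() are hypothetical names:

    class MiniChannel(object):
        def __init__(self, queue):
            self._closed = False
            self._queue = queue

        def write(self, item):
            # writers consult the flag first, hence setting it 'a little
            # too early' in close() means they fail fast rather than put
            # items onto a queue that is about to reject them
            if self._closed:
                raise IOError("cannot write to a closed channel")
            self._queue.put(item)

        def close(self):
            self._closed = True              # refuse new writers first ...
            self._queue.set_writable(False)  # ... then shut down the queue
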
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 1767c61c..7bddf7da 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -53,9 +53,8 @@ class RPoolChannel(RChannel):
# I can't explain it, but this appears to be normal in the destructor
# On the caller side, getrefcount returns 2, as expected
if sys.getrefcount(self) < 6:
- print "__del__"
- self._pool.del_task(self._task)
- print "done"
+ self._pool.remove_task(self._task)
+ # END handle refcount based removal of task
def set_pre_cb(self, fun = lambda count: None):
"""Install a callback to call with the item count to be read before any
@@ -237,12 +236,14 @@ class Pool(object):
# the list includes our tasks - the first one to evaluate first, the
# requested one last
for task in dfirst_tasks:
- if task.error() or task.is_done():
+ # if task.error() or task.is_done():
# in theory, there should never be a consumed task in the pool, right ?
- # They delete themselves once they are done.
- # TODO: remove this check for performance later
- raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
- #continue
+ # They delete themselves once they are done. But as we run asynchronously,
+ # it can be that someone reads while a task realizes it's done, and
+ # we get here to prepare the read although it already is done.
+ # It's not a problem though, the task will not do anything.
+ # Hence we don't waste our time with checking for it.
+ # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened ?")
# END skip processing
# if the task does not have the required output on its queue, schedule
@@ -316,11 +317,11 @@ class Pool(object):
"""Called after we processed a read to cleanup"""
pass
- def _del_task_if_orphaned(self, task):
+ def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call
if sys.getrefcount(task._out_wc) < 3:
- self.del_task(task)
+ self.remove_task(task)
#} END internal
#{ Interface
@@ -351,7 +352,6 @@ class Pool(object):
# Just adding more workers is not a problem at all.
add_count = size - cur_count
for i in range(add_count):
- print "Add worker"
self.WorkerCls(self._queue).start()
# END for each new worker to create
self._num_workers += add_count
@@ -361,7 +361,6 @@ class Pool(object):
# could be added as we speak.
del_count = cur_count - size
for i in range(del_count):
- print "stop worker"
self._queue.put((self.WorkerCls.stop, True)) # arg doesn't matter
# END for each thread to stop
self._num_workers -= del_count
@@ -390,7 +389,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def del_task(self, task):
+ def remove_task(self, task):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -399,7 +398,6 @@ class Pool(object):
This method blocks until all tasks to be removed have been processed, if
they are currently being processed.
:return: self"""
- print "del_task: getting lock"
self._taskgraph_lock.acquire()
try:
# it can be that the task is already deleted, but its chunk was on the
@@ -407,7 +405,7 @@ class Pool(object):
if not task in self._tasks.nodes:
return self
# END early abort
- print "deleting ", id(task)
+
# the task we are currently deleting could also be processed by
# a thread right now. We don't care about it as it takes care of
# its write channel itself, and sends everything it can to it.
@@ -426,7 +424,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._del_task_if_orphaned(t)
+ self._remove_task_if_orphaned(t)
# END handle orphans recursively
return self
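
The remove_task/_remove_task_if_orphaned pair leans on a CPython detail: sys.getrefcount counts one extra, temporary reference for its own argument. A standalone sketch of the orphan test above (the '< 3' check); Holder and is_orphaned are invented names:

    import sys

    class Holder(object):
        """Stands in for a task holding its output write channel."""
        def __init__(self):
            self.out_wc = object()

    def is_orphaned(holder):
        # 1 reference held by the holder attribute plus 1 temporary
        # reference for the getrefcount argument gives 2 - anything
        # below 3 means no reader keeps the write channel alive
        return sys.getrefcount(holder.out_wc) < 3

    h = Holder()
    assert is_orphaned(h)        # only the holder references the channel
    reader_ref = h.out_wc        # a reader grabs the channel ...
    assert not is_orphaned(h)    # ... so it is no longer orphaned
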
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f9536a45..f1448f96 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -89,7 +89,6 @@ class OutputChannelTask(Node):
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
- # print "task read", len(items)
try:
# increase the ref-count - we use this to determine whether anyone else
# is currently handling our output channel. As this method runs asynchronously,
@@ -119,7 +118,7 @@ class OutputChannelTask(Node):
# END handle single apply
except Exception, e:
self._exc = e
- print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
+ print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
self.set_done()
# unschedule all, we don't know how many have been produced actually
# but only if we don't apply single please
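
The TODO kept in the hunk above points at logging as the eventual replacement for the debug print. A possible shape of that, using the standard logging module; the logger name is made up:

    import logging

    log = logging.getLogger('git.async.task')

    def report_task_error(e):
        # unlike a bare print, this goes through handlers the embedding
        # application configures, e.g. via logging.basicConfig()
        log.error("task error: %s", e)
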
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 556b7e92..cd964f1c 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -5,6 +5,8 @@ import threading
import inspect
import Queue
+import sys
+
#{ Decorators
def do_terminate_threads(whitelist=list()):
@@ -160,14 +162,15 @@ class WorkerThread(TerminatableThread):
rval = routine(arg)
else:
# ignore unknown items
- print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
+ print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
break
# END make routine call
except StopProcessing:
+ print >> sys.stderr, self.name, "stops processing"
break
except Exception,e:
- print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
- break # abort ...
+ print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
+ continue # just continue
# END routine exception handling
# END endless loop
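
The behavioural change in the last hunk: StopProcessing still terminates the worker, but a routine that raises an unexpected exception now only loses that one task instead of killing the thread. Condensed into a sketch, with get_next_task standing in for the real queue access:

    import sys

    class StopProcessing(Exception):
        pass

    def worker_loop(get_next_task):
        while True:
            routine, arg = get_next_task()
            try:
                routine(arg)
            except StopProcessing:
                break        # orderly shutdown was requested
            except Exception, e:
                sys.stderr.write("task raised: %s\n" % e)
                continue     # keep the worker alive for the next task
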
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index f3213ed6..dff38f58 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -71,18 +71,15 @@ class SyncQueue(deque):
class HSCondition(object):
- """An attempt to make conditions less blocking, which gains performance
- in return by sleeping less"""
- # __slots__ = ("acquire", "release", "_lock", '_waiters')
+ """Cleaned up code of the original condition object in order
+ to make it run and respond faster."""
__slots__ = ("_lock", '_waiters')
- delay = 0.00002 # reduces wait times, but increases overhead
+ delay = 0.0002 # reduces wait times, but increases overhead
def __init__(self, lock=None):
if lock is None:
lock = Lock()
self._lock = lock
- #self.acquire = lock.acquire
- #self.release = lock.release
self._waiters = list()
def release(self):
@@ -109,6 +106,8 @@ class HSCondition(object):
# Balancing act: We can't afford a pure busy loop, because of the
# GIL, so we have to sleep
# We try to sleep only tiny amounts of time though to be very responsive
+ # NOTE: this branch is not used by the async system anyway, but
+ # will be hit when the user reads with timeout
endtime = _time() + timeout
delay = self.delay
acquire = waiter.acquire
@@ -133,25 +132,36 @@ class HSCondition(object):
finally:
# reacquire the lock
self.acquire()
+ # END assure release lock
def notify(self, n=1):
+ """It's vital that this method is thread-safe - to be fast, we don't get a lock,
+ but instead rely on pseudo-atomic operations that come with the GIL.
+ Hence we use pop in the n=1 case to be truly atomic.
+ In the multi-notify case, we acquire a lock just for safety, as otherwise
+ we might pop too many if someone else notifies n waiters as well, which
+ would in the worst case lead to double-releases of locks."""
if not self._waiters:
return
- waiters = self._waiters
if n == 1:
- waiters[0].release()
+ # so here we assume this is thread-safe! It wouldn't be in most other
+ # languages, but in python it is.
try:
- waiters.pop(0)
+ self._waiters.pop(0).release()
except IndexError:
pass
else:
- print "notify", waiters, n
- for waiter in waiters[:n]:
- waiter.release()
- try:
- waiters.remove(waiter)
- except ValueError:
- pass
+ self.acquire()
+ # once the waiter resumes, it will want to acquire the lock
+ # and wait again, but only until we are done, which is important
+ # to do in a thread-safe fashion
+ try:
+ for i in range(min(n, len(self._waiters))):
+ self._waiters.pop(0).release()
+ # END for each waiter to resume
+ finally:
+ self.release()
+ # END assure we release our lock
# END handle n = 1 case faster
def notify_all(self):
@@ -164,7 +174,8 @@ class ReadOnly(Exception):
class AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance.
Additionally it has a threadsafe writable flag, which will alert all readers
- that there is nothing more to get here."""
+ that there is nothing more to get here.
+ All default-queue code was cleaned up for performance."""
__slots__ = ('mutex', 'not_empty', 'queue', '_writable')
def __init__(self, maxsize=0):
@@ -222,7 +233,7 @@ class AsyncQueue(Queue):
self.not_empty.notify()
def get(self, block=True, timeout=None):
- self.not_empty.acquire() # == self.mutex.acquire in that case
+ self.mutex.acquire()
q = self.queue
try:
if block:
@@ -246,7 +257,8 @@ class AsyncQueue(Queue):
raise Empty
# END handle unblocking reason
finally:
- self.not_empty.release()
+ self.mutex.release()
+ # END assure lock is released
#} END utilities
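
Why the n == 1 fast path in HSCondition.notify may skip the lock: under CPython's GIL, list.pop(0) is one atomic operation, so two concurrent notifiers can never extract - and therefore never double-release - the same waiter; at worst the loser sees a benign IndexError. A stripped-down illustration (wait_on and notify_one are invented names; the real wait() runs while holding the condition's lock):

    import threading

    waiters = list()

    def wait_on():
        waiter = threading.Lock()
        waiter.acquire()    # first acquire succeeds immediately
        waiters.append(waiter)
        waiter.acquire()    # second acquire blocks until notified

    def notify_one():
        try:
            # pop(0) is atomic under the GIL: no two notifiers can
            # obtain the same waiter, hence no double-release
            waiters.pop(0).release()
        except IndexError:
            pass            # no waiter was queued - benign race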