diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 1 | ||||
-rw-r--r-- | lib/git/async/pool.py | 30 | ||||
-rw-r--r-- | lib/git/async/task.py | 3 | ||||
-rw-r--r-- | lib/git/async/thread.py | 9 | ||||
-rw-r--r-- | lib/git/async/util.py | 50 |
5 files changed, 52 insertions, 41 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index c05f7383..58c35f96 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -75,7 +75,6 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - # print "closing channel", self self._closed = True self._queue.set_writable(False) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 1767c61c..7bddf7da 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -53,9 +53,8 @@ class RPoolChannel(RChannel): # I can't explain, but appears to be normal in the destructor # On the caller side, getrefcount returns 2, as expected if sys.getrefcount(self) < 6: - print "__del__" - self._pool.del_task(self._task) - print "done" + self._pool.remove_task(self._task) + # END handle refcount based removal of task def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -237,12 +236,14 @@ class Pool(object): # the list includes our tasks - the first one to evaluate first, the # requested one last for task in dfirst_tasks: - if task.error() or task.is_done(): + # if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? - # They delete themselves once they are done. - # TODO: remove this check for performance later - raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") - #continue + # They delete themselves once they are done. But as we run asynchronously, + # It can be that someone reads, while a task realizes its done, and + # we get here to prepare the read although it already is done. + # Its not a problem though, the task wiill not do anything. + # Hence we don't waste our time with checking for it + # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing # if the task does not have the required output on its queue, schedule @@ -316,11 +317,11 @@ class Pool(object): """Called after we processed a read to cleanup""" pass - def _del_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call if sys.getrefcount(task._out_wc) < 3: - self.del_task(task) + self.remove_task(task) #} END internal #{ Interface @@ -351,7 +352,6 @@ class Pool(object): # Just adding more workers is not a problem at all. add_count = size - cur_count for i in range(add_count): - print "Add worker" self.WorkerCls(self._queue).start() # END for each new worker to create self._num_workers += add_count @@ -361,7 +361,6 @@ class Pool(object): # could be added as we speak. del_count = cur_count - size for i in range(del_count): - print "stop worker" self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter # END for each thread to stop self._num_workers -= del_count @@ -390,7 +389,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def del_task(self, task): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -399,7 +398,6 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" - print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -407,7 +405,7 @@ class Pool(object): if not task in self._tasks.nodes: return self # END early abort - print "deleting ", id(task) + # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. @@ -426,7 +424,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._del_task_if_orphaned(t) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f9536a45..f1448f96 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -89,7 +89,6 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - # print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. As this method runs asynchronously, @@ -119,7 +118,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e - print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 556b7e92..cd964f1c 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -5,6 +5,8 @@ import threading import inspect import Queue +import sys + #{ Decorators def do_terminate_threads(whitelist=list()): @@ -160,14 +162,15 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call except StopProcessing: + print self.name, "stops processing" break except Exception,e: - print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) - break # abort ... + print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) + continue # just continue # END routine exception handling # END endless loop diff --git a/lib/git/async/util.py b/lib/git/async/util.py index f3213ed6..dff38f58 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -71,18 +71,15 @@ class SyncQueue(deque): class HSCondition(object): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - # __slots__ = ("acquire", "release", "_lock", '_waiters') + """Cleaned up code of the original condition object in order + to make it run and respond faster.""" __slots__ = ("_lock", '_waiters') - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.0002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - #self.acquire = lock.acquire - #self.release = lock.release self._waiters = list() def release(self): @@ -109,6 +106,8 @@ class HSCondition(object): # Balancing act: We can't afford a pure busy loop, because of the # GIL, so we have to sleep # We try to sleep only tiny amounts of time though to be very responsive + # NOTE: this branch is not used by the async system anyway, but + # will be hit when the user reads with timeout endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -133,25 +132,36 @@ class HSCondition(object): finally: # reacquire the lock self.acquire() + # END assure release lock def notify(self, n=1): + """Its vital that this method is threadsafe - to be fast we don'd get a lock, + but instead rely on pseudo-atomic operations that come with the GIL. + Hence we use pop in the n=1 case to be truly atomic. + In the multi-notify case, we acquire a lock just for safety, as otherwise + we might pop too much of someone else notifies n waiters as well, which + would in the worst case lead to double-releases of locks.""" if not self._waiters: return - waiters = self._waiters if n == 1: - waiters[0].release() + # so here we assume this is thead-safe ! It wouldn't be in any other + # language, but python it is. try: - waiters.pop(0) + self._waiters.pop(0).release() except IndexError: pass else: - print "notify", waiters, n - for waiter in waiters[:n]: - waiter.release() - try: - waiters.remove(waiter) - except ValueError: - pass + self.acquire() + # once the waiter resumes, he will want to acquire the lock + # and waits again, but only until we are done, which is important + # to do that in a thread-safe fashion + try: + for i in range(min(n, len(self._waiters))): + self._waiters.pop(0).release() + # END for each waiter to resume + finally: + self.release() + # END assure we release our lock # END handle n = 1 case faster def notify_all(self): @@ -164,7 +174,8 @@ class ReadOnly(Exception): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers - that there is nothing more to get here.""" + that there is nothing more to get here. + All default-queue code was cleaned up for performance.""" __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): @@ -222,7 +233,7 @@ class AsyncQueue(Queue): self.not_empty.notify() def get(self, block=True, timeout=None): - self.not_empty.acquire() # == self.mutex.acquire in that case + self.mutex.acquire() q = self.queue try: if block: @@ -246,7 +257,8 @@ class AsyncQueue(Queue): raise Empty # END handle unblocking reason finally: - self.not_empty.release() + self.mutex.release() + # END assure lock is released #} END utilities |