diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 10:34:12 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 10:34:12 +0200 |
commit | 15941ca090a2c3c987324fc911bbc6f89e941c47 (patch) | |
tree | 3c508eb2e8be484e8685cddaa1de72826fbf9302 /lib/git/async | |
parent | f78d4a28f307a9d7943a06be9f919304c25ac2d9 (diff) | |
download | gitpython-15941ca090a2c3c987324fc911bbc6f89e941c47.tar.gz |
queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently.
Now it's about putting previous features back in and studying their results, before more complex task graphs can be examined.
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 1 | ||||
-rw-r--r-- | lib/git/async/pool.py | 30 | ||||
-rw-r--r-- | lib/git/async/task.py | 3 | ||||
-rw-r--r-- | lib/git/async/thread.py | 9 | ||||
-rw-r--r-- | lib/git/async/util.py | 50 |
5 files changed, 52 insertions, 41 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index c05f7383..58c35f96 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -75,7 +75,6 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - # print "closing channel", self self._closed = True self._queue.set_writable(False) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 1767c61c..7bddf7da 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -53,9 +53,8 @@ class RPoolChannel(RChannel): # I can't explain, but appears to be normal in the destructor # On the caller side, getrefcount returns 2, as expected if sys.getrefcount(self) < 6: - print "__del__" - self._pool.del_task(self._task) - print "done" + self._pool.remove_task(self._task) + # END handle refcount based removal of task def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -237,12 +236,14 @@ class Pool(object): # the list includes our tasks - the first one to evaluate first, the # requested one last for task in dfirst_tasks: - if task.error() or task.is_done(): + # if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? - # They delete themselves once they are done. - # TODO: remove this check for performance later - raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") - #continue + # They delete themselves once they are done. But as we run asynchronously, + # It can be that someone reads, while a task realizes its done, and + # we get here to prepare the read although it already is done. + # Its not a problem though, the task wiill not do anything. 
+ # Hence we don't waste our time with checking for it + # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing # if the task does not have the required output on its queue, schedule @@ -316,11 +317,11 @@ class Pool(object): """Called after we processed a read to cleanup""" pass - def _del_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call if sys.getrefcount(task._out_wc) < 3: - self.del_task(task) + self.remove_task(task) #} END internal #{ Interface @@ -351,7 +352,6 @@ class Pool(object): # Just adding more workers is not a problem at all. add_count = size - cur_count for i in range(add_count): - print "Add worker" self.WorkerCls(self._queue).start() # END for each new worker to create self._num_workers += add_count @@ -361,7 +361,6 @@ class Pool(object): # could be added as we speak. del_count = cur_count - size for i in range(del_count): - print "stop worker" self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter # END for each thread to stop self._num_workers -= del_count @@ -390,7 +389,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def del_task(self, task): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -399,7 +398,6 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. 
:return: self""" - print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -407,7 +405,7 @@ class Pool(object): if not task in self._tasks.nodes: return self # END early abort - print "deleting ", id(task) + # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. @@ -426,7 +424,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._del_task_if_orphaned(t) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f9536a45..f1448f96 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -89,7 +89,6 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - # print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. 
As this method runs asynchronously, @@ -119,7 +118,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e - print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 556b7e92..cd964f1c 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -5,6 +5,8 @@ import threading import inspect import Queue +import sys + #{ Decorators def do_terminate_threads(whitelist=list()): @@ -160,14 +162,15 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call except StopProcessing: + print self.name, "stops processing" break except Exception,e: - print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) - break # abort ... + print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) + continue # just continue # END routine exception handling # END endless loop diff --git a/lib/git/async/util.py b/lib/git/async/util.py index f3213ed6..dff38f58 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -71,18 +71,15 @@ class SyncQueue(deque): class HSCondition(object): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - # __slots__ = ("acquire", "release", "_lock", '_waiters') + """Cleaned up code of the original condition object in order + to make it run and respond faster.""" __slots__ = ("_lock", '_waiters') - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.0002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - #self.acquire = lock.acquire - #self.release = lock.release self._waiters = list() def release(self): @@ -109,6 +106,8 @@ class HSCondition(object): # Balancing act: We can't afford a pure busy loop, because of the # GIL, so we have to sleep # We try to sleep only tiny amounts of time though to be very responsive + # NOTE: this branch is not used by the async system anyway, but + # will be hit when the user reads with timeout endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -133,25 +132,36 @@ class HSCondition(object): finally: # reacquire the lock self.acquire() + # END assure release lock def notify(self, n=1): + """Its vital that this method is threadsafe - to be fast we don'd get a lock, + but instead rely on pseudo-atomic operations that come with the GIL. + Hence we use pop in the n=1 case to be truly atomic. 
+ In the multi-notify case, we acquire a lock just for safety, as otherwise + we might pop too much of someone else notifies n waiters as well, which + would in the worst case lead to double-releases of locks.""" if not self._waiters: return - waiters = self._waiters if n == 1: - waiters[0].release() + # so here we assume this is thead-safe ! It wouldn't be in any other + # language, but python it is. try: - waiters.pop(0) + self._waiters.pop(0).release() except IndexError: pass else: - print "notify", waiters, n - for waiter in waiters[:n]: - waiter.release() - try: - waiters.remove(waiter) - except ValueError: - pass + self.acquire() + # once the waiter resumes, he will want to acquire the lock + # and waits again, but only until we are done, which is important + # to do that in a thread-safe fashion + try: + for i in range(min(n, len(self._waiters))): + self._waiters.pop(0).release() + # END for each waiter to resume + finally: + self.release() + # END assure we release our lock # END handle n = 1 case faster def notify_all(self): @@ -164,7 +174,8 @@ class ReadOnly(Exception): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers - that there is nothing more to get here.""" + that there is nothing more to get here. + All default-queue code was cleaned up for performance.""" __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): @@ -222,7 +233,7 @@ class AsyncQueue(Queue): self.not_empty.notify() def get(self, block=True, timeout=None): - self.not_empty.acquire() # == self.mutex.acquire in that case + self.mutex.acquire() q = self.queue try: if block: @@ -246,7 +257,8 @@ class AsyncQueue(Queue): raise Empty # END handle unblocking reason finally: - self.not_empty.release() + self.mutex.release() + # END assure lock is released #} END utilities |