diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 18:20:12 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-08 18:20:12 +0200 |
commit | e5c0002d069382db1768349bf0c5ff40aafbf140 (patch) | |
tree | 0af5a850aa240aff9bc90b0d0e4a31ab1d1ac9eb /lib/git/async | |
parent | 13dd59ba5b3228820841682b59bad6c22476ff66 (diff) | |
download | gitpython-e5c0002d069382db1768349bf0c5ff40aafbf140.tar.gz |
Revised task deletion works well, adjusted test to be creating new tasks all the time instead of reusing its own one, it was somewhat hard to manage its state over time and could cause bugs. It works okay, but it occasionally hangs, it appears to be an empty queue, have to gradually put certain things back in, although in the current mode of operation, it should never have empty queues from the pool to the user
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/pool.py | 25 | ||||
-rw-r--r-- | lib/git/async/task.py | 64 |
2 files changed, 31 insertions, 58 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index fce5e424..a915f7b0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -41,8 +41,18 @@ class RPoolChannel(RChannel): def __del__(self): """Assures that our task will be deleted if we were the last reader""" - del(self._wc) # decrement ref-count - self._pool._del_task_if_orphaned(self._task) + del(self._wc) # decrement ref-count early + # now, if this is the last reader to the wc we just handled, there + # is no way anyone will ever read from the task again. If so, + # delete the task in question, it will take care of itself and orphans + # it might leave + # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which + # I can't explain, but appears to be normal in the destructor + # On the caller side, getrefcount returns 2, as expected + if sys.getrefcount(self) < 6: + print "__del__" + self._pool.del_task(self._task) + print "done" def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -105,7 +115,7 @@ class RPoolChannel(RChannel): ####### Finalize ######## self._pool._post_channel_read(self._task) - + return items #{ Internal @@ -227,6 +237,7 @@ class Pool(object): if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? # They delete themselves once they are done. + # TODO: remove this check for performance later raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") #continue # END skip processing @@ -363,7 +374,11 @@ class Pool(object): def num_tasks(self): """:return: amount of tasks""" - return len(self._tasks.nodes) + self._taskgraph_lock.acquire() + try: + return len(self._tasks.nodes) + finally: + self._taskgraph_lock.release() def del_task(self, task): """Delete the task @@ -374,6 +389,7 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" + print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -414,7 +430,6 @@ class Pool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task.set_wc(wc) - task.set_pool(self) self._taskgraph_lock.acquire() try: diff --git a/lib/git/async/task.py b/lib/git/async/task.py index dc207c33..5edd40bb 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,10 +1,11 @@ from graph import Node import threading -import weakref import sys import new +getrefcount = sys.getrefcount + class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. @@ -23,7 +24,6 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items - '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -54,7 +54,7 @@ class OutputChannelTask(Node): def set_wc(self, wc): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" - self._done = False + self._done = False # TODO : fix this, this is a side-effect self._scheduled_items = 0 self._out_wc = wc @@ -86,10 +86,6 @@ class OutputChannelTask(Node): self._slock.release() # END threadsafe return - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) - def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -123,7 +119,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e - print str(e) # TODO: REMOVE DEBUG, or make it use logging + print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please @@ -150,18 +146,8 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount - if self.is_done() and sys.getrefcount(self._out_wc) < 4: + if self.is_done() and getrefcount(self._out_wc) < 4: self.close() - # additionally, remove ourselves from the pool, this is thread-safe - # Previously the pool collected done tasks and removed them, - # but this could happen after a read finished, potentially - # leaving them on the queue until the read-handle was dropped. - # This should assure its more in-time. - # I don't like this back-ref. - pool = self._pool_ref() - if pool: - pool.del_task(self) - # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -218,43 +204,15 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" - __slots__ = ( - 'in_rc' # channel to read items from - ) def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._in_rc = in_rc - + self._read = in_rc.read + def process(self, count=1): - """Verify our setup, and do some additional checking, before the - base implementation can permanently perform all operations""" - self._read = self._in_rc.read - # make sure we don't trigger the pool if we read from a pool channel which - # belongs to our own pool. Channels from different pools are fine though, - # there we want to trigger its computation - # PROBLEM: if the user keeps an end, but decides to put the same end into - # a task of this pool, then all items might deplete without new ones being - # produced, causing a deadlock. Just triggering the pool would be better, - # but cost's more, unnecessarily if there is just one consumer, which is - # the user. - # * could encode usage in the channel type, and fail if the refcount on - # the read-pool channel is too high - # * maybe keep track of the elements that are requested or in-production - # for each task, which would allow to precisely determine whether - # the pool as to be triggered, and bail out early. Problem would - # be the - # * Perhaps one shouldn't seek the perfect solution , but instead - # document whats working and what not, or under which conditions. - # The whole system is simple, but gets more complicated the - # smarter it wants to be. - if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): - self._read = self._in_rc._read - - # permanently install our base for processing - self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self)) - - # and call it - return OutputChannelTask.process(self, count) + # for now, just blindly read our input, could trigger a pool, even + # ours, but why not ? It should be able to handle this + # TODO: remove this method + super(InputChannelTask, self).process(count) #{ Configuration |