Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r--  lib/git/async/task.py  64
1 files changed, 11 insertions, 53 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index dc207c33..5edd40bb 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,10 +1,11 @@
from graph import Node
import threading
-import weakref
import sys
import new
+getrefcount = sys.getrefcount
+
class OutputChannelTask(Node):
"""Abstracts a named task as part of a set of interdependent tasks, which contains
additional information on how the task should be queued and processed.
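Note on the alias added in this hunk: getrefcount = sys.getrefcount presumably exists to save one attribute lookup per call on the per-item fast path used further down. The snippet below is only an illustrative sketch of that pattern, not part of the patch; the timing variable names are made up.

import sys
import timeit

getrefcount = sys.getrefcount   # module-level alias, as in the hunk above

obj = object()
t_attr  = timeit.timeit(lambda: sys.getrefcount(obj), number=1000000)
t_alias = timeit.timeit(lambda: getrefcount(obj), number=1000000)
print("attribute lookup: %.3fs, alias: %.3fs" % (t_attr, t_alias))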
@@ -23,7 +24,6 @@ class OutputChannelTask(Node):
'_done', # True if we are done
'_scheduled_items', # amount of scheduled items that will be processed in total
'_slock', # lock for scheduled items
- '_pool_ref', # to be set by Pool
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximum amount of items to process per process call
@@ -54,7 +54,7 @@ class OutputChannelTask(Node):
def set_wc(self, wc):
"""Set the write channel to the given one
:note: resets its done state in order to allow proper queue handling"""
- self._done = False
+ self._done = False # TODO : fix this, this is a side-effect
self._scheduled_items = 0
self._out_wc = wc
@@ -86,10 +86,6 @@ class OutputChannelTask(Node):
self._slock.release()
# END threadsafe return
- def set_pool(self, pool):
- """Set our pool to the given one, it will be weakref'd"""
- self._pool_ref = weakref.ref(pool)
-
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
@@ -123,7 +119,7 @@ class OutputChannelTask(Node):
# END handle single apply
except Exception, e:
self._exc = e
- print str(e) # TODO: REMOVE DEBUG, or make it use logging
+ print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
self.set_done()
# unschedule all, we don't know how many have been produced actually
# but only if we don't apply single please
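The TODO on the print above asks for logging instead of a bare print. One possible shape for that, sketched here with an assumed logger name and an assumed 'id' attribute on tasks (neither is defined by this patch):

import logging

log = logging.getLogger("git.async.task")   # hypothetical logger name


def record_task_error(task, exc):
    """Sketch of a logging-based replacement for the debug print above."""
    task._exc = exc
    # getattr keeps this safe even if a task exposes no 'id' attribute
    log.error("task %r failed: %s", getattr(task, "id", task), exc)
    task.set_done()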
@@ -150,18 +146,8 @@ class OutputChannelTask(Node):
# The count is: 1 = wc itself, 2 = first reader channel, + x for every
# thread having its copy on the stack
# + 1 for the instance we provide to refcount
- if self.is_done() and sys.getrefcount(self._out_wc) < 4:
+ if self.is_done() and getrefcount(self._out_wc) < 4:
self.close()
- # additionally, remove ourselves from the pool, this is thread-safe
- # Previously the pool collected done tasks and removed them,
- # but this could happen after a read finished, potentially
- # leaving them on the queue until the read-handle was dropped.
- # This should assure its more in-time.
- # I don't like this back-ref.
- pool = self._pool_ref()
- if pool:
- pool.del_task(self)
- # END remove ourselves from the pool
# END handle channel closure
#{ Configuration
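The reference-count arithmetic described in the comment above can be illustrated with a plain object standing in for the write channel. The threshold of 4 comes from the patch; everything else here is a sketch.

import sys

class FakeChannel(object):
    pass

wc = FakeChannel()              # 1: the task's own reference (self._out_wc)
reader = wc                     # 2: a reader channel holding the same object
print(sys.getrefcount(wc))      # typically 3 in CPython: the two names above
                                # plus the temporary reference for the call argument

# In _finalize() the count is compared against 4: the task's reference, the
# reader and the getrefcount() argument account for 3, so a count of 4 or
# more means at least one worker thread still has the channel on its stack
# and it is too early to close it.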
@@ -218,43 +204,15 @@ class InputChannelTask(OutputChannelTask):
"""Uses an input channel as source for reading items
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
- __slots__ = (
- 'in_rc' # channel to read items from
- )
def __init__(self, in_rc, *args, **kwargs):
OutputChannelTask.__init__(self, *args, **kwargs)
- self._in_rc = in_rc
-
+ self._read = in_rc.read
+
def process(self, count=1):
- """Verify our setup, and do some additional checking, before the
- base implementation can permanently perform all operations"""
- self._read = self._in_rc.read
- # make sure we don't trigger the pool if we read from a pool channel which
- # belongs to our own pool. Channels from different pools are fine though,
- # there we want to trigger its computation
- # PROBLEM: if the user keeps an end, but decides to put the same end into
- # a task of this pool, then all items might deplete without new ones being
- # produced, causing a deadlock. Just triggering the pool would be better,
- # but cost's more, unnecessarily if there is just one consumer, which is
- # the user.
- # * could encode usage in the channel type, and fail if the refcount on
- # the read-pool channel is too high
- # * maybe keep track of the elements that are requested or in-production
- # for each task, which would allow to precisely determine whether
- # the pool as to be triggered, and bail out early. Problem would
- # be the
- # * Perhaps one shouldn't seek the perfect solution , but instead
- # document whats working and what not, or under which conditions.
- # The whole system is simple, but gets more complicated the
- # smarter it wants to be.
- if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
- self._read = self._in_rc._read
-
- # permanently install our base for processing
- self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self))
-
- # and call it
- return OutputChannelTask.process(self, count)
+ # for now, just blindly read our input, could trigger a pool, even
+ # ours, but why not ? It should be able to handle this
+ # TODO: remove this method
+ super(InputChannelTask, self).process(count)
#{ Configuration
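The rebinding in __init__ above (self._read = in_rc.read) is what lets the base class's process() pull items straight from the input channel. The stand-alone sketch below uses made-up class names to show the same pattern; it is not the actual task/channel API.

class BaseTask(object):
    def __init__(self, fun):
        self.fun = fun
        self._read = lambda count: []       # producers normally override this

    def process(self, count=0):
        items = self._read(count)           # base class only knows about _read
        return [self.fun(item) for item in items]


class ChannelTask(BaseTask):
    def __init__(self, in_rc, fun):
        BaseTask.__init__(self, fun)
        self._read = in_rc.read             # bind the channel's read method


class ListChannel(object):
    """Stand-in for a read channel: read(count) pops up to count items."""
    def __init__(self, items):
        self._items = list(items)

    def read(self, count=0):
        n = count or len(self._items)
        taken, self._items = self._items[:n], self._items[n:]
        return taken


task = ChannelTask(ListChannel([1, 2, 3]), fun=lambda x: x * 2)
print(task.process(2))                      # [2, 4]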