summaryrefslogtreecommitdiff
path: root/lib/git/async/task.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-11 14:59:02 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-11 14:59:02 +0200
commitf606937a7a21237c866efafcad33675e6539c103 (patch)
tree13ba7731de4798b2c9bfa24ccc893e4d8e5b8e8d /lib/git/async/task.py
parent257a8a9441fca9a9bc384f673ba86ef5c3f1715d (diff)
parent18e3252a1f655f09093a4cffd5125342a8f94f3b (diff)
downloadgitpython-f606937a7a21237c866efafcad33675e6539c103.tar.gz
Merge branch 'taskdep' into async
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r--lib/git/async/task.py143
1 files changed, 99 insertions, 44 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 03b40492..49e7e7cf 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,19 +1,17 @@
from graph import Node
-from channel import WChannel
from util import ReadOnly
import threading
+import weakref
import sys
import new
-getrefcount = sys.getrefcount
-
class OutputChannelTask(Node):
"""Abstracts a named task as part of a set of interdependent tasks, which contains
additional information on how the task should be queued and processed.
Results of the item processing are sent to a write channel, which is to be
- set by the creator using the ``set_wchannel`` method.
+ set by the creator using the ``set_writer`` method.
* **min_count** assures that not less than min_count items will be processed per call.
* **max_chunksize** assures that multi-threading is happening in smaller chunks. If
@@ -21,9 +19,11 @@ class OutputChannelTask(Node):
one worker, as well as dependent tasks. If you want finer granularity , you can
specify this here, causing chunks to be no larger than max_chunksize"""
__slots__ = ( '_read', # method to yield items to process
- '_out_wc', # output write channel
+ '_out_writer', # output write channel
'_exc', # exception caught
'_done', # True if we are done
+ '_num_writers', # number of concurrent writers
+ '_wlock', # lock for the above
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
@@ -31,12 +31,14 @@ class OutputChannelTask(Node):
)
def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
- wchannel=None):
+ writer=None):
Node.__init__(self, id)
self._read = None # to be set by subclasss
- self._out_wc = wchannel # to be set later
+ self._out_writer = writer
self._exc = None
self._done = False
+ self._num_writers = 0
+ self._wlock = threading.Lock()
self.fun = fun
self.min_count = None
self.max_chunksize = 0 # note set
@@ -50,29 +52,29 @@ class OutputChannelTask(Node):
		"""Set ourselves to being done, as we have completed the processing"""
self._done = True
- def set_wchannel(self, wc):
+ def set_writer(self, writer):
"""Set the write channel to the given one"""
- self._out_wc = wc
+ self._out_writer = writer
- def wchannel(self):
+ def writer(self):
		""":return: a proxy to our write channel or None if none is set
:note: you must not hold a reference to our write channel when the
task is being processed. This would cause the write channel never
to be closed as the task will think there is still another instance
being processed which can close the channel once it is done.
In the worst case, this will block your reads."""
- if self._out_wc is None:
+ if self._out_writer is None:
return None
- return self._out_wc
+ return self._out_writer
def close(self):
"""A closed task will close its channel to assure the readers will wake up
:note: its safe to call this method multiple times"""
- self._out_wc.close()
+ self._out_writer.close()
def is_closed(self):
""":return: True if the task's write channel is closed"""
- return self._out_wc.closed()
+ return self._out_writer.closed()
def error(self):
""":return: Exception caught during last processing or None"""
@@ -80,30 +82,42 @@ class OutputChannelTask(Node):
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
+ # first thing: increment the writer count - other tasks must be able
+ # to respond properly ( even if it turns out we don't need it later )
+ self._wlock.acquire()
+ self._num_writers += 1
+ self._wlock.release()
+
+ #print "%r: reading %i" % (self.id, count)
+ #if hasattr(self, 'reader'):
+ # print "from", self.reader().channel
items = self._read(count)
+ #print "%r: done reading %i items" % (self.id, len(items))
+
try:
- # increase the ref-count - we use this to determine whether anyone else
- # is currently handling our output channel. As this method runs asynchronously,
- # we have to make sure that the channel is closed by the last finishing task,
- # which is not necessarily the one which determines that he is done
- # as he couldn't read anymore items.
- # The refcount will be dropped in the moment we get out of here.
- wc = self._out_wc
- if self.apply_single:
- for item in items:
- rval = self.fun(item)
- wc.write(rval)
- # END for each item
- else:
- # shouldn't apply single be the default anyway ?
- # The task designers should chunk them up in advance
- rvals = self.fun(items)
- for rval in rvals:
- wc.write(rval)
- # END handle single apply
+ try:
+ if items:
+ write = self._out_writer.write
+ if self.apply_single:
+ for item in items:
+ rval = self.fun(item)
+ write(rval)
+ # END for each item
+ else:
+ # shouldn't apply single be the default anyway ?
+ # The task designers should chunk them up in advance
+ rvals = self.fun(items)
+ for rval in rvals:
+ write(rval)
+ # END handle single apply
+ # END if there is anything to do
+ finally:
+ self._wlock.acquire()
+ self._num_writers -= 1
+ self._wlock.release()
+ # END handle writer count
except Exception, e:
- print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
-
+ print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging
# be sure our task is not scheduled again
self.set_done()
@@ -126,7 +140,7 @@ class OutputChannelTask(Node):
self._exc = e
# END set error flag
# END exception handling
- del(wc)
+
# if we didn't get all demanded items, which is also the case if count is 0
# we have depleted the input channel and are done
@@ -145,8 +159,17 @@ class OutputChannelTask(Node):
# The count is: 1 = wc itself, 2 = first reader channel, + x for every
# thread having its copy on the stack
# + 1 for the instance we provide to refcount
- if self.is_done() and getrefcount(self._out_wc) < 4:
- self.close()
+ # Soft close, so others can continue writing their results
+ if self.is_done():
+ self._wlock.acquire()
+ try:
+ if self._num_writers == 0:
+ # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
+ self.close()
+ # END handle writers
+ finally:
+ self._wlock.release()
+ # END assure lock release
# END handle channel closure
#{ Configuration
@@ -159,7 +182,7 @@ class ThreadTaskBase(object):
class InputIteratorTaskBase(OutputChannelTask):
"""Implements a task which processes items from an iterable in a multi-processing
safe manner"""
- __slots__ = ('_iterator', '_lock')
+ __slots__ = ('_iterator', '_lock', '_empty')
# the type of the lock to use when reading from the iterator
lock_type = None
@@ -169,13 +192,25 @@ class InputIteratorTaskBase(OutputChannelTask):
raise ValueError("Iterator %r needs a next() function" % iterator)
self._iterator = iterator
self._lock = self.lock_type()
- self._read = self.__read
+
+ # this is necessary to prevent a cyclic ref, preventing us from
+ # getting deleted ( and collected )
+ weakself = weakref.ref(self)
+ self._read = lambda count: weakself().__read(count)
+ self._empty = False
def __read(self, count=0):
"""Read count items from the iterator, and return them"""
+ # not threadsafe, but worst thing that could happen is that
+ # we try to get items one more time
+ if self._empty:
+ return list()
+ # END early abort
+
self._lock.acquire()
try:
if count == 0:
+ self._empty = True
return list(self._iterator)
else:
out = list()
@@ -184,6 +219,7 @@ class InputIteratorTaskBase(OutputChannelTask):
try:
out.append(it.next())
except StopIteration:
+ self._empty = True
break
# END handle empty iterator
# END for each item to take
@@ -199,17 +235,36 @@ class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
lock_type = threading.Lock
-class InputChannelTask(OutputChannelTask):
+class InputChannelTask(OutputChannelTask, ThreadTaskBase):
"""Uses an input channel as source for reading items
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
+ __slots__ = "_pool_ref"
- def __init__(self, in_rc, *args, **kwargs):
+ def __init__(self, in_reader, *args, **kwargs):
OutputChannelTask.__init__(self, *args, **kwargs)
- self._read = in_rc.read
+ self._read = in_reader.read
+ self._pool_ref = None
+
+ #{ Internal Interface
- def rchannel(self):
+ def reader(self):
""":return: input channel from which we read"""
# the instance is bound in its instance method - lets use this to keep
# the refcount at one ( per consumer )
return self._read.im_self
+
+ def set_read(self, read):
+ """Adjust the read method to the given one"""
+ self._read = read
+
+ def set_pool(self, pool):
+ self._pool_ref = weakref.ref(pool)
+
+ def pool(self):
+ """:return: pool we are attached to, or None"""
+ if self._pool_ref is None:
+ return None
+ return self._pool_ref()
+
	#} END internal interface