diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
commit | f91495e271597034226f1b9651345091083172c4 (patch) | |
tree | e0e2aa63b7dc649083858366eaedb6ac4cc5739b /lib/git/async/task.py | |
parent | 7c1169f6ea406fec1e26e99821e18e66437e65eb (diff) | |
parent | 7a0b79ee574999ecbc76696506352e4a5a0d7159 (diff) | |
download | gitpython-f91495e271597034226f1b9651345091083172c4.tar.gz |
Merge branch 'async'
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r-- | lib/git/async/task.py | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py new file mode 100644 index 00000000..ac948dc0 --- /dev/null +++ b/lib/git/async/task.py @@ -0,0 +1,237 @@ +from graph import Node +from util import ReadOnly +from channel import IteratorReader + + +import threading +import weakref +import sys +import new + +__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase', + 'IteratorThreadTask', 'ChannelThreadTask') + +class Task(Node): + """Abstracts a named task, which contains + additional information on how the task should be queued and processed. + + Results of the item processing are sent to a writer, which is to be + set by the creator using the ``set_writer`` method. + + Items are read using the internal ``_read`` callable, subclasses are meant to + set this to a callable that supports the Reader interface's read function. + + * **min_count** assures that not less than min_count items will be processed per call. + * **max_chunksize** assures that multi-threading is happening in smaller chunks. If + someone wants all items to be processed, using read(0), the whole task would go to + one worker, as well as dependent tasks. If you want finer granularity , you can + specify this here, causing chunks to be no larger than max_chunksize + * **apply_single** if True, default True, individual items will be given to the + worker function. If False, a list of possibly multiple items will be passed + instead.""" + __slots__ = ( '_read', # method to yield items to process + '_out_writer', # output write channel + '_exc', # exception caught + '_done', # True if we are done + '_num_writers', # number of concurrent writers + '_wlock', # lock for the above + 'fun', # function to call with items read + 'min_count', # minimum amount of items to produce, None means no override + 'max_chunksize', # maximium amount of items to process per process call + 'apply_single' # apply single items even if multiple where read + ) + + def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, + writer=None): + Node.__init__(self, id) + self._read = None # to be set by subclasss + self._out_writer = writer + self._exc = None + self._done = False + self._num_writers = 0 + self._wlock = threading.Lock() + self.fun = fun + self.min_count = None + self.max_chunksize = 0 # note set + self.apply_single = apply_single + + def is_done(self): + """:return: True if we are finished processing""" + return self._done + + def set_done(self): + """Set ourselves to being done, has we have completed the processing""" + self._done = True + + def set_writer(self, writer): + """Set the write channel to the given one""" + self._out_writer = writer + + def writer(self): + """:return: a proxy to our write channel or None if non is set + :note: you must not hold a reference to our write channel when the + task is being processed. This would cause the write channel never + to be closed as the task will think there is still another instance + being processed which can close the channel once it is done. + In the worst case, this will block your reads.""" + if self._out_writer is None: + return None + return self._out_writer + + def close(self): + """A closed task will close its channel to assure the readers will wake up + :note: its safe to call this method multiple times""" + self._out_writer.close() + + def is_closed(self): + """:return: True if the task's write channel is closed""" + return self._out_writer.closed() + + def error(self): + """:return: Exception caught during last processing or None""" + return self._exc + + def process(self, count=0): + """Process count items and send the result individually to the output channel""" + # first thing: increment the writer count - other tasks must be able + # to respond properly ( even if it turns out we don't need it later ) + self._wlock.acquire() + self._num_writers += 1 + self._wlock.release() + + items = self._read(count) + + try: + try: + if items: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + for rval in rvals: + write(rval) + # END handle single apply + # END if there is anything to do + finally: + self._wlock.acquire() + self._num_writers -= 1 + self._wlock.release() + # END handle writer count + except Exception, e: + # be sure our task is not scheduled again + self.set_done() + + # PROBLEM: We have failed to create at least one item, hence its not + # garantueed that enough items will be produced for a possibly blocking + # client on the other end. This is why we have no other choice but + # to close the channel, preventing the possibility of blocking. + # This implies that dependent tasks will go down with us, but that is + # just the right thing to do of course - one loose link in the chain ... + # Other chunks of our kind currently being processed will then + # fail to write to the channel and fail as well + self.close() + + # If some other chunk of our Task had an error, the channel will be closed + # This is not an issue, just be sure we don't overwrite the original + # exception with the ReadOnly error that would be emitted in that case. + # We imply that ReadOnly is exclusive to us, as it won't be an error + # if the user emits it + if not isinstance(e, ReadOnly): + self._exc = e + # END set error flag + # END exception handling + + + # if we didn't get all demanded items, which is also the case if count is 0 + # we have depleted the input channel and are done + # We could check our output channel for how many items we have and put that + # into the equation, but whats important is that we were asked to produce + # count items. + if not items or len(items) != count: + self.set_done() + # END handle done state + + # If we appear to be the only one left with our output channel, and are + # done ( this could have been set in another thread as well ), make + # sure to close the output channel. + # Waiting with this to be the last one helps to keep the + # write-channel writable longer + # The count is: 1 = wc itself, 2 = first reader channel, + x for every + # thread having its copy on the stack + # + 1 for the instance we provide to refcount + # Soft close, so others can continue writing their results + if self.is_done(): + self._wlock.acquire() + try: + if self._num_writers == 0: + self.close() + # END handle writers + finally: + self._wlock.release() + # END assure lock release + # END handle channel closure + #{ Configuration + + +class ThreadTaskBase(object): + """Describes tasks which can be used with theaded pools""" + pass + + +class IteratorTaskBase(Task): + """Implements a task which processes items from an iterable in a multi-processing + safe manner""" + __slots__ = tuple() + + + def __init__(self, iterator, *args, **kwargs): + Task.__init__(self, *args, **kwargs) + self._read = IteratorReader(iterator).read + # defaults to returning our items unchanged + self.fun = lambda item: item + + +class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase): + """An input iterator for threaded pools""" + lock_type = threading.Lock + + +class ChannelThreadTask(Task, ThreadTaskBase): + """Uses an input channel as source for reading items + For instantiation, it takes all arguments of its base, the first one needs + to be the input channel to read from though.""" + __slots__ = "_pool_ref" + + def __init__(self, in_reader, *args, **kwargs): + Task.__init__(self, *args, **kwargs) + self._read = in_reader.read + self._pool_ref = None + + #{ Internal Interface + + def reader(self): + """:return: input channel from which we read""" + # the instance is bound in its instance method - lets use this to keep + # the refcount at one ( per consumer ) + return self._read.im_self + + def set_read(self, read): + """Adjust the read method to the given one""" + self._read = read + + def set_pool(self, pool): + self._pool_ref = weakref.ref(pool) + + def pool(self): + """:return: pool we are attached to, or None""" + if self._pool_ref is None: + return None + return self._pool_ref() + + #} END intenral interface |