diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-10 10:14:32 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-10 10:14:32 +0200 |
commit | cfb278d74ad01f3f1edf5e0ad113974a9555038d (patch) | |
tree | 4315e6827c30d5c7be035fe017ef2d998cd82335 /lib/git/async | |
parent | 3323464f85b986cba23176271da92a478b33ab9c (diff) | |
download | gitpython-cfb278d74ad01f3f1edf5e0ad113974a9555038d.tar.gz |
InputChannelTask now has interface for properly handling the reading from the same and different pools
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/pool.py | 43 | ||||
-rw-r--r-- | lib/git/async/task.py | 26 |
2 files changed, 64 insertions, 5 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 5ebc3655..1b3c2748 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -10,7 +10,6 @@ from util import ( DummyLock ) -from task import InputChannelTask from Queue import ( Queue, Empty @@ -66,6 +65,24 @@ class RPoolChannel(CallbackRChannel): if sys.getrefcount(self) < 6: pool.remove_task(task, _from_destructor_=True) # END handle refcount based removal of task + + #{ Internal + def _read(self, count=0, block=True, timeout=None): + """Direct read, bypassing the pool handling""" + return CallbackRChannel.read(self, count, block, timeout) + #} END internal + + #{ Interface + + def pool_ref(self): + """:return: reference to the pool we belong to""" + return self._pool_ref + + def task_ref(self): + """:return: reference to the task producing our items""" + return self._task_ref + + #} END interface def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads @@ -188,7 +205,7 @@ class Pool(object): finally: self._taskgraph_lock.release() # END handle locking - print dfirst_tasks + # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks # The actual_count is used when chunking tasks up for the queue, whereas @@ -406,6 +423,18 @@ class Pool(object): # create a write channel for it wctype = WChannel + # adjust the task with our pool ref, if it has the slot and is empty + # For now, we don't allow tasks to be used in multiple pools, except + # for by their channels + if hasattr(task, 'pool'): + their_pool = task.pool() + if their_pool is None: + task.set_pool(self) + elif their_pool is not self: + raise ValueError("Task %r is already registered to another pool" % task.id) + # END handle pool exclusivity + # END handle pool aware tasks + self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() @@ -431,12 +460,18 @@ class Pool(object): # END sync task addition # If the input channel is one of our read channels, we add the relation - if isinstance(task, InputChannelTask): + if hasattr(task, 'rchannel'): ic = task.rchannel() - if isinstance(ic, RPoolChannel) and ic._pool_ref() is self: + if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: self._taskgraph_lock.acquire() try: self._tasks.add_edge(ic._task_ref(), task) + + # additionally, bypass ourselves when reading from the + # task, if possible + if hasattr(ic, '_read'): + task.set_read(ic._read) + # END handle read bypass finally: self._taskgraph_lock.release() # END handle edge-adding diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 57dd285d..d5b45609 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,8 +1,12 @@ from graph import Node -from channel import WChannel from util import ReadOnly +from channel import ( + WChannel, + CallbackRChannel + ) import threading +import weakref import sys import new @@ -147,6 +151,7 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount + # Soft close, so others can continue writing their results if self.is_done() and getrefcount(self._out_wc) < 4: print "Closing channel of %r" % self.id self.close() @@ -206,13 +211,32 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" + __slots__ = "_pool_ref" def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read + self._pool_ref = None + + #{ Internal Interface def rchannel(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) return self._read.im_self + + def set_read(self, read): + """Adjust the read method to the given one""" + self._read = read + + def set_pool(self, pool): + self._pool_ref = weakref.ref(pool) + + def pool(self): + """:return: pool we are attached to, or None""" + if self._pool_ref is None: + return None + return self._pool_ref() + + #} END intenral interface |