summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-10 10:14:32 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-10 10:14:32 +0200
commitcfb278d74ad01f3f1edf5e0ad113974a9555038d (patch)
tree4315e6827c30d5c7be035fe017ef2d998cd82335 /lib/git/async/pool.py
parent3323464f85b986cba23176271da92a478b33ab9c (diff)
downloadgitpython-cfb278d74ad01f3f1edf5e0ad113974a9555038d.tar.gz
InputChannelTask now has interface for properly handling the reading from the same and different pools
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py43
1 files changed, 39 insertions, 4 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 5ebc3655..1b3c2748 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -10,7 +10,6 @@ from util import (
DummyLock
)
-from task import InputChannelTask
from Queue import (
Queue,
Empty
@@ -66,6 +65,24 @@ class RPoolChannel(CallbackRChannel):
if sys.getrefcount(self) < 6:
pool.remove_task(task, _from_destructor_=True)
# END handle refcount based removal of task
+
+ #{ Internal
+ def _read(self, count=0, block=True, timeout=None):
+ """Direct read, bypassing the pool handling"""
+ return CallbackRChannel.read(self, count, block, timeout)
+ #} END internal
+
+ #{ Interface
+
+ def pool_ref(self):
+ """:return: reference to the pool we belong to"""
+ return self._pool_ref
+
+ def task_ref(self):
+ """:return: reference to the task producing our items"""
+ return self._task_ref
+
+ #} END interface
def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
@@ -188,7 +205,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
# END handle locking
- print dfirst_tasks
+
# check the min count on all involved tasks, and be sure that we don't
# have any task which produces less than the maximum min-count of all tasks
# The actual_count is used when chunking tasks up for the queue, whereas
@@ -406,6 +423,18 @@ class Pool(object):
# create a write channel for it
wctype = WChannel
+ # adjust the task with our pool ref, if it has the slot and is empty
+ # For now, we don't allow tasks to be used in multiple pools, except
+ # for by their channels
+ if hasattr(task, 'pool'):
+ their_pool = task.pool()
+ if their_pool is None:
+ task.set_pool(self)
+ elif their_pool is not self:
+ raise ValueError("Task %r is already registered to another pool" % task.id)
+ # END handle pool exclusivity
+ # END handle pool aware tasks
+
self._taskgraph_lock.acquire()
try:
self._taskorder_cache.clear()
@@ -431,12 +460,18 @@ class Pool(object):
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if isinstance(task, InputChannelTask):
+ if hasattr(task, 'rchannel'):
ic = task.rchannel()
- if isinstance(ic, RPoolChannel) and ic._pool_ref() is self:
+ if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
self._taskgraph_lock.acquire()
try:
self._tasks.add_edge(ic._task_ref(), task)
+
+ # additionally, bypass ourselves when reading from the
+ # task, if possible
+ if hasattr(ic, '_read'):
+ task.set_read(ic._read)
+ # END handle read bypass
finally:
self._taskgraph_lock.release()
# END handle edge-adding