Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py | 104
1 file changed, 70 insertions(+), 34 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 2ec18f1a..dbc201a9 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -10,7 +10,6 @@ from util import (
DummyLock
)
-from task import InputChannelTask
from Queue import (
Queue,
Empty
@@ -19,25 +18,25 @@ from Queue import (
from graph import Graph
from channel import (
mkchannel,
- WChannel,
- SerialWChannel,
- CallbackRChannel
+ Writer,
+ Channel,
+ SerialChannel,
+ CallbackReader
)
import sys
import weakref
from time import sleep
+import new
-class RPoolChannel(CallbackRChannel):
- """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
- before and after an item is to be read.
-
+class PoolReader(CallbackReader):
+ """A reader designed to read from channels which take part in pools.
It acts like a handle to the underlying task in the pool."""
__slots__ = ('_task_ref', '_pool_ref')
- def __init__(self, wchannel, task, pool):
- CallbackRChannel.__init__(self, wchannel)
+ def __init__(self, channel, task, pool):
+ CallbackReader.__init__(self, channel)
self._task_ref = weakref.ref(task)
self._pool_ref = weakref.ref(pool)
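The two weakref.ref calls above are what keep the reader from pinning its task and pool, so dropping the last strong reference elsewhere still lets the refcount-based cleanup fire. A minimal, self-contained sketch of that pattern (DemoTask is a hypothetical stand-in):

import weakref

class DemoTask(object):
    pass

task = DemoTask()
ref = weakref.ref(task)   # a callable handle that does not raise the refcount
assert ref() is task      # dereferencing while the target is alive
del task                  # last strong reference gone
assert ref() is None      # dead references dereference to None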
@@ -62,10 +61,27 @@ class RPoolChannel(CallbackRChannel):
# it has no way of knowing that the write channel is about to diminish,
# which is why we pass the info as a private kwarg - not nice, but
# okay for now
- # TODO: Fix this - private/public method
if sys.getrefcount(self) < 6:
- pool.remove_task(task, _from_destructor_=True)
+ pool.remove_task(task, _from_destructor_ = True)
# END handle refcount based removal of task
+
+ #{ Internal
+ def _read(self, count=0, block=True, timeout=None):
+ return CallbackReader.read(self, count, block, timeout)
+
+ #} END internal
+
+ #{ Interface
+
+ def pool_ref(self):
+ """:return: reference to the pool we belong to"""
+ return self._pool_ref
+
+ def task_ref(self):
+ """:return: reference to the task producing our items"""
+ return self._task_ref
+
+ #} END interface
def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
@@ -101,7 +117,7 @@ class RPoolChannel(CallbackRChannel):
####### read data ########
##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
- items = CallbackRChannel.read(self, count, block, timeout)
+ items = CallbackReader.read(self, count, block, timeout)
##########################
@@ -182,8 +198,7 @@ class Pool(object):
dfirst_tasks = self._taskorder_cache[id(task)]
except KeyError:
# have to retrieve the list from the graph
- dfirst_tasks = list()
- self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+ dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task)
self._taskorder_cache[id(task)] = dfirst_tasks
# END handle cached order retrieval
finally:
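The hunk above swaps a per-call graph visit for a precomputed order. A sketch of the id()-keyed memoization idiom it uses; input_inclusive_dfirst_reversed is the method name from the patch, everything else here is a hypothetical stand-in:

class FakeGraph(object):                      # hypothetical stand-in graph
    def input_inclusive_dfirst_reversed(self, task):
        return [task]                         # real code returns a task order

_taskorder_cache = {}

def dfirst_tasks_for(graph, task):
    try:
        return _taskorder_cache[id(task)]     # hit: reuse the computed order
    except KeyError:
        order = graph.input_inclusive_dfirst_reversed(task)
        _taskorder_cache[id(task)] = order    # miss: compute once, key by id
        return order

g, t = FakeGraph(), object()
assert dfirst_tasks_for(g, t) is dfirst_tasks_for(g, t)   # second call cached
# _taskorder_cache.clear() must run whenever the graph changes, as the pool does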
@@ -246,21 +261,21 @@ class Pool(object):
# should make things execute faster. Putting the if statements
# into the loop would be less code, but ... slower
# DEBUG
- # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+ # print actual_count, numchunks, chunksize, remainder, task._out_writer.size()
if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
- queue = self._queue
+ qput = self._queue.put
if numchunks > 1:
for i in xrange(numchunks):
- queue.put((task.process, chunksize))
+ qput((task.process, chunksize))
# END for each chunk to put
else:
- queue.put((task.process, chunksize))
+ qput((task.process, chunksize))
# END try efficient looping
if remainder:
- queue.put((task.process, remainder))
+ qput((task.process, remainder))
# END handle chunksize
else:
# no workers, so we have to do the work ourselves
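Two micro-optimizations meet in this hunk: the queue's bound put method is cached in a local (qput) so the hot loop skips repeated attribute lookups, and the work is split into numchunks puts of chunksize plus one remainder put. A runnable sketch of the split, reusing the variable names from the hunk:

def split_work(actual_count, chunksize):
    numchunks = actual_count // chunksize             # whole chunks
    remainder = actual_count - numchunks * chunksize  # leftover items
    return [chunksize] * numchunks + ([remainder] if remainder else [])

assert split_work(10, 3) == [3, 3, 3, 1]   # four puts on the worker queue
assert sum(split_work(10, 3)) == 10        # nothing lost, nothing duplicated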
@@ -281,13 +296,13 @@ class Pool(object):
def _remove_task_if_orphaned(self, task, from_destructor):
"""Check the task, and delete it if it is orphaned"""
- # 1 as its stored on the task, 1 for the getrefcount call
+ # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
# If we are getting here from the destructor of a PoolReader,
# it's totally valid to virtually decrement the refcount by 1 as
# we can expect it to drop once the destructor completes, which is when
# we finish all recursive calls
max_ref_count = 3 + from_destructor
- if sys.getrefcount(task.wchannel()) < max_ref_count:
+ if sys.getrefcount(task.writer().channel) < max_ref_count:
self.remove_task(task, from_destructor)
#} END internal
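The arithmetic in the orphan test reads oddly until you notice that from_destructor is a bool and Python bools are ints, so max_ref_count is 3 normally and 4 during destructor-triggered checks. A hedged sketch, with the reference breakdown taken from the hunk's comment:

import sys

def looks_orphaned(channel, from_destructor=False):
    # per the hunk's comment: one reference lives on the task's writer and one
    # is getrefcount's own argument; any surplus means another reader/writer.
    # NB: illustrative only - this parameter binding itself adds a reference,
    # which is why the pool performs the check inline on task.writer().channel
    max_ref_count = 3 + from_destructor   # bools are ints: True adds 1
    return sys.getrefcount(channel) < max_ref_count

assert 3 + True == 4 and 3 + False == 3    # why the bool can be added directly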
@@ -358,7 +373,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def remove_task(self, task, _from_destructor_=False):
+ def remove_task(self, task, _from_destructor_ = False):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -386,7 +401,7 @@ class Pool(object):
# keep its input nodes as we check whether they were orphaned
in_tasks = task.in_nodes
- self._tasks.del_node(task)
+ self._tasks.remove_node(task)
self._taskorder_cache.clear()
finally:
self._taskgraph_lock.release()
@@ -404,7 +419,19 @@ class Pool(object):
the task will be considered orphaned and will be deleted on the next
occasion."""
# create a write channel for it
- wctype = WChannel
+ ctype = Channel
+
+ # adjust the task with our pool ref, if it has the slot and is empty
+ # For now, we don't allow tasks to be used in multiple pools, except
+ # for by their channels
+ if hasattr(task, 'pool'):
+ their_pool = task.pool()
+ if their_pool is None:
+ task.set_pool(self)
+ elif their_pool is not self:
+ raise ValueError("Task %r is already registered to another pool" % task.id)
+ # END handle pool exclusivity
+ # END handle pool aware tasks
self._taskgraph_lock.acquire()
try:
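Note that add_task probes the task with hasattr rather than isinstance, so any object exposing pool()/set_pool() can participate. A hypothetical minimal implementation of that protocol (not part of the patch), using a weak reference so the task cannot keep its pool alive:

import weakref

class PoolAwareTask(object):
    __slots__ = ('_pool_ref',)

    def __init__(self):
        self._pool_ref = None

    def pool(self):
        """:return: the pool we are registered with, or None"""
        if self._pool_ref is None:
            return None
        return self._pool_ref()

    def set_pool(self, pool):
        self._pool_ref = weakref.ref(pool)

task = PoolAwareTask()
assert task.pool() is None    # unregistered: add_task would claim this task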
@@ -413,30 +440,39 @@ class Pool(object):
# Use a non-threadsafe queue
# This brings about 15% more performance, but sacrifices thread-safety
- # when reading from multiple threads.
if self.size() == 0:
- wctype = SerialWChannel
+ ctype = SerialChannel
# END improve locks
# setup the tasks channel - respect the task creators choice though
# if it is set.
- wc = task.wchannel()
+ wc = task.writer()
+ ch = None
if wc is None:
- wc = wctype()
+ ch = ctype()
+ wc = Writer(ch)
+ task.set_writer(wc)
+ else:
+ ch = wc.channel
# END create write channel if unset
- rc = RPoolChannel(wc, task, self)
- task.set_wchannel(wc)
+ rc = PoolReader(ch, task, self)
finally:
self._taskgraph_lock.release()
# END sync task addition
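The size() == 0 special case trades thread-safety for speed (about 15% per the comment above): with no workers, all channel traffic stays on the caller's thread. A toy stand-in (hypothetical SerialQueue, not the package's SerialChannel) showing why no lock is needed in that mode:

from collections import deque

class SerialQueue(object):
    def __init__(self):
        self._items = deque()
    def put(self, item):
        self._items.append(item)      # no lock: single-threaded by contract
    def get(self):
        return self._items.popleft()  # raises IndexError when empty

q = SerialQueue()
q.put('a'); q.put('b')
assert q.get() == 'a'                 # plain FIFO, zero synchronization cost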
# If the input channel is one of our read channels, we add the relation
- if isinstance(task, InputChannelTask):
- ic = task.rchannel()
- if isinstance(ic, RPoolChannel) and ic._pool_ref() is self:
+ if hasattr(task, 'reader'):
+ ic = task.reader()
+ if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
self._taskgraph_lock.acquire()
try:
self._tasks.add_edge(ic._task_ref(), task)
+
+ # additionally, bypass ourselves when reading from the
+ # task, if possible
+ if hasattr(ic, '_read'):
+ task.set_read(ic._read)
+ # END handle read bypass
finally:
self._taskgraph_lock.release()
# END handle edge-adding
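This final block wires dependent tasks together and, where the upstream reader exposes a raw _read, lets the downstream task pull items without re-running PoolReader.read's scheduling logic on every call. A self-contained sketch of that duck-typed bypass (all names here are illustrative, not from the patch):

class UpstreamReader(object):
    def _read(self, count=0, block=True, timeout=None):
        return list(range(count))     # stand-in for items from the channel

class DownstreamTask(object):
    def __init__(self):
        self.read = None
    def set_read(self, read):
        self.read = read              # task pulls input through this callable

task, ic = DownstreamTask(), UpstreamReader()
if hasattr(ic, '_read'):
    task.set_read(ic._read)           # bypass the pool-aware read wrapper
assert task.read(3) == [0, 1, 2]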