Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py  |  76
1 file changed, 37 insertions(+), 39 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 1b3c2748..68551ea3 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -18,27 +18,28 @@ from Queue import (
from graph import Graph
from channel import (
mkchannel,
- WChannel,
- SerialWChannel,
- CallbackRChannel
+ Writer,
+ Channel,
+ SerialChannel,
+ CallbackReader
)
import sys
import weakref
from time import sleep
+import new
-class RPoolChannel(CallbackRChannel):
- """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
- before and after an item is to be read.
-
+class PoolReader(CallbackReader):
+ """A reader designed to read from channels which take part in pools
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task_ref', '_pool_ref')
+ __slots__ = ('_task_ref', '_pool_ref', '_read')
- def __init__(self, wchannel, task, pool):
- CallbackRChannel.__init__(self, wchannel)
+ def __init__(self, channel, task, pool):
+ CallbackReader.__init__(self, channel)
self._task_ref = weakref.ref(task)
self._pool_ref = weakref.ref(pool)
+ self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader)
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
@@ -63,15 +64,9 @@ class RPoolChannel(CallbackRChannel):
# okay for now
# TODO: Fix this - private/public method
if sys.getrefcount(self) < 6:
- pool.remove_task(task, _from_destructor_=True)
+ pool.remove_task(task)
# END handle refcount based removal of task
- #{ Internal
- def _read(self, count=0, block=True, timeout=None):
- """Direct read, bypassing the pool handling"""
- return CallbackRChannel.read(self, count, block, timeout)
- #} END internal
-
#{ Interface
def pool_ref(self):
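The reader only holds weak references to its task and pool (the `_task_ref`/`_pool_ref` slots above), so a lingering reader handle never keeps the pool machinery alive; `__del__` then dereferences them to trigger task removal. A rough sketch of that pattern with stand-in `Pool`, `Task` and `Handle` classes (names are illustrative, not the real API):

import weakref

class Pool(object):
	def remove_task(self, task):
		print("removing %r" % task)

class Task(object):
	pass

class Handle(object):
	def __init__(self, task, pool):
		self._task_ref = weakref.ref(task)
		self._pool_ref = weakref.ref(pool)

	def __del__(self):
		# pool or task may already be gone; weakrefs then return None
		pool = self._pool_ref()
		task = self._task_ref()
		if pool is not None and task is not None:
			pool.remove_task(task)

pool, task = Pool(), Task()
handle = Handle(task, pool)
del handle  # pool and task are still alive, so the cleanup call runs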
@@ -118,7 +113,7 @@ class RPoolChannel(CallbackRChannel):
####### read data ########
##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
- items = CallbackRChannel.read(self, count, block, timeout)
+ items = CallbackReader.read(self, count, block, timeout)
##########################
@@ -262,21 +257,21 @@ class Pool(object):
# should make things execute faster. Putting the if statements
# into the loop would be less code, but ... slower
# DEBUG
- # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+ # print actual_count, numchunks, chunksize, remainder, task._out_writer.size()
if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
- queue = self._queue
+			qput = self._queue.put
if numchunks > 1:
for i in xrange(numchunks):
- queue.put((task.process, chunksize))
+ qput((task.process, chunksize))
# END for each chunk to put
else:
- queue.put((task.process, chunksize))
+ qput((task.process, chunksize))
# END try efficient looping
if remainder:
- queue.put((task.process, remainder))
+ qput((task.process, remainder))
# END handle chunksize
else:
# no workers, so we have to do the work ourselves
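The `queue`-to-`qput` change in this hunk is the usual CPython micro-optimisation: look up the bound `Queue.put` once before the chunk loop instead of resolving the attribute on every iteration. A small self-contained sketch (the `schedule_chunks` helper is invented for illustration, it is not pool API):

from Queue import Queue

def schedule_chunks(queue, process, chunksize, numchunks, remainder):
	qput = queue.put  # one attribute lookup, reused for every chunk
	if numchunks > 1:
		for i in xrange(numchunks):
			qput((process, chunksize))
	else:
		qput((process, chunksize))
	if remainder:
		qput((process, remainder))

q = Queue()
schedule_chunks(q, lambda n: n, chunksize=10, numchunks=3, remainder=5)
assert q.qsize() == 4  # three chunks plus the remainder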
@@ -295,16 +290,16 @@ class Pool(object):
# END for each task to process
- def _remove_task_if_orphaned(self, task, from_destructor):
+ def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
- # 1 as its stored on the task, 1 for the getrefcount call
+ # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
# If we are getting here from the destructor of an RPool channel,
# its totally valid to virtually decrement the refcount by 1 as
# we can expect it to drop once the destructor completes, which is when
# we finish all recursive calls
- max_ref_count = 3 + from_destructor
- if sys.getrefcount(task.wchannel()) < max_ref_count:
- self.remove_task(task, from_destructor)
+ max_ref_count = 3
+ if sys.getrefcount(task.writer().channel) < max_ref_count:
+ self.remove_task(task)
#} END internal
#{ Interface
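The orphan check relies on CPython reference counting: once the destructor bookkeeping is gone, the task's output channel is expected to be referenced only by the task itself plus the temporary argument created by the `getrefcount` call, so anything below `max_ref_count = 3` means no outside reader or writer is left. A simplified sketch of the heuristic with hypothetical `Task`/`Channel` stand-ins (CPython only, since `sys.getrefcount` semantics are implementation specific):

import sys

class Channel(object):
	pass

class Task(object):
	def __init__(self):
		self._channel = Channel()

	def writer_channel(self):
		return self._channel

def is_orphaned(task, max_ref_count=3):
	# one reference is held by the task, one by getrefcount's argument;
	# fewer than max_ref_count means no external handle exists
	return sys.getrefcount(task.writer_channel()) < max_ref_count

task = Task()
assert is_orphaned(task)            # only the task references its channel
reader = task.writer_channel()      # simulate a consumer holding a handle
assert not is_orphaned(task)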
@@ -375,7 +370,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def remove_task(self, task, _from_destructor_=False):
+ def remove_task(self, task):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -410,7 +405,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._remove_task_if_orphaned(t, _from_destructor_)
+ self._remove_task_if_orphaned(t)
# END handle orphans recursively
return self
@@ -421,7 +416,7 @@ class Pool(object):
the task will be considered orphaned and will be deleted on the next
occasion."""
# create a write channel for it
- wctype = WChannel
+ ctype = Channel
# adjust the task with our pool ref, if it has the slot and is empty
# For now, we don't allow tasks to be used in multiple pools, except
@@ -442,26 +437,29 @@ class Pool(object):
# Use a non-threadsafe queue
# This brings about 15% more performance, but sacrifices thread-safety
- # when reading from multiple threads.
if self.size() == 0:
- wctype = SerialWChannel
+ ctype = SerialChannel
# END improve locks
# setup the tasks channel - respect the task creators choice though
# if it is set.
- wc = task.wchannel()
+ wc = task.writer()
+ ch = None
if wc is None:
- wc = wctype()
+ ch = ctype()
+ wc = Writer(ch)
+ task.set_writer(wc)
+ else:
+ ch = wc.channel
# END create write channel ifunset
- rc = RPoolChannel(wc, task, self)
- task.set_wchannel(wc)
+ rc = PoolReader(ch, task, self)
finally:
self._taskgraph_lock.release()
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if hasattr(task, 'rchannel'):
- ic = task.rchannel()
+ if hasattr(task, 'reader'):
+ ic = task.reader()
if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
self._taskgraph_lock.acquire()
try:
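For reference, the add_task wiring in the last hunk boils down to: create the channel object once, attach a Writer to the task, and hand callers a pool-aware reader over the same channel. A toy version with made-up `Channel`/`Writer`/`Reader` classes (the real ones live in lib/git/async/channel.py and do considerably more):

from collections import deque

class Channel(object):
	"""Stand-in for the real channel: a FIFO shared by writer and reader."""
	def __init__(self):
		self.queue = deque()

class Writer(object):
	def __init__(self, channel):
		self.channel = channel

	def write(self, item):
		self.channel.queue.append(item)

class Reader(object):
	def __init__(self, channel):
		self.channel = channel

	def read(self):
		return self.channel.queue.popleft()

ch = Channel()   # ctype() in add_task
wc = Writer(ch)  # handed to the task via task.set_writer(wc)
rc = Reader(ch)  # what PoolReader(ch, task, self) wraps for consumers
wc.write(42)
assert rc.read() == 42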