path: root/lib/git/async/pool.py
author    Sebastian Thiel <byronimo@gmail.com>    2010-06-12 14:34:09 +0200
committer Sebastian Thiel <byronimo@gmail.com>    2010-06-12 14:34:09 +0200
commit    86ea63504f3e8a74cfb1d533be9d9602d2d17e27 (patch)
tree      a2c59af267666a4b44bda748b806585c46faae99 /lib/git/async/pool.py
parent    f91495e271597034226f1b9651345091083172c4 (diff)
download  gitpython-86ea63504f3e8a74cfb1d533be9d9602d2d17e27.tar.gz
Removed async from tree
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--    lib/git/async/pool.py    488
1 file changed, 0 insertions(+), 488 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
deleted file mode 100644
index 8f33a029..00000000
--- a/lib/git/async/pool.py
+++ /dev/null
@@ -1,488 +0,0 @@
-"""Implementation of a thread-pool working with channels"""
-from thread import (
- WorkerThread,
- StopProcessing,
- )
-from threading import Lock
-
-from util import (
- AsyncQueue,
- DummyLock
- )
-
-from Queue import (
- Queue,
- Empty
- )
-
-from graph import Graph
-from channel import (
- mkchannel,
- ChannelWriter,
- Channel,
- SerialChannel,
- CallbackChannelReader
- )
-
-import sys
-import weakref
-from time import sleep
-import new
-
-
-__all__ = ('PoolReader', 'Pool', 'ThreadPool')
-
-
-class PoolReader(CallbackChannelReader):
-    """A reader designed to read from channels which take part in pools.
-    It acts like a handle to the underlying task in the pool."""
-    __slots__ = ('_task_ref', '_pool_ref')
-
-    def __init__(self, channel, task, pool):
-        CallbackChannelReader.__init__(self, channel)
-        self._task_ref = weakref.ref(task)
-        self._pool_ref = weakref.ref(pool)
-
-    def __del__(self):
-        """Assures that our task will be deleted if we were the last reader"""
-        task = self._task_ref()
-        if task is None:
-            return
-
-        pool = self._pool_ref()
-        if pool is None:
-            return
-
-        # if this is the last reader of the write channel we just handled,
-        # there is no way anyone will ever read from the task again. If so,
-        # delete the task in question; it will take care of itself and any
-        # orphans it might leave.
-        # The threshold of 6 breaks down as: 1 for ourselves, 1 for the
-        # getrefcount call, 1 more, and 3 'magical' ones which I can't
-        # explain but appear to be normal in the destructor
-        # (on the caller side, getrefcount returns 2, as expected).
-        # remove_task on its own has no way of knowing that the write channel
-        # is about to diminish, which is why we pass that information as a
-        # private kwarg - not nice, but okay for now.
-        if sys.getrefcount(self) < 6:
-            pool.remove_task(task, _from_destructor_ = True)
-        # END handle refcount based removal of task
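-        # Illustrative aside (not part of the original comment): getrefcount
-        # always reports one extra reference for its own call argument, e.g.
-        #
-        #   x = object()
-        #   sys.getrefcount(x)    # -> 2: the name 'x' plus the call argument
-        #
-        # which is why thresholds like the 6 above are calibrated empirically.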
-
-    #{ Internal
-    def _read(self, count=0, block=True, timeout=None):
-        return CallbackChannelReader.read(self, count, block, timeout)
-
-    #} END internal
-
-    #{ Interface
-
-    def pool_ref(self):
-        """:return: reference to the pool we belong to"""
-        return self._pool_ref
-
-    def task_ref(self):
-        """:return: reference to the task producing our items"""
-        return self._task_ref
-
-    #} END interface
-
-    def read(self, count=0, block=True, timeout=None):
-        """Read an item that was processed by one of our threads
-        :note: Triggers the task dependency handling needed to provide the
-            necessary input"""
-        # NOTE: we always queue the operation that would give us count items,
-        # as tracking the scheduled items or testing the channel's size is
-        # inherently unsafe depending on the design of the task network.
-        # If we put tasks onto the queue for every request, we are sure to
-        # always produce enough items, even if task.min_count would actually
-        # have provided enough - it's better to have some possibly empty task
-        # runs than an empty queue that blocks.
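-        # For example (illustrative numbers): even if 5 items already sit in
-        # the channel and the caller asks for 5, we still schedule work for 5
-        # more - a possibly empty task run is cheaper than a read blocking on
-        # an empty queue.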
-
-        # if the user tries to use us to read from a done task, we will never
-        # compute, as all produced items are already in the channel
-        task = self._task_ref()
-        if task is None:
-            return list()
-        # END abort if task was deleted
-
-        skip_compute = task.is_done() or task.error()
-
-        ########## prepare ##############################
-        if not skip_compute:
-            self._pool_ref()._prepare_channel_read(task, count)
-        # END prepare pool scheduling
-
-
-        ####### read data ########
-        ##########################
-        # read actual items; tasks were set up to put their output into our
-        # channel (as well)
-        items = CallbackChannelReader.read(self, count, block, timeout)
-        ##########################
-
-
-        return items
-
-
-
-class Pool(object):
-    """A thread pool maintains a set of one or more worker threads, but supports
-    a fully serial mode in which the number of threads is zero.
-
-    Work is distributed via Channels, which form a dependency graph. The
-    evaluation is lazy, as work will only be done once an output is requested.
-
-    The thread pool's inherent issue is the global interpreter lock that it
-    will hit, which gets worse considering that a few C extensions
-    specifically lock their part globally as well. The only way this will
-    improve is if custom C extensions are written which do some bulk work,
-    but release the GIL once they have acquired their resources.
-
-    Due to the nature of having multiple objects in git, it's easy to
-    distribute that work cleanly among threads.
-
-    :note: the current implementation returns channels which are meant to be
-        used only from the main thread, hence you cannot consume their results
-        from multiple threads unless you use a task for it."""
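-    # A minimal usage sketch (hypothetical 'some_task' instance, assuming a
-    # task type compatible with add_task() below):
-    #
-    #   pool = ThreadPool(size=2)
-    #   reader = pool.add_task(some_task)   # returns a PoolReader
-    #   items = reader.read(10)             # lazily schedules enough work
-    #   pool.set_size(0)                    # drain back to serial mode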
-    __slots__ = ( '_tasks',             # a graph of tasks
-                  '_num_workers',       # number of workers
-                  '_queue',             # master queue for tasks
-                  '_taskorder_cache',   # map task id -> ordered dependent tasks
-                  '_taskgraph_lock',    # lock for accessing the task graph
-                )
-
-    # CONFIGURATION
-    # The type of worker to create - it's expected to provide the Thread
-    # interface, taking the task queue as its only init argument, as well as
-    # a method called stop_and_join() to terminate it
-    WorkerCls = None
-
-    # The type of lock to use to protect critical sections, providing the
-    # threading.Lock interface
-    LockCls = None
-
-    # the type of the task queue to use - it must provide the Queue interface
-    TaskQueueCls = None
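-    # Sketch of a hypothetical subclass wiring these three types together
-    # (the name is illustrative; see ThreadPool below for the real example):
-    #
-    #   class SerialPool(Pool):
-    #       WorkerCls = WorkerThread    # used only once set_size(n > 0) is called
-    #       LockCls = DummyLock         # no real locking while single-threaded
-    #       TaskQueueCls = Queue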
-
-
-    def __init__(self, size=0):
-        self._tasks = Graph()
-        self._num_workers = 0
-        self._queue = self.TaskQueueCls()
-        self._taskgraph_lock = self.LockCls()
-        self._taskorder_cache = dict()
-        self.set_size(size)
-
-    def __del__(self):
-        self.set_size(0)
-
-    #{ Internal
-
-    def _prepare_channel_read(self, task, count):
-        """Process the tasks which depend on the given one, to be sure the
-        input channels are filled with data once we process the actual task.
-
-        Tasks have two important states: either they are done, or they are
-        done and have an error, in which case they are likely not to have
-        finished all their work.
-
-        Either way, we will put them onto a list of tasks to delete, providing
-        information about the failed ones.
-
-        Tasks which are not done will be put onto the queue for processing,
-        which is fine as we walked them depth-first."""
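-        # Illustrative example: for a chain t1 -> t2 -> t3 with the client
-        # reading from t3, input_inclusive_dfirst_reversed(t3) yields
-        # [t1, t2, t3], so producers are scheduled before their consumers.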
-        # for the walk, we must make sure the ordering does not change, even
-        # when accessing the cache, as it is related to graph changes
-        self._taskgraph_lock.acquire()
-        try:
-            try:
-                dfirst_tasks = self._taskorder_cache[id(task)]
-            except KeyError:
-                # have to retrieve the list from the graph
-                dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task)
-                self._taskorder_cache[id(task)] = dfirst_tasks
-            # END handle cached order retrieval
-        finally:
-            self._taskgraph_lock.release()
-        # END handle locking
-
-        # check the min count on all involved tasks, and be sure that we don't
-        # have any task which produces less than the maximum min-count of all
-        # tasks. The actual_count is used when chunking tasks up for the queue,
-        # whereas the count is used to determine whether we still have enough
-        # output on the queue, checking qsize (-> revise)
-        # ABSTRACT: If T depends on T-1, and the client wants 1 item, T produces
-        # at least 10, T-1 goes with 1, then T will block after 1 item, which
-        # is read by the client. On the next read of 1 item, we would find T's
-        # queue empty and put in another 10, which could put another thread into
-        # blocking state. T-1 produces one more item, which is consumed right away
-        # by the two threads running T. Although this works in the end, it leaves
-        # many threads blocking and waiting for input, which is not desired.
-        # Setting the min-count to the max of the min-counts of all tasks assures
-        # we have enough items for all.
-        # Addition: in serial mode, we would enter a deadlock if one task would
-        # ever wait for items!
-        actual_count = count
-        min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
-        min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
-        if 0 < count < min_count:
-            actual_count = min_count
-        # END set actual count
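-        # Worked example (illustrative values): three tasks with min_counts
-        # of 5, None and 20 and a client count of 1 yield (5, 1, 20); the
-        # reduce picks 20, and since 0 < 1 < 20, actual_count becomes 20.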
-
-        # the list includes our tasks - the first one to evaluate first, the
-        # requested one last
-        for task in dfirst_tasks:
-            # if task.error() or task.is_done():
-            # in theory, there should never be a consumed task in the pool,
-            # right? They delete themselves once they are done. But as we run
-            # asynchronously, it can be that someone reads while a task
-            # realizes it's done, and we get here to prepare the read although
-            # it already is done. It's not a problem though, the task will not
-            # do anything. Hence we don't waste our time with checking for it.
-            # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
-            # END skip processing
-
-            # but use the actual count to produce the output; we may produce
-            # more than requested
-            numchunks = 1
-            chunksize = actual_count
-            remainder = 0
-
-            # we need the count set for this - we can't chunk up unlimited items.
-            # In serial mode we could do this by checking for empty input
-            # channels, but in dispatch mode it's impossible (== not easily
-            # possible). Only try it if we have enough demand.
-            if task.max_chunksize and actual_count > task.max_chunksize:
-                numchunks = actual_count / task.max_chunksize
-                chunksize = task.max_chunksize
-                remainder = actual_count - (numchunks * chunksize)
-            # END handle chunking
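-            # Worked example (illustrative values): actual_count=25 with
-            # task.max_chunksize=10 gives numchunks=2 (integer division),
-            # chunksize=10 and remainder=5, i.e. two full chunks plus one
-            # trailing chunk of 5 items.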
-
-            # the following loops are kind of unrolled - code duplication
-            # should make things execute faster. Putting the if statements
-            # into the loop would be less code, but ... slower
-            if self._num_workers:
-                # respect the chunk size, and split the task up if we want
-                # to process too much. This can be defined per task
-                qput = self._queue.put
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        qput((task.process, chunksize))
-                    # END for each chunk to put
-                else:
-                    qput((task.process, chunksize))
-                # END try efficient looping
-
-                if remainder:
-                    qput((task.process, remainder))
-                # END handle chunksize
-            else:
-                # no workers, so we have to do the work ourselves
-                if numchunks > 1:
-                    for i in xrange(numchunks):
-                        task.process(chunksize)
-                    # END for each chunk to put
-                else:
-                    task.process(chunksize)
-                # END try efficient looping
-
-                if remainder:
-                    task.process(remainder)
-                # END handle chunksize
-            # END handle serial mode
-        # END for each task to process
-
-
-    def _remove_task_if_orphaned(self, task, from_destructor):
-        """Check the task, and delete it if it is orphaned"""
-        # 1 for the writer on the task, 1 for the getrefcount call, + 1 for
-        # each other writer/reader.
-        # If we are getting here from the destructor of an RPool channel,
-        # it's totally valid to virtually decrement the refcount by 1, as we
-        # can expect it to drop once the destructor completes, which is when
-        # we finish all recursive calls
-        max_ref_count = 3 + from_destructor
-        if sys.getrefcount(task.writer().channel) < max_ref_count:
-            self.remove_task(task, from_destructor)
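-        # Note (added for clarity): booleans are ints in Python, so passing
-        # from_destructor=True raises max_ref_count from 3 to 4, accounting
-        # for the reference the running destructor still holds.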
-    #} END internal
-
-    #{ Interface
-    def size(self):
-        """:return: amount of workers in the pool
-        :note: method is not threadsafe!"""
-        return self._num_workers
-
-    def set_size(self, size=0):
-        """Set the amount of workers to use in this pool. When reducing the
-        size, threads will continue with their work until they are done,
-        before effectively being removed.
-
-        :return: self
-        :param size: if 0, the pool will do all work itself in the calling
-            thread, otherwise the work will be distributed among the given
-            amount of threads. If the size is 0, newly added tasks will use
-            channels which are NOT threadsafe, to optimize item throughput.
-
-        :note: currently NOT threadsafe!"""
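-        # Example (illustrative): pool.set_size(4) starts four workers; a
-        # later pool.set_size(0) enqueues four stop requests and returns the
-        # pool to serial mode once the workers have drained.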
-        assert size > -1, "Size cannot be negative"
-
-        # either start new threads, or kill existing ones.
-        # If we end up with no threads, we process the remaining chunks on the
-        # queue ourselves
-        cur_count = self._num_workers
-        if cur_count < size:
-            # we can safely increase the size, even from serial mode, as we would
-            # only be able to do this if the serial (sync) mode finished processing.
-            # Just adding more workers is not a problem at all.
-            add_count = size - cur_count
-            for i in range(add_count):
-                self.WorkerCls(self._queue).start()
-            # END for each new worker to create
-            self._num_workers += add_count
-        elif cur_count > size:
-            # We don't care which thread exactly gets hit by our stop request;
-            # on their way, they will consume remaining tasks, but new ones
-            # could be added as we speak.
-            del_count = cur_count - size
-            for i in range(del_count):
-                self._queue.put((self.WorkerCls.stop, True))    # arg doesn't matter
-            # END for each thread to stop
-            self._num_workers -= del_count
-        # END handle count
-
-        if size == 0:
-            # NOTE: we do not process any tasks still on the queue, as we will
-            # naturally do that once we read the next time, and only on the tasks
-            # that are actually required. The queue will keep the tasks, and
-            # once we are deleted, they will vanish without additional time
-            # spent on them, assuming there are no consumers anyway.
-            # If we should re-enable some workers again, they will continue on
-            # the remaining tasks, probably with nothing to do.
-            # We can't clear the task queue if we have removed workers, as they
-            # will receive the termination signal through it, and if we had
-            # added workers, we wouldn't be here ;).
-            pass
-        # END process queue
-        return self
-
-    def num_tasks(self):
-        """:return: amount of tasks"""
-        self._taskgraph_lock.acquire()
-        try:
-            return len(self._tasks.nodes)
-        finally:
-            self._taskgraph_lock.release()
-
-    def remove_task(self, task, _from_destructor_ = False):
-        """Delete the task.
-        Additionally we will remove orphaned tasks, which can be identified if
-        their output channel is only held by themselves, so no one will ever
-        consume their items.
-
-        This method blocks until all tasks to be removed have been processed,
-        if they are currently being processed.
-        :return: self"""
-        self._taskgraph_lock.acquire()
-        try:
-            # it can be that the task is already deleted, but its chunk was on
-            # the queue until now, so it's marked consumed again
-            if task not in self._tasks.nodes:
-                return self
-            # END early abort
-
-            # the task we are currently deleting could also be processed by a
-            # thread right now. We don't care about that, as the task takes
-            # care of its write channel itself and sends everything it can to
-            # it. To the task it doesn't matter that it is no longer part of
-            # our task graph.
-
-            # now delete our actual node - be sure it's done, to prevent
-            # further processing in case there are still client reads on
-            # their way.
-            task.set_done()
-
-            # keep its input nodes, as we check whether they were orphaned
-            in_tasks = task.in_nodes
-            self._tasks.remove_node(task)
-            self._taskorder_cache.clear()
-        finally:
-            self._taskgraph_lock.release()
-        # END locked deletion
-
-        for t in in_tasks:
-            self._remove_task_if_orphaned(t, _from_destructor_)
-        # END handle orphans recursively
-
-        return self
-
-    def add_task(self, task):
-        """Add a new task to be processed.
-        :return: a read channel to retrieve processed items. If that handle is
-            lost, the task will be considered orphaned and will be deleted on
-            the next occasion."""
-        # create a write channel for it
-        ctype = Channel
-
-        # adjust the task with our pool ref, if it has the slot and it is empty.
-        # For now, we don't allow tasks to be used in multiple pools, except
-        # via their channels
-        if hasattr(task, 'pool'):
-            their_pool = task.pool()
-            if their_pool is None:
-                task.set_pool(self)
-            elif their_pool is not self:
-                raise ValueError("Task %r is already registered to another pool" % task.id)
-            # END handle pool exclusivity
-        # END handle pool aware tasks
-
-        self._taskgraph_lock.acquire()
-        try:
-            self._taskorder_cache.clear()
-            self._tasks.add_node(task)
-
-            # Use a non-threadsafe queue - this brings about 15% more
-            # performance, but sacrifices thread-safety
-            if self.size() == 0:
-                ctype = SerialChannel
-            # END improve locks
-
-            # set up the task's channel - respect the task creator's choice
-            # though, if it is set.
-            wc = task.writer()
-            ch = None
-            if wc is None:
-                ch = ctype()
-                wc = ChannelWriter(ch)
-                task.set_writer(wc)
-            else:
-                ch = wc.channel
-            # END create write channel if unset
-            rc = PoolReader(ch, task, self)
-        finally:
-            self._taskgraph_lock.release()
-        # END sync task addition
-
-        # If the input channel is one of our read channels, we add the relation
-        if hasattr(task, 'reader'):
-            ic = task.reader()
-            if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
-                self._taskgraph_lock.acquire()
-                try:
-                    self._tasks.add_edge(ic._task_ref(), task)
-
-                    # additionally, bypass ourselves when reading from the
-                    # task, if possible
-                    if hasattr(ic, '_read'):
-                        task.set_read(ic._read)
-                    # END handle read bypass
-                finally:
-                    self._taskgraph_lock.release()
-                # END handle edge-adding
-            # END add task relation
-        # END handle input channels for connections
-
-        return rc
-
-    #} END interface
-
-
-class ThreadPool(Pool):
-    """A pool using threads as workers"""
-    WorkerCls = WorkerThread
-    LockCls = Lock
-    TaskQueueCls = AsyncQueue
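-
-
-# Flow sketch (inferred from the queue usage above; see thread.py for the
-# actual WorkerCls): workers pop (routine, argument) tuples off the task
-# queue and invoke them, roughly:
-#
-#   while True:
-#       routine, arg = queue.get()
-#       routine(arg)    # e.g. task.process(chunksize), or a stop request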