summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-06 18:13:21 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-06 18:13:21 +0200
commitb3cde0ee162b8f0cb67da981311c8f9c16050a62 (patch)
tree537615b8d7cfd059b2d7938a1929d9e3199fb374 /lib/git/async
parentec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 (diff)
downloadgitpython-b3cde0ee162b8f0cb67da981311c8f9c16050a62.tar.gz
First step of testing the pool - tasks have been separated into a new module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/graph.py16
-rw-r--r--lib/git/async/pool.py164
-rw-r--r--lib/git/async/task.py144
-rw-r--r--lib/git/async/thread.py9
-rw-r--r--lib/git/async/util.py24
5 files changed, 246 insertions, 111 deletions
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
index b4d6aa00..d817eeb4 100644
--- a/lib/git/async/graph.py
+++ b/lib/git/async/graph.py
@@ -1,14 +1,20 @@
"""Simplistic implementation of a graph"""
class Node(object):
- """A quick and dirty to the point implementation of a simple, and slow ascyclic graph.
- Its not designed to support big graphs, and sports only the functionality
- we need"""
- __slots__ = ('in_nodes', 'out_nodes')
+ """A Node in the graph. They know their neighbours, and have an id which should
+ resolve into a string"""
+ __slots__ = ('in_nodes', 'out_nodes', 'id')
- def __init__(self):
+ def __init__(self, id=None):
+ self.id = id
self.in_nodes = list()
self.out_nodes = list()
+
+ def __str__(self):
+ return str(self.id)
+
+ def __repr__(self):
+ return "%s(%s)" % (type(self).__name__, self.id)
class Graph(object):
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 9a24cbc5..2efc862b 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,10 +1,10 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
+from task import InputChannelTask
from Queue import Queue
from graph import (
Graph,
- Node
)
from channel import (
@@ -16,73 +16,6 @@ from channel import (
import weakref
import sys
-class TaskNode(Node):
- """Couples an input channel, an output channel, as well as a processing function
- together.
- It may contain additional information on how to handel read-errors from the
- input channel"""
- __slots__ = ( 'in_rc', # input read channel
- '_out_wc', # output write channel
- '_pool_ref', # ref to our pool
- '_exc', # exception caught
- 'fun', # function to call with items read from in_rc
- 'min_count', # minimum amount of items to produce, None means no override
- 'max_chunksize', # maximium amount of items to process per process call
- 'apply_single' # apply single items even if multiple where read
- )
-
- def __init__(self, in_rc, fun, apply_single=True):
- self.in_rc = in_rc
- self._out_wc = None
- self._pool_ref = None
- self._exc = None
- self.fun = fun
- self.min_count = None
- self.max_chunksize = 0 # note set
- self.apply_single = apply_single
-
- def is_done(self):
- """:return: True if we are finished processing"""
- return self._out_wc.closed
-
- def set_done(self):
- """Set ourselves to being done, has we have completed the processing"""
- self._out_wc.close()
-
- def error(self):
- """:return: Exception caught during last processing or None"""
- return self._exc
-
- def process(self, count=1):
- """Process count items and send the result individually to the output channel"""
- if self._out_wc is None:
- raise IOError("Cannot work in uninitialized task")
-
- read = self.in_rc.read
- if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref():
- read = self.in_rc._read
- items = read(count)
-
- try:
- if self.apply_single:
- for item in items:
- self._out_wc.write(self.fun(item))
- # END for each item
- else:
- self._out_wc.write(self.fun(items))
- # END handle single apply
- except Exception, e:
- self._exc = e
- self.set_done()
- # END exception handling
-
- # if we didn't get all demanded items, which is also the case if count is 0
- # we have depleted the input channel and are done
- if len(items) != count:
- self.set_done()
- # END handle done state
- #{ Configuration
-
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
@@ -116,7 +49,7 @@ class RPoolChannel(RChannel):
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
- def read(self, count=1, block=False, timeout=None):
+ def read(self, count=0, block=False, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
@@ -131,9 +64,11 @@ class RPoolChannel(RChannel):
items = RChannel.read(self, count, block, timeout)
if self._post_cb:
items = self._post_cb(items)
+
+ return items
#{ Internal
- def _read(self, count=1, block=False, timeout=None):
+ def _read(self, count=0, block=False, timeout=None):
"""Calls the underlying channel's read directly, without triggering
the pool"""
return RChannel.read(self, count, block, timeout)
@@ -141,7 +76,6 @@ class RPoolChannel(RChannel):
#} END internal
-
class ThreadPool(object):
"""A thread pool maintains a set of one or more worker threads, but supports
a fully serial mode in which case the amount of threads is zero.
@@ -149,6 +83,15 @@ class ThreadPool(object):
Work is distributed via Channels, which form a dependency graph. The evaluation
is lazy, as work will only be done once an output is requested.
+ The thread pools inherent issue is the global interpreter lock that it will hit,
+ which gets worse considering a few c extensions specifically lock their part
+ globally as well. The only way this will improve is if custom c extensions
+ are written which do some bulk work, but release the GIL once they have acquired
+ their resources.
+
+ Due to the nature of having multiple objects in git, its easy to distribute
+ that work cleanly among threads.
+
:note: the current implementation returns channels which are meant to be
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
@@ -156,7 +99,6 @@ class ThreadPool(object):
'_consumed_tasks', # a list with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
- '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list
)
def __init__(self, size=0):
@@ -164,10 +106,10 @@ class ThreadPool(object):
self._consumed_tasks = list()
self._workers = list()
self._queue = Queue()
- self._ordered_tasks_cache = dict()
+ self.set_size(size)
def __del__(self):
- raise NotImplementedError("TODO: Proper cleanup")
+ self.set_size(0)
#{ Internal
def _queue_feeder_visitor(self, task, count):
@@ -175,7 +117,7 @@ class ThreadPool(object):
queue all others for processing by our worker threads ( if available )."""
if task.error() or task.is_done():
self._consumed_tasks.append(task)
-
+
# allow min-count override. This makes sure we take at least min-count
# items off the input queue ( later )
if task.min_count is not None:
@@ -236,30 +178,11 @@ class ThreadPool(object):
#} END internal
#{ Interface
+ def size(self):
+ """:return: amount of workers in the pool"""
+ return len(self._workers)
- def del_task(self, task):
- """Delete the task
- Additionally we will remove orphaned tasks, which can be identified if their
- output channel is only held by themselves, so no one will ever consume
- its items.
-
- :return: self"""
- # now delete our actual node - must set it done os it closes its channels.
- # Otherwise further reads of output tasks will block.
- # Actually they may still block if anyone wants to read all ... without
- # a timeout
- # keep its input nodes as we check whether they were orphaned
- in_tasks = task.in_nodes
- task.set_done()
- self._tasks.del_node(task)
-
- for t in in_tasks
- self._del_task_if_orphaned(t)
- # END handle orphans recursively
-
- return self
-
- def set_pool_size(self, size=0):
+ def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
the call may block as it waits for threads to finish.
When reducing the size to zero, this thread will process all remaining
@@ -275,6 +198,7 @@ class ThreadPool(object):
if cur_count < size:
for i in range(size - cur_count):
worker = WorkerThread(self._queue)
+ worker.start()
self._workers.append(worker)
# END for each new worker to create
elif cur_count > size:
@@ -295,7 +219,33 @@ class ThreadPool(object):
# END while there are tasks on the queue
# END process queue
return self
-
+
+ def num_tasks(self):
+ """:return: amount of tasks"""
+ return len(self._tasks.nodes)
+
+ def del_task(self, task):
+ """Delete the task
+ Additionally we will remove orphaned tasks, which can be identified if their
+ output channel is only held by themselves, so no one will ever consume
+ its items.
+
+ :return: self"""
+ # now delete our actual node - must set it done os it closes its channels.
+ # Otherwise further reads of output tasks will block.
+ # Actually they may still block if anyone wants to read all ... without
+ # a timeout
+ # keep its input nodes as we check whether they were orphaned
+ in_tasks = task.in_nodes
+ task.set_done()
+ self._tasks.del_node(task)
+
+ for t in in_tasks:
+ self._del_task_if_orphaned(t)
+ # END handle orphans recursively
+
+ return self
+
def add_task(self, task):
"""Add a new task to be processed.
:return: a read channel to retrieve processed items. If that handle is lost,
@@ -305,15 +255,21 @@ class ThreadPool(object):
wc, rc = Channel()
rc = RPoolChannel(wc, task, self)
task._out_wc = wc
- task._pool_ref = weakref.ref(self)
+
+ has_input_channel = isinstance(task, InputChannelTask)
+ if has_input_channel:
+ task._pool_ref = weakref.ref(self)
+ # END init input channel task
self._tasks.add_node(task)
# If the input channel is one of our read channels, we add the relation
- ic = task.in_rc
- if isinstance(ic, RPoolChannel) and ic._pool is self:
- self._tasks.add_edge(ic._task, task)
- # END add task relation
+ if has_input_channel:
+ ic = task.in_rc
+ if isinstance(ic, RPoolChannel) and ic._pool is self:
+ self._tasks.add_edge(ic._task, task)
+ # END add task relation
+ # END handle input channels for connections
return rc
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
new file mode 100644
index 00000000..d2422773
--- /dev/null
+++ b/lib/git/async/task.py
@@ -0,0 +1,144 @@
+from graph import Node
+import threading
+import new
+
+class OutputChannelTask(Node):
+ """Abstracts a named task as part of a set of interdependent tasks, which contains
+ additional information on how the task should be queued and processed.
+
+ Results of the item processing are sent to an output channel, which is to be
+ set by the creator"""
+ __slots__ = ( '_read', # method to yield items to process
+ '_out_wc', # output write channel
+ '_exc', # exception caught
+ 'fun', # function to call with items read
+ 'min_count', # minimum amount of items to produce, None means no override
+ 'max_chunksize', # maximium amount of items to process per process call
+ 'apply_single' # apply single items even if multiple where read
+ )
+
+ def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0):
+ Node.__init__(self, id)
+ self._read = None # to be set by subclasss
+ self._out_wc = None # to be set later
+ self._exc = None
+ self.fun = fun
+ self.min_count = None
+ self.max_chunksize = 0 # note set
+ self.apply_single = apply_single
+
+ def is_done(self):
+ """:return: True if we are finished processing"""
+ return self._out_wc.closed
+
+ def set_done(self):
+ """Set ourselves to being done, has we have completed the processing"""
+ self._out_wc.close()
+
+ def error(self):
+ """:return: Exception caught during last processing or None"""
+ return self._exc
+
+ def process(self, count=0):
+ """Process count items and send the result individually to the output channel"""
+ items = self._read(count)
+
+ try:
+ if self.apply_single:
+ for item in items:
+ self._out_wc.write(self.fun(item))
+ # END for each item
+ else:
+ self._out_wc.write(self.fun(items))
+ # END handle single apply
+ except Exception, e:
+ self._exc = e
+ self.set_done()
+ # END exception handling
+
+ # if we didn't get all demanded items, which is also the case if count is 0
+ # we have depleted the input channel and are done
+ if len(items) != count:
+ self.set_done()
+ # END handle done state
+ #{ Configuration
+
+
+class ThreadTaskBase(object):
+ """Describes tasks which can be used with theaded pools"""
+ pass
+
+
+class InputIteratorTaskBase(OutputChannelTask):
+ """Implements a task which processes items from an iterable in a multi-processing
+ safe manner"""
+ __slots__ = ('_iterator', '_lock')
+ # the type of the lock to use when reading from the iterator
+ lock_type = None
+
+ def __init__(self, iterator, *args, **kwargs):
+ OutputChannelTask.__init__(self, *args, **kwargs)
+ if not hasattr(iterator, 'next'):
+ raise ValueError("Iterator %r needs a next() function" % iterator)
+ self._iterator = iterator
+ self._lock = self.lock_type()
+ self._read = self.__read
+
+ def __read(self, count=0):
+ """Read count items from the iterator, and return them"""
+ self._lock.acquire()
+ try:
+ if count == 0:
+ return list(self._iterator)
+ else:
+ out = list()
+ it = self._iterator
+ for i in xrange(count):
+ try:
+ out.append(it.next())
+ except StopIteration:
+ break
+ # END handle empty iterator
+ # END for each item to take
+ return out
+ # END handle count
+ finally:
+ self._lock.release()
+ # END handle locking
+
+
+class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
+ """An input iterator for threaded pools"""
+ lock_type = threading.Lock
+
+
+class InputChannelTask(OutputChannelTask):
+ """Uses an input channel as source for reading items
+ For instantiation, it takes all arguments of its base, the first one needs
+ to be the input channel to read from though."""
+ __slots__ = (
+ 'in_rc', # channel to read items from
+ '_pool_ref' # to be set by Pool
+ )
+
+ def __init__(self, in_rc, *args, **kwargs):
+ OutputChannelTask.__init__(self, *args, **kwargs)
+ self._in_rc = in_rc
+
+ def process(self, count=1):
+ """Verify our setup, and do some additional checking, before the
+ base implementation can permanently perform all operations"""
+ self._read = self._in_rc.read
+ # make sure we don't trigger the pool if we read from a pool channel which
+ # belongs to our own pool. Channels from different pools are fine though,
+ # there we want to trigger its computation
+ if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
+ self._read = self._in_rc._read
+
+ # permanently install our base for processing
+ self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self))
+
+ # and call it
+ return OutputChannelTask.process(self, count)
+ #{ Configuration
+
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 7ca93c86..82acbd8f 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -139,10 +139,15 @@ class WorkerThread(TerminatableThread):
if self._should_terminate():
break
# END check for stop request
- routine = self.__class__.quit
+ routine = None
args = tuple()
kwargs = dict()
- tasktuple = self.inq.get()
+ # don't wait too long, instead check for the termination request more often
+ try:
+ tasktuple = self.inq.get(True, 1)
+ except Queue.Empty:
+ continue
+ # END get task with timeout
if isinstance(tasktuple, (tuple, list)):
if len(tasktuple) == 3:
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
new file mode 100644
index 00000000..dabd8a42
--- /dev/null
+++ b/lib/git/async/util.py
@@ -0,0 +1,24 @@
+"""Module with utilities related to async operations"""
+
+import sys
+import os
+
+def cpu_count():
+ """:return:number of CPUs in the system
+ :note: inspired by multiprocessing"""
+ num = 0
+ try:
+ if sys.platform == 'win32':
+ num = int(os.environ['NUMBER_OF_PROCESSORS'])
+ elif 'bsd' in sys.platform or sys.platform == 'darwin':
+ num = int(os.popen('sysctl -n hw.ncpu').read())
+ else:
+ num = os.sysconf('SC_NPROCESSORS_ONLN')
+ except (ValueError, KeyError, OSError, AttributeError):
+ pass
+ # END exception handling
+
+ if num == 0:
+ raise NotImplementedError('cannot determine number of cpus')
+
+ return num