path: root/lib/git/async/pool.py
author      Sebastian Thiel <byronimo@gmail.com>   2010-06-06 18:13:21 +0200
committer   Sebastian Thiel <byronimo@gmail.com>   2010-06-06 18:13:21 +0200
commit      b3cde0ee162b8f0cb67da981311c8f9c16050a62 (patch)
tree        537615b8d7cfd059b2d7938a1929d9e3199fb374 /lib/git/async/pool.py
parent      ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 (diff)
download    gitpython-b3cde0ee162b8f0cb67da981311c8f9c16050a62.tar.gz
First step of testing the pool - tasks have been separated into a new module, including their own tests, and their design was improved to prepare them for some specifics that will be needed for multiprocessing support
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--   lib/git/async/pool.py   164
1 file changed, 60 insertions, 104 deletions
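
The diff below drops the in-file TaskNode class in favour of the new task module (note the added import of InputChannelTask). As a rough orientation, here is a minimal sketch of the contract the pool drives on a task; the attribute and method names are taken from the removed TaskNode code in this diff, while the method bodies are intentionally left empty:

    class Task(object):
        # Sketch only: mirrors the removed TaskNode interface from this diff.
        def __init__(self, in_rc, fun, apply_single=True):
            self.in_rc = in_rc                # channel the task reads its input items from
            self.fun = fun                    # callable applied to the items read
            self.apply_single = apply_single  # call fun per item, or once with the whole chunk
            self.min_count = None             # minimum number of items to process, None = no override
            self.max_chunksize = 0            # maximum number of items per process() call
            self._out_wc = None               # output write channel, set by the pool in add_task()

        def process(self, count=1):
            """Read up to count items from in_rc, apply fun, write results to _out_wc."""

        def is_done(self):
            """:return: True once the output channel has been closed"""

        def set_done(self):
            """Close the output channel, unblocking any pending readers."""

        def error(self):
            """:return: exception caught during the last process() call, or None"""
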
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 9a24cbc5..2efc862b 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,10 +1,10 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
+from task import InputChannelTask
from Queue import Queue
from graph import (
Graph,
- Node
)
from channel import (
@@ -16,73 +16,6 @@ from channel import (
import weakref
import sys
-class TaskNode(Node):
- """Couples an input channel, an output channel, as well as a processing function
- together.
- It may contain additional information on how to handle read-errors from the
- input channel"""
- __slots__ = ( 'in_rc', # input read channel
- '_out_wc', # output write channel
- '_pool_ref', # ref to our pool
- '_exc', # exception caught
- 'fun', # function to call with items read from in_rc
- 'min_count', # minimum amount of items to produce, None means no override
- 'max_chunksize', # maximum amount of items to process per process call
- 'apply_single' # apply single items even if multiple were read
- )
-
- def __init__(self, in_rc, fun, apply_single=True):
- self.in_rc = in_rc
- self._out_wc = None
- self._pool_ref = None
- self._exc = None
- self.fun = fun
- self.min_count = None
- self.max_chunksize = 0 # not set
- self.apply_single = apply_single
-
- def is_done(self):
- """:return: True if we are finished processing"""
- return self._out_wc.closed
-
- def set_done(self):
- """Set ourselves to being done, as we have completed the processing"""
- self._out_wc.close()
-
- def error(self):
- """:return: Exception caught during last processing or None"""
- return self._exc
-
- def process(self, count=1):
- """Process count items and send the result individually to the output channel"""
- if self._out_wc is None:
- raise IOError("Cannot work in uninitialized task")
-
- read = self.in_rc.read
- if isinstance(self.in_rc, RPoolChannel) and self.in_rc._pool is self._pool_ref():
- read = self.in_rc._read
- items = read(count)
-
- try:
- if self.apply_single:
- for item in items:
- self._out_wc.write(self.fun(item))
- # END for each item
- else:
- self._out_wc.write(self.fun(items))
- # END handle single apply
- except Exception, e:
- self._exc = e
- self.set_done()
- # END exception handling
-
- # if we didn't get all demanded items, which is also the case if count is 0
- # we have depleted the input channel and are done
- if len(items) != count:
- self.set_done()
- # END handle done state
- #{ Configuration
-
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
@@ -116,7 +49,7 @@ class RPoolChannel(RChannel):
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
- def read(self, count=1, block=False, timeout=None):
+ def read(self, count=0, block=False, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
input"""
@@ -131,9 +64,11 @@ class RPoolChannel(RChannel):
items = RChannel.read(self, count, block, timeout)
if self._post_cb:
items = self._post_cb(items)
+
+ return items
#{ Internal
- def _read(self, count=1, block=False, timeout=None):
+ def _read(self, count=0, block=False, timeout=None):
"""Calls the underlying channel's read directly, without triggering
the pool"""
return RChannel.read(self, count, block, timeout)
@@ -141,7 +76,6 @@ class RPoolChannel(RChannel):
#} END internal
-
class ThreadPool(object):
"""A thread pool maintains a set of one or more worker threads, but supports
a fully serial mode in which case the amount of threads is zero.
@@ -149,6 +83,15 @@ class ThreadPool(object):
Work is distributed via Channels, which form a dependency graph. The evaluation
is lazy, as work will only be done once an output is requested.
+ The thread pool's inherent issue is the global interpreter lock that it will hit,
+ which gets worse considering that a few C extensions specifically lock their part
+ globally as well. The only way this will improve is if custom C extensions
+ are written which do some bulk work, but release the GIL once they have acquired
+ their resources.
+
+ Due to the nature of having multiple objects in git, it's easy to distribute
+ that work cleanly among threads.
+
:note: the current implementation returns channels which are meant to be
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
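
To make the lazy, channel-driven evaluation described in the docstring above concrete, here is a minimal usage sketch. The pool API shown (set_size, add_task and the returned read channel) is taken from this diff; the task construction itself is assumed:

    pool = ThreadPool(size=4)        # four worker threads; size=0 would mean fully serial mode
    rc = pool.add_task(some_task)    # returns an RPoolChannel, nothing is processed yet
    items = rc.read(count=10)        # reading triggers the task dependency handling
    pool.set_size(0)                 # shrink back to serial mode, workers finish and exit

Here some_task stands in for any task instance from the task module; the point is that work only happens once rc.read() is called.
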
@@ -156,7 +99,6 @@ class ThreadPool(object):
'_consumed_tasks', # a list with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
- '_ordered_tasks_cache' # tasks in order of evaluation, mapped from task -> task list
)
def __init__(self, size=0):
@@ -164,10 +106,10 @@ class ThreadPool(object):
self._consumed_tasks = list()
self._workers = list()
self._queue = Queue()
- self._ordered_tasks_cache = dict()
+ self.set_size(size)
def __del__(self):
- raise NotImplementedError("TODO: Proper cleanup")
+ self.set_size(0)
#{ Internal
def _queue_feeder_visitor(self, task, count):
@@ -175,7 +117,7 @@ class ThreadPool(object):
queue all others for processing by our worker threads ( if available )."""
if task.error() or task.is_done():
self._consumed_tasks.append(task)
-
+
# allow min-count override. This makes sure we take at least min-count
# items off the input queue ( later )
if task.min_count is not None:
@@ -236,30 +178,11 @@ class ThreadPool(object):
#} END internal
#{ Interface
+ def size(self):
+ """:return: amount of workers in the pool"""
+ return len(self._workers)
- def del_task(self, task):
- """Delete the task
- Additionally we will remove orphaned tasks, which can be identified if their
- output channel is only held by themselves, so no one will ever consume
- its items.
-
- :return: self"""
- # now delete our actual node - must set it done so it closes its channels.
- # Otherwise further reads of output tasks will block.
- # Actually they may still block if anyone wants to read all ... without
- # a timeout
- # keep its input nodes as we check whether they were orphaned
- in_tasks = task.in_nodes
- task.set_done()
- self._tasks.del_node(task)
-
- for t in in_tasks
- self._del_task_if_orphaned(t)
- # END handle orphans recursively
-
- return self
-
- def set_pool_size(self, size=0):
+ def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
the call may block as it waits for threads to finish.
When reducing the size to zero, this thread will process all remaining
@@ -275,6 +198,7 @@ class ThreadPool(object):
if cur_count < size:
for i in range(size - cur_count):
worker = WorkerThread(self._queue)
+ worker.start()
self._workers.append(worker)
# END for each new worker to create
elif cur_count > size:
@@ -295,7 +219,33 @@ class ThreadPool(object):
# END while there are tasks on the queue
# END process queue
return self
-
+
+ def num_tasks(self):
+ """:return: amount of tasks"""
+ return len(self._tasks.nodes)
+
+ def del_task(self, task):
+ """Delete the task
+ Additionally we will remove orphaned tasks, which can be identified if their
+ output channel is only held by themselves, so no one will ever consume
+ its items.
+
+ :return: self"""
+ # now delete our actual node - must set it done so it closes its channels.
+ # Otherwise further reads of output tasks will block.
+ # Actually they may still block if anyone wants to read all ... without
+ # a timeout
+ # keep its input nodes as we check whether they were orphaned
+ in_tasks = task.in_nodes
+ task.set_done()
+ self._tasks.del_node(task)
+
+ for t in in_tasks:
+ self._del_task_if_orphaned(t)
+ # END handle orphans recursively
+
+ return self
+
def add_task(self, task):
"""Add a new task to be processed.
:return: a read channel to retrieve processed items. If that handle is lost,
@@ -305,15 +255,21 @@ class ThreadPool(object):
wc, rc = Channel()
rc = RPoolChannel(wc, task, self)
task._out_wc = wc
- task._pool_ref = weakref.ref(self)
+
+ has_input_channel = isinstance(task, InputChannelTask)
+ if has_input_channel:
+ task._pool_ref = weakref.ref(self)
+ # END init input channel task
self._tasks.add_node(task)
# If the input channel is one of our read channels, we add the relation
- ic = task.in_rc
- if isinstance(ic, RPoolChannel) and ic._pool is self:
- self._tasks.add_edge(ic._task, task)
- # END add task relation
+ if has_input_channel:
+ ic = task.in_rc
+ if isinstance(ic, RPoolChannel) and ic._pool is self:
+ self._tasks.add_edge(ic._task, task)
+ # END add task relation
+ # END handle input channels for connections
return rc
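
When the added task is an InputChannelTask whose input is an RPoolChannel belonging to this pool, add_task also records the dependency edge, so reads from the downstream channel can pull work through the upstream task first. A hedged sketch of chaining two tasks this way; the InputChannelTask constructor arguments are assumed to mirror the removed TaskNode(in_rc, fun, apply_single=True), only add_task and the channel wiring above are from this diff:

    rc1 = pool.add_task(producer_task)            # producer_task: any existing task instance (placeholder)
    chained = InputChannelTask(rc1, str.upper)    # assumed signature: reads its input items from rc1
    rc2 = pool.add_task(chained)                  # pool adds the edge producer_task -> chained
    results = rc2.read()                          # lazily pulls results through both tasks
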