diff options
Diffstat (limited to 'lib/git')
-rw-r--r-- | lib/git/mp/__init__.py | 1 | ||||
-rw-r--r-- | lib/git/mp/channel.py (renamed from lib/git/odb/channel.py) | 0 | ||||
-rw-r--r-- | lib/git/mp/pool.py (renamed from lib/git/odb/pool.py) | 38 | ||||
-rw-r--r-- | lib/git/mp/thread.py (renamed from lib/git/odb/thread.py) | 0 |
4 files changed, 26 insertions, 13 deletions
diff --git a/lib/git/mp/__init__.py b/lib/git/mp/__init__.py new file mode 100644 index 00000000..89b9eb47 --- /dev/null +++ b/lib/git/mp/__init__.py @@ -0,0 +1 @@ +"""Initialize the multi-processing package""" diff --git a/lib/git/odb/channel.py b/lib/git/mp/channel.py index c9cbfb87..c9cbfb87 100644 --- a/lib/git/odb/channel.py +++ b/lib/git/mp/channel.py diff --git a/lib/git/odb/pool.py b/lib/git/mp/pool.py index 5c3a7ead..f9f7880b 100644 --- a/lib/git/odb/pool.py +++ b/lib/git/mp/pool.py @@ -1,5 +1,5 @@ """Implementation of a thread-pool working with channels""" -from thread import TerminatableThread +from thread import WorkerThread from channel import ( Channel, WChannel, @@ -10,7 +10,7 @@ class Node(object): """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. Its not designed to support big graphs, and sports only the functionality we need""" - __slots__('in_nodes', 'out_nodes') + __slots__ = ('in_nodes', 'out_nodes') class Graph(object): @@ -43,17 +43,11 @@ class TaskNode(Node): return self.out_wc.closed -class PoolChannel(Channel): - """Base class for read and write channels which trigger the pool to evaluate - its tasks, causing the evaluation of the task list effectively assure a read - from actual output channel will not block forever due to task dependencies. - """ - __slots__ = tuple() - - -class RPoolChannel(PoolChannel): +class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call - before and after an item is to be read""" + before and after an item is to be read. + + It acts like a handle to the underlying task""" __slots__ = ('_task', '_pool', '_pre_cb', '_post_cb') def set_post_cb(self, fun = lambda item: item): @@ -66,6 +60,19 @@ class RPoolChannel(PoolChannel): """Install a callback to call before an item is read from the channel. If it fails, the read will fail with an IOError If a function is not provided, the call is effectively uninstalled.""" + + def read(block=False, timeout=None): + """Read an item that was processed by one of our threads + :note: Triggers task dependency handling needed to provide the necessary + input""" + + #{ Internal + def _read(self, block=False, timeout=None): + """Calls the underlying channel's read directly, without triggering + the pool""" + return RChannel.read(self, block, timeout) + + #} END internal class PoolWorker(WorkerThread): @@ -74,6 +81,8 @@ class PoolWorker(WorkerThread): @classmethod def perform_task(cls, task): + # note : when getting the input channel, be sure not to trigger + # RPoolChannel pass @@ -82,7 +91,10 @@ class ThreadPool(Graph): a fully serial mode in which case the amount of threads is zero. Work is distributed via Channels, which form a dependency graph. The evaluation - is lazy, as work will only be done once an output is requested.""" + is lazy, as work will only be done once an output is requested. + + :note: the current implementation returns channels which are meant to be + used only from the main thread""" __slots__ = ( '_workers', # list of worker threads '_queue', # master queue for tasks '_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel diff --git a/lib/git/odb/thread.py b/lib/git/mp/thread.py index 3938666a..3938666a 100644 --- a/lib/git/odb/thread.py +++ b/lib/git/mp/thread.py |