summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/git/mp/__init__.py1
-rw-r--r--lib/git/mp/channel.py (renamed from lib/git/odb/channel.py)0
-rw-r--r--lib/git/mp/pool.py (renamed from lib/git/odb/pool.py)38
-rw-r--r--lib/git/mp/thread.py (renamed from lib/git/odb/thread.py)0
-rw-r--r--test/git/mp/__init__.py0
-rw-r--r--test/git/mp/test_channel.py (renamed from test/git/odb/test_channel.py)2
-rw-r--r--test/git/mp/test_pool.py (renamed from test/git/odb/test_pool.py)2
-rw-r--r--test/git/mp/test_thread.py (renamed from test/git/odb/test_thread.py)2
8 files changed, 29 insertions, 16 deletions
diff --git a/lib/git/mp/__init__.py b/lib/git/mp/__init__.py
new file mode 100644
index 00000000..89b9eb47
--- /dev/null
+++ b/lib/git/mp/__init__.py
@@ -0,0 +1 @@
+"""Initialize the multi-processing package"""
diff --git a/lib/git/odb/channel.py b/lib/git/mp/channel.py
index c9cbfb87..c9cbfb87 100644
--- a/lib/git/odb/channel.py
+++ b/lib/git/mp/channel.py
diff --git a/lib/git/odb/pool.py b/lib/git/mp/pool.py
index 5c3a7ead..f9f7880b 100644
--- a/lib/git/odb/pool.py
+++ b/lib/git/mp/pool.py
@@ -1,5 +1,5 @@
"""Implementation of a thread-pool working with channels"""
-from thread import TerminatableThread
+from thread import WorkerThread
from channel import (
Channel,
WChannel,
@@ -10,7 +10,7 @@ class Node(object):
"""A quick and dirty to the point implementation of a simple, and slow ascyclic graph.
Its not designed to support big graphs, and sports only the functionality
we need"""
- __slots__('in_nodes', 'out_nodes')
+ __slots__ = ('in_nodes', 'out_nodes')
class Graph(object):
@@ -43,17 +43,11 @@ class TaskNode(Node):
return self.out_wc.closed
-class PoolChannel(Channel):
- """Base class for read and write channels which trigger the pool to evaluate
- its tasks, causing the evaluation of the task list effectively assure a read
- from actual output channel will not block forever due to task dependencies.
- """
- __slots__ = tuple()
-
-
-class RPoolChannel(PoolChannel):
+class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
- before and after an item is to be read"""
+ before and after an item is to be read.
+
+ It acts like a handle to the underlying task"""
__slots__ = ('_task', '_pool', '_pre_cb', '_post_cb')
def set_post_cb(self, fun = lambda item: item):
@@ -66,6 +60,19 @@ class RPoolChannel(PoolChannel):
"""Install a callback to call before an item is read from the channel.
If it fails, the read will fail with an IOError
If a function is not provided, the call is effectively uninstalled."""
+
+ def read(block=False, timeout=None):
+ """Read an item that was processed by one of our threads
+ :note: Triggers task dependency handling needed to provide the necessary
+ input"""
+
+ #{ Internal
+ def _read(self, block=False, timeout=None):
+ """Calls the underlying channel's read directly, without triggering
+ the pool"""
+ return RChannel.read(self, block, timeout)
+
+ #} END internal
class PoolWorker(WorkerThread):
@@ -74,6 +81,8 @@ class PoolWorker(WorkerThread):
@classmethod
def perform_task(cls, task):
+ # note : when getting the input channel, be sure not to trigger
+ # RPoolChannel
pass
@@ -82,7 +91,10 @@ class ThreadPool(Graph):
a fully serial mode in which case the amount of threads is zero.
Work is distributed via Channels, which form a dependency graph. The evaluation
- is lazy, as work will only be done once an output is requested."""
+ is lazy, as work will only be done once an output is requested.
+
+ :note: the current implementation returns channels which are meant to be
+ used only from the main thread"""
__slots__ = ( '_workers', # list of worker threads
'_queue', # master queue for tasks
'_ordered_tasks_cache' # tasks in order of evaluation, mapped by read channel
diff --git a/lib/git/odb/thread.py b/lib/git/mp/thread.py
index 3938666a..3938666a 100644
--- a/lib/git/odb/thread.py
+++ b/lib/git/mp/thread.py
diff --git a/test/git/mp/__init__.py b/test/git/mp/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/git/mp/__init__.py
diff --git a/test/git/odb/test_channel.py b/test/git/mp/test_channel.py
index d845a6ec..9b667372 100644
--- a/test/git/odb/test_channel.py
+++ b/test/git/mp/test_channel.py
@@ -1,6 +1,6 @@
"""Channel testing"""
from test.testlib import *
-from git.odb.channel import *
+from git.mp.channel import *
import time
diff --git a/test/git/odb/test_pool.py b/test/git/mp/test_pool.py
index 6656c69d..7c4a366f 100644
--- a/test/git/odb/test_pool.py
+++ b/test/git/mp/test_pool.py
@@ -1,6 +1,6 @@
"""Channel testing"""
from test.testlib import *
-from git.odb.pool import *
+from git.mp.pool import *
import time
diff --git a/test/git/odb/test_thread.py b/test/git/mp/test_thread.py
index 674ecc1d..9625aabb 100644
--- a/test/git/odb/test_thread.py
+++ b/test/git/mp/test_thread.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
""" Test thead classes and functions"""
from test.testlib import *
-from git.odb.thread import *
+from git.mp.thread import *
from Queue import Queue
class TestWorker(WorkerThread):