summary | refs | log | tree | commit | diff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
author: Sebastian Thiel <byronimo@gmail.com> 2010-06-07 19:12:44 +0200
committer: Sebastian Thiel <byronimo@gmail.com> 2010-06-07 19:12:44 +0200
commit: 583cd8807259a69fc01874b798f657c1f9ab7828 (patch)
tree: 046847c4dcd33f5b30c00ff65770039fc72dd148 /lib/git/async/pool.py
parent: edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (diff)
downloadgitpython-583cd8807259a69fc01874b798f657c1f9ab7828.tar.gz
Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py  154
1 file changed, 36 insertions, 118 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 30291835..227cabfc 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,121 +1,28 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
+from threading import Lock
-from threading import (
- Lock,
- _Condition,
- _sleep,
- _time,
+from util import (
+ SyncQueue,
+ AsyncQueue,
)
from task import InputChannelTask
-from Queue import Queue, Empty
-from collections import deque
-
-from graph import (
- Graph,
+from Queue import (
+ Queue,
+ Empty
)
+from graph import Graph
from channel import (
Channel,
WChannel,
RChannel
)
-import weakref
import sys
-#{ Utilities
-
-class SyncQueue(deque):
- """Adapter to allow using a deque like a queue, without locking"""
- def get(self, block=True, timeout=None):
- try:
- return self.pop()
- except IndexError:
- raise Empty
- # END raise empty
-
- def empty(self):
- return len(self) == 0
-
- put = deque.append
-
-
-class HSCondition(_Condition):
- """An attempt to make conditions less blocking, which gains performance
- in return by sleeping less"""
- delay = 0.00002 # reduces wait times, but increases overhead
-
- def wait(self, timeout=None):
- waiter = Lock()
- waiter.acquire()
- self.__dict__['_Condition__waiters'].append(waiter)
- saved_state = self._release_save()
- try: # restore state no matter what (e.g., KeyboardInterrupt)
- if timeout is None:
- waiter.acquire()
- else:
- # Balancing act: We can't afford a pure busy loop, so we
- # have to sleep; but if we sleep the whole timeout time,
- # we'll be unresponsive. The scheme here sleeps very
- # little at first, longer as time goes on, but never longer
- # than 20 times per second (or the timeout time remaining).
- endtime = _time() + timeout
- delay = self.delay
- acquire = waiter.acquire
- while True:
- gotit = acquire(0)
- if gotit:
- break
- remaining = endtime - _time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, .05)
- _sleep(delay)
- # END endless loop
- if not gotit:
- try:
- self.__dict__['_Condition__waiters'].remove(waiter)
- except ValueError:
- pass
- # END didn't ever get it
- finally:
- self._acquire_restore(saved_state)
-
- def notify(self, n=1):
- __waiters = self.__dict__['_Condition__waiters']
- if not __waiters:
- return
- if n == 1:
- __waiters[0].release()
- try:
- __waiters.pop(0)
- except IndexError:
- pass
- else:
- waiters = __waiters[:n]
- for waiter in waiters:
- waiter.release()
- try:
- __waiters.remove(waiter)
- except ValueError:
- pass
- # END handle n = 1 case faster
-
-class PerfQueue(Queue):
- """A queue using different condition objects to gain multithreading performance"""
- def __init__(self, maxsize=0):
- Queue.__init__(self, maxsize)
-
- self.not_empty = HSCondition(self.mutex)
- self.not_full = HSCondition(self.mutex)
- self.all_tasks_done = HSCondition(self.mutex)
-
-
-#} END utilities
-
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
@@ -237,7 +144,7 @@ class Pool(object):
self._tasks = Graph()
self._consumed_tasks = None
self._workers = list()
- self._queue = SyncQueue() # start with a sync queue
+ self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
self.set_size(size)
@@ -375,7 +282,10 @@ class Pool(object):
self._consumed_tasks.put(task)
# END handle consumption
- # delete consumed tasks to cleanup
+ self._handle_consumed_tasks()
+
+ def _handle_consumed_tasks(self):
+ """Remove all consumed tasks from our queue by deleting them"""
try:
while True:
ct = self._consumed_tasks.get(False)
@@ -384,7 +294,7 @@ class Pool(object):
except Empty:
pass
# END pop queue empty
-
+
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
if sys.getrefcount(task._out_wc) < 3:
@@ -415,11 +325,7 @@ class Pool(object):
cur_count = len(self._workers)
if cur_count < size:
# make sure we have a real queue, and can store our consumed tasks properly
- if not isinstance(self._queue, self.TaskQueueCls):
- if self._queue is not None and not self._queue.empty():
- raise AssertionError("Expected empty queue when switching the queue type")
- # END safety check
- self._queue = self.TaskQueueCls()
+ if not isinstance(self._consumed_tasks, self.TaskQueueCls):
self._consumed_tasks = Queue()
# END init queue
@@ -445,13 +351,8 @@ class Pool(object):
continue
# END while there are tasks on the queue
- # use a serial queue, its faster
- if not isinstance(self._queue, SyncQueue):
- self._queue = SyncQueue()
- # END handle queue type
-
if self._consumed_tasks and not self._consumed_tasks.empty():
- self._post_channel_read(self._consumed_tasks.pop())
+ self._handle_consumed_tasks()
# END assure consumed tasks are empty
self._consumed_tasks = SyncQueue()
# END process queue
@@ -467,6 +368,8 @@ class Pool(object):
output channel is only held by themselves, so no one will ever consume
its items.
+ This method blocks until all tasks to be removed have been processed, if
+ they are currently being processed.
:return: self"""
# now delete our actual node - must set it done os it closes its channels.
# Otherwise further reads of output tasks will block.
@@ -478,6 +381,21 @@ class Pool(object):
self._taskgraph_lock.acquire()
try:
self._taskorder_cache.clear()
+ # before we can delete the task, make sure its write channel
+ # is closed, otherwise people might still be waiting for its result.
+ # If a channel is not closed, this could also mean its not yet fully
+ # processed, but more importantly, there must be no task being processed
+ # right now.
+ # TODO: figure this out
+ for worker in self._workers:
+ r = worker.routine()
+ if r and r.im_self is task:
+ raise NotImplementedError("todo")
+ # END handle running task
+ # END check for in-progress routine
+
+ # its done, close the channel for writing
+ task.close()
self._tasks.del_node(task)
finally:
self._taskgraph_lock.release()
@@ -497,11 +415,11 @@ class Pool(object):
# create a write channel for it
wc, rc = Channel()
rc = RPoolChannel(wc, task, self)
- task._out_wc = wc
+ task.set_wc(wc)
has_input_channel = isinstance(task, InputChannelTask)
if has_input_channel:
- task._pool_ref = weakref.ref(self)
+ task.set_pool(self)
# END init input channel task
self._taskgraph_lock.acquire()
@@ -534,4 +452,4 @@ class ThreadPool(Pool):
"""A pool using threads as worker"""
WorkerCls = WorkerThread
LockCls = Lock
- TaskQueueCls = PerfQueue
+ TaskQueueCls = AsyncQueue