diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 |
commit | 583cd8807259a69fc01874b798f657c1f9ab7828 (patch) | |
tree | 046847c4dcd33f5b30c00ff65770039fc72dd148 /lib/git/async/pool.py | |
parent | edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (diff) | |
download | gitpython-583cd8807259a69fc01874b798f657c1f9ab7828.tar.gz |
Moved pool utilities into util module, fixed critical issue that caused havoc - let's call this a safe-state
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 154 |
1 file changed, 36 insertions, 118 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 30291835..227cabfc 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,121 +1,28 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from threading import Lock -from threading import ( - Lock, - _Condition, - _sleep, - _time, +from util import ( + SyncQueue, + AsyncQueue, ) from task import InputChannelTask -from Queue import Queue, Empty -from collections import deque - -from graph import ( - Graph, +from Queue import ( + Queue, + Empty ) +from graph import Graph from channel import ( Channel, WChannel, RChannel ) -import weakref import sys -#{ Utilities - -class SyncQueue(deque): - """Adapter to allow using a deque like a queue, without locking""" - def get(self, block=True, timeout=None): - try: - return self.pop() - except IndexError: - raise Empty - # END raise empty - - def empty(self): - return len(self) == 0 - - put = deque.append - - -class HSCondition(_Condition): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - delay = 0.00002 # reduces wait times, but increases overhead - - def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() - try: # restore state no matter what (e.g., KeyboardInterrupt) - if timeout is None: - waiter.acquire() - else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). 
- endtime = _time() + timeout - delay = self.delay - acquire = waiter.acquire - while True: - gotit = acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) - # END endless loop - if not gotit: - try: - self.__dict__['_Condition__waiters'].remove(waiter) - except ValueError: - pass - # END didn't ever get it - finally: - self._acquire_restore(saved_state) - - def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: - return - if n == 1: - __waiters[0].release() - try: - __waiters.pop(0) - except IndexError: - pass - else: - waiters = __waiters[:n] - for waiter in waiters: - waiter.release() - try: - __waiters.remove(waiter) - except ValueError: - pass - # END handle n = 1 case faster - -class PerfQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - - -#} END utilities - class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. 
@@ -237,7 +144,7 @@ class Pool(object): self._tasks = Graph() self._consumed_tasks = None self._workers = list() - self._queue = SyncQueue() # start with a sync queue + self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() self.set_size(size) @@ -375,7 +282,10 @@ class Pool(object): self._consumed_tasks.put(task) # END handle consumption - # delete consumed tasks to cleanup + self._handle_consumed_tasks() + + def _handle_consumed_tasks(self): + """Remove all consumed tasks from our queue by deleting them""" try: while True: ct = self._consumed_tasks.get(False) @@ -384,7 +294,7 @@ class Pool(object): except Empty: pass # END pop queue empty - + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" if sys.getrefcount(task._out_wc) < 3: @@ -415,11 +325,7 @@ class Pool(object): cur_count = len(self._workers) if cur_count < size: # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._queue, self.TaskQueueCls): - if self._queue is not None and not self._queue.empty(): - raise AssertionError("Expected empty queue when switching the queue type") - # END safety check - self._queue = self.TaskQueueCls() + if not isinstance(self._consumed_tasks, self.TaskQueueCls): self._consumed_tasks = Queue() # END init queue @@ -445,13 +351,8 @@ class Pool(object): continue # END while there are tasks on the queue - # use a serial queue, its faster - if not isinstance(self._queue, SyncQueue): - self._queue = SyncQueue() - # END handle queue type - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._post_channel_read(self._consumed_tasks.pop()) + self._handle_consumed_tasks() # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue @@ -467,6 +368,8 @@ class Pool(object): output channel is only held by themselves, so no one will ever consume its items. 
+ This method blocks until all tasks to be removed have been processed, if + they are currently being processed. :return: self""" # now delete our actual node - must set it done os it closes its channels. # Otherwise further reads of output tasks will block. @@ -478,6 +381,21 @@ class Pool(object): self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() + # before we can delete the task, make sure its write channel + # is closed, otherwise people might still be waiting for its result. + # If a channel is not closed, this could also mean its not yet fully + # processed, but more importantly, there must be no task being processed + # right now. + # TODO: figure this out + for worker in self._workers: + r = worker.routine() + if r and r.im_self is task: + raise NotImplementedError("todo") + # END handle running task + # END check for in-progress routine + + # its done, close the channel for writing + task.close() self._tasks.del_node(task) finally: self._taskgraph_lock.release() @@ -497,11 +415,11 @@ class Pool(object): # create a write channel for it wc, rc = Channel() rc = RPoolChannel(wc, task, self) - task._out_wc = wc + task.set_wc(wc) has_input_channel = isinstance(task, InputChannelTask) if has_input_channel: - task._pool_ref = weakref.ref(self) + task.set_pool(self) # END init input channel task self._taskgraph_lock.acquire() @@ -534,4 +452,4 @@ class ThreadPool(Pool): """A pool using threads as worker""" WorkerCls = WorkerThread LockCls = Lock - TaskQueueCls = PerfQueue + TaskQueueCls = AsyncQueue |