diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/pool.py | 154 | ||||
-rw-r--r-- | lib/git/async/task.py | 26 | ||||
-rw-r--r-- | lib/git/async/thread.py | 16 | ||||
-rw-r--r-- | lib/git/async/util.py | 106 |
4 files changed, 176 insertions, 126 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 30291835..227cabfc 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,121 +1,28 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread +from threading import Lock -from threading import ( - Lock, - _Condition, - _sleep, - _time, +from util import ( + SyncQueue, + AsyncQueue, ) from task import InputChannelTask -from Queue import Queue, Empty -from collections import deque - -from graph import ( - Graph, +from Queue import ( + Queue, + Empty ) +from graph import Graph from channel import ( Channel, WChannel, RChannel ) -import weakref import sys -#{ Utilities - -class SyncQueue(deque): - """Adapter to allow using a deque like a queue, without locking""" - def get(self, block=True, timeout=None): - try: - return self.pop() - except IndexError: - raise Empty - # END raise empty - - def empty(self): - return len(self) == 0 - - put = deque.append - - -class HSCondition(_Condition): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - delay = 0.00002 # reduces wait times, but increases overhead - - def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() - try: # restore state no matter what (e.g., KeyboardInterrupt) - if timeout is None: - waiter.acquire() - else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). - endtime = _time() + timeout - delay = self.delay - acquire = waiter.acquire - while True: - gotit = acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) - # END endless loop - if not gotit: - try: - self.__dict__['_Condition__waiters'].remove(waiter) - except ValueError: - pass - # END didn't ever get it - finally: - self._acquire_restore(saved_state) - - def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: - return - if n == 1: - __waiters[0].release() - try: - __waiters.pop(0) - except IndexError: - pass - else: - waiters = __waiters[:n] - for waiter in waiters: - waiter.release() - try: - __waiters.remove(waiter) - except ValueError: - pass - # END handle n = 1 case faster - -class PerfQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - - -#} END utilities - class RPoolChannel(RChannel): """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call before and after an item is to be read. @@ -237,7 +144,7 @@ class Pool(object): self._tasks = Graph() self._consumed_tasks = None self._workers = list() - self._queue = SyncQueue() # start with a sync queue + self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() self.set_size(size) @@ -375,7 +282,10 @@ class Pool(object): self._consumed_tasks.put(task) # END handle consumption - # delete consumed tasks to cleanup + self._handle_consumed_tasks() + + def _handle_consumed_tasks(self): + """Remove all consumed tasks from our queue by deleting them""" try: while True: ct = self._consumed_tasks.get(False) @@ -384,7 +294,7 @@ class Pool(object): except Empty: pass # END pop queue empty - + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" if sys.getrefcount(task._out_wc) < 3: @@ -415,11 +325,7 @@ class Pool(object): cur_count = len(self._workers) if cur_count < size: # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._queue, self.TaskQueueCls): - if self._queue is not None and not self._queue.empty(): - raise AssertionError("Expected empty queue when switching the queue type") - # END safety check - self._queue = self.TaskQueueCls() + if not isinstance(self._consumed_tasks, self.TaskQueueCls): self._consumed_tasks = Queue() # END init queue @@ -445,13 +351,8 @@ class Pool(object): continue # END while there are tasks on the queue - # use a serial queue, its faster - if not isinstance(self._queue, SyncQueue): - self._queue = SyncQueue() - # END handle queue type - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._post_channel_read(self._consumed_tasks.pop()) + self._handle_consumed_tasks() # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue @@ -467,6 +368,8 @@ class Pool(object): output channel is only held by themselves, so no one will ever consume its items. + This method blocks until all tasks to be removed have been processed, if + they are currently being processed. :return: self""" # now delete our actual node - must set it done os it closes its channels. # Otherwise further reads of output tasks will block. @@ -478,6 +381,21 @@ class Pool(object): self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() + # before we can delete the task, make sure its write channel + # is closed, otherwise people might still be waiting for its result. + # If a channel is not closed, this could also mean its not yet fully + # processed, but more importantly, there must be no task being processed + # right now. + # TODO: figure this out + for worker in self._workers: + r = worker.routine() + if r and r.im_self is task: + raise NotImplementedError("todo") + # END handle running task + # END check for in-progress routine + + # its done, close the channel for writing + task.close() self._tasks.del_node(task) finally: self._taskgraph_lock.release() @@ -497,11 +415,11 @@ class Pool(object): # create a write channel for it wc, rc = Channel() rc = RPoolChannel(wc, task, self) - task._out_wc = wc + task.set_wc(wc) has_input_channel = isinstance(task, InputChannelTask) if has_input_channel: - task._pool_ref = weakref.ref(self) + task.set_pool(self) # END init input channel task self._taskgraph_lock.acquire() @@ -534,4 +452,4 @@ class ThreadPool(Pool): """A pool using threads as worker""" WorkerCls = WorkerThread LockCls = Lock - TaskQueueCls = PerfQueue + TaskQueueCls = AsyncQueue diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 3137746c..f106c381 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@ from graph import Node + import threading +import weakref import new class OutputChannelTask(Node): @@ -17,6 +19,7 @@ class OutputChannelTask(Node): __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught + '_done', # True if we are done 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -28,6 +31,7 @@ class OutputChannelTask(Node): self._read = None # to be set by subclasss self._out_wc = None # to be set later self._exc = None + self._done = False self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -35,12 +39,28 @@ class OutputChannelTask(Node): def is_done(self): """:return: True if we are finished processing""" - return self._out_wc.closed + return self._done def set_done(self): """Set ourselves to being done, has we have completed the processing""" + self._done = True + self.close() + + def set_wc(self, wc): + """Set the write channel to the given one + :note: resets it done state in order to allow proper queue handling""" + self._done = False + self._out_wc = wc + + def close(self): + """A closed task will close its channel to assure the readers will wake up + :note: its safe to call this method multiple times""" self._out_wc.close() + def is_closed(self): + """:return: True if the task's write channel is closed""" + return self._out_wc.closed + def error(self): """:return: Exception caught during last processing or None""" return self._exc @@ -148,5 +168,9 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) #{ Configuration diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index f7f0c978..4240a664 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -110,7 +110,7 @@ class WorkerThread(TerminatableThread): t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ - __slots__ = ('inq', 'outq') + __slots__ = ('inq', '_current_routine') # define how often we should check for a shutdown request in case our @@ -120,10 +120,12 @@ class WorkerThread(TerminatableThread): def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() + self._current_routine = None # routine we execute right now def run(self): """Process input tasks until we receive the quit signal""" while True: + self._current_routine = None if self._should_terminate(): break # END check for stop request @@ -138,8 +140,9 @@ class WorkerThread(TerminatableThread): # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - # DEBUG - # print "%s: picked up: %s(%s)" % (self.name, routine, arg) + + self._current_routine = routine + try: rval = None if inspect.ismethod(routine): @@ -154,16 +157,15 @@ class WorkerThread(TerminatableThread): print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call - except StopIteration: - break except Exception,e: print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) break # abort ... # END routine exception handling # END endless loop - def quit(self): - raise StopIteration + def routine(self): + """:return: routine we are currently executing, or None if we have no task""" + return self._current_routine #} END classes diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dabd8a42..432d1736 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -1,8 +1,23 @@ """Module with utilities related to async operations""" +from threading import ( + Lock, + _Condition, + _sleep, + _time, + ) + +from Queue import ( + Queue, + Empty, + ) + +from collections import deque import sys import os +#{ Routines + def cpu_count(): """:return:number of CPUs in the system :note: inspired by multiprocessing""" @@ -22,3 +37,94 @@ def cpu_count(): raise NotImplementedError('cannot determine number of cpus') return num + +#} END routines + + +class SyncQueue(deque): + """Adapter to allow using a deque like a queue, without locking""" + def get(self, block=True, timeout=None): + try: + return self.pop() + except IndexError: + raise Empty + # END raise empty + + def empty(self): + return len(self) == 0 + + put = deque.append + + +class HSCondition(_Condition): + """An attempt to make conditions less blocking, which gains performance + in return by sleeping less""" + delay = 0.00002 # reduces wait times, but increases overhead + + def wait(self, timeout=None): + waiter = Lock() + waiter.acquire() + self.__dict__['_Condition__waiters'].append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). + endtime = _time() + timeout + delay = self.delay + acquire = waiter.acquire + while True: + gotit = acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + # END endless loop + if not gotit: + try: + self.__dict__['_Condition__waiters'].remove(waiter) + except ValueError: + pass + # END didn't ever get it + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + __waiters = self.__dict__['_Condition__waiters'] + if not __waiters: + return + if n == 1: + __waiters[0].release() + try: + __waiters.pop(0) + except IndexError: + pass + else: + waiters = __waiters[:n] + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + # END handle n = 1 case faster + +class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +#} END utilities |