summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/pool.py154
-rw-r--r--lib/git/async/task.py26
-rw-r--r--lib/git/async/thread.py16
-rw-r--r--lib/git/async/util.py106
4 files changed, 176 insertions, 126 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 30291835..227cabfc 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,121 +1,28 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
+from threading import Lock
-from threading import (
- Lock,
- _Condition,
- _sleep,
- _time,
+from util import (
+ SyncQueue,
+ AsyncQueue,
)
from task import InputChannelTask
-from Queue import Queue, Empty
-from collections import deque
-
-from graph import (
- Graph,
+from Queue import (
+ Queue,
+ Empty
)
+from graph import Graph
from channel import (
Channel,
WChannel,
RChannel
)
-import weakref
import sys
-#{ Utilities
-
-class SyncQueue(deque):
- """Adapter to allow using a deque like a queue, without locking"""
- def get(self, block=True, timeout=None):
- try:
- return self.pop()
- except IndexError:
- raise Empty
- # END raise empty
-
- def empty(self):
- return len(self) == 0
-
- put = deque.append
-
-
-class HSCondition(_Condition):
- """An attempt to make conditions less blocking, which gains performance
- in return by sleeping less"""
- delay = 0.00002 # reduces wait times, but increases overhead
-
- def wait(self, timeout=None):
- waiter = Lock()
- waiter.acquire()
- self.__dict__['_Condition__waiters'].append(waiter)
- saved_state = self._release_save()
- try: # restore state no matter what (e.g., KeyboardInterrupt)
- if timeout is None:
- waiter.acquire()
- else:
- # Balancing act: We can't afford a pure busy loop, so we
- # have to sleep; but if we sleep the whole timeout time,
- # we'll be unresponsive. The scheme here sleeps very
- # little at first, longer as time goes on, but never longer
- # than 20 times per second (or the timeout time remaining).
- endtime = _time() + timeout
- delay = self.delay
- acquire = waiter.acquire
- while True:
- gotit = acquire(0)
- if gotit:
- break
- remaining = endtime - _time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, .05)
- _sleep(delay)
- # END endless loop
- if not gotit:
- try:
- self.__dict__['_Condition__waiters'].remove(waiter)
- except ValueError:
- pass
- # END didn't ever get it
- finally:
- self._acquire_restore(saved_state)
-
- def notify(self, n=1):
- __waiters = self.__dict__['_Condition__waiters']
- if not __waiters:
- return
- if n == 1:
- __waiters[0].release()
- try:
- __waiters.pop(0)
- except IndexError:
- pass
- else:
- waiters = __waiters[:n]
- for waiter in waiters:
- waiter.release()
- try:
- __waiters.remove(waiter)
- except ValueError:
- pass
- # END handle n = 1 case faster
-
-class PerfQueue(Queue):
- """A queue using different condition objects to gain multithreading performance"""
- def __init__(self, maxsize=0):
- Queue.__init__(self, maxsize)
-
- self.not_empty = HSCondition(self.mutex)
- self.not_full = HSCondition(self.mutex)
- self.all_tasks_done = HSCondition(self.mutex)
-
-
-#} END utilities
-
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
@@ -237,7 +144,7 @@ class Pool(object):
self._tasks = Graph()
self._consumed_tasks = None
self._workers = list()
- self._queue = SyncQueue() # start with a sync queue
+ self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
self.set_size(size)
@@ -375,7 +282,10 @@ class Pool(object):
self._consumed_tasks.put(task)
# END handle consumption
- # delete consumed tasks to cleanup
+ self._handle_consumed_tasks()
+
+ def _handle_consumed_tasks(self):
+ """Remove all consumed tasks from our queue by deleting them"""
try:
while True:
ct = self._consumed_tasks.get(False)
@@ -384,7 +294,7 @@ class Pool(object):
except Empty:
pass
# END pop queue empty
-
+
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
if sys.getrefcount(task._out_wc) < 3:
@@ -415,11 +325,7 @@ class Pool(object):
cur_count = len(self._workers)
if cur_count < size:
# make sure we have a real queue, and can store our consumed tasks properly
- if not isinstance(self._queue, self.TaskQueueCls):
- if self._queue is not None and not self._queue.empty():
- raise AssertionError("Expected empty queue when switching the queue type")
- # END safety check
- self._queue = self.TaskQueueCls()
+ if not isinstance(self._consumed_tasks, self.TaskQueueCls):
self._consumed_tasks = Queue()
# END init queue
@@ -445,13 +351,8 @@ class Pool(object):
continue
# END while there are tasks on the queue
- # use a serial queue, its faster
- if not isinstance(self._queue, SyncQueue):
- self._queue = SyncQueue()
- # END handle queue type
-
if self._consumed_tasks and not self._consumed_tasks.empty():
- self._post_channel_read(self._consumed_tasks.pop())
+ self._handle_consumed_tasks()
# END assure consumed tasks are empty
self._consumed_tasks = SyncQueue()
# END process queue
@@ -467,6 +368,8 @@ class Pool(object):
output channel is only held by themselves, so no one will ever consume
its items.
+ This method blocks until all tasks to be removed have been processed, if
+ they are currently being processed.
:return: self"""
# now delete our actual node - must set it done os it closes its channels.
# Otherwise further reads of output tasks will block.
@@ -478,6 +381,21 @@ class Pool(object):
self._taskgraph_lock.acquire()
try:
self._taskorder_cache.clear()
+ # before we can delete the task, make sure its write channel
+ # is closed, otherwise people might still be waiting for its result.
+ # If a channel is not closed, this could also mean its not yet fully
+ # processed, but more importantly, there must be no task being processed
+ # right now.
+ # TODO: figure this out
+ for worker in self._workers:
+ r = worker.routine()
+ if r and r.im_self is task:
+ raise NotImplementedError("todo")
+ # END handle running task
+ # END check for in-progress routine
+
+ # its done, close the channel for writing
+ task.close()
self._tasks.del_node(task)
finally:
self._taskgraph_lock.release()
@@ -497,11 +415,11 @@ class Pool(object):
# create a write channel for it
wc, rc = Channel()
rc = RPoolChannel(wc, task, self)
- task._out_wc = wc
+ task.set_wc(wc)
has_input_channel = isinstance(task, InputChannelTask)
if has_input_channel:
- task._pool_ref = weakref.ref(self)
+ task.set_pool(self)
# END init input channel task
self._taskgraph_lock.acquire()
@@ -534,4 +452,4 @@ class ThreadPool(Pool):
"""A pool using threads as worker"""
WorkerCls = WorkerThread
LockCls = Lock
- TaskQueueCls = PerfQueue
+ TaskQueueCls = AsyncQueue
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 3137746c..f106c381 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,5 +1,7 @@
from graph import Node
+
import threading
+import weakref
import new
class OutputChannelTask(Node):
@@ -17,6 +19,7 @@ class OutputChannelTask(Node):
__slots__ = ( '_read', # method to yield items to process
'_out_wc', # output write channel
'_exc', # exception caught
+ '_done', # True if we are done
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
@@ -28,6 +31,7 @@ class OutputChannelTask(Node):
self._read = None # to be set by subclasss
self._out_wc = None # to be set later
self._exc = None
+ self._done = False
self.fun = fun
self.min_count = None
self.max_chunksize = 0 # note set
@@ -35,12 +39,28 @@ class OutputChannelTask(Node):
def is_done(self):
""":return: True if we are finished processing"""
- return self._out_wc.closed
+ return self._done
def set_done(self):
"""Set ourselves to being done, has we have completed the processing"""
+ self._done = True
+ self.close()
+
+ def set_wc(self, wc):
+ """Set the write channel to the given one
+ :note: resets it done state in order to allow proper queue handling"""
+ self._done = False
+ self._out_wc = wc
+
+ def close(self):
+ """A closed task will close its channel to assure the readers will wake up
+ :note: its safe to call this method multiple times"""
self._out_wc.close()
+ def is_closed(self):
+ """:return: True if the task's write channel is closed"""
+ return self._out_wc.closed
+
def error(self):
""":return: Exception caught during last processing or None"""
return self._exc
@@ -148,5 +168,9 @@ class InputChannelTask(OutputChannelTask):
# and call it
return OutputChannelTask.process(self, count)
+
+ def set_pool(self, pool):
+ """Set our pool to the given one, it will be weakref'd"""
+ self._pool_ref = weakref.ref(pool)
#{ Configuration
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index f7f0c978..4240a664 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -110,7 +110,7 @@ class WorkerThread(TerminatableThread):
t[1] = optional, tuple or list of arguments to pass to the routine
t[2] = optional, dictionary of keyword arguments to pass to the routine
"""
- __slots__ = ('inq', 'outq')
+ __slots__ = ('inq', '_current_routine')
# define how often we should check for a shutdown request in case our
@@ -120,10 +120,12 @@ class WorkerThread(TerminatableThread):
def __init__(self, inq = None):
super(WorkerThread, self).__init__()
self.inq = inq or Queue.Queue()
+ self._current_routine = None # routine we execute right now
def run(self):
"""Process input tasks until we receive the quit signal"""
while True:
+ self._current_routine = None
if self._should_terminate():
break
# END check for stop request
@@ -138,8 +140,9 @@ class WorkerThread(TerminatableThread):
# needing exactly one function, and one arg
assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
routine, arg = tasktuple
- # DEBUG
- # print "%s: picked up: %s(%s)" % (self.name, routine, arg)
+
+ self._current_routine = routine
+
try:
rval = None
if inspect.ismethod(routine):
@@ -154,16 +157,15 @@ class WorkerThread(TerminatableThread):
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
break
# END make routine call
- except StopIteration:
- break
except Exception,e:
print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
break # abort ...
# END routine exception handling
# END endless loop
- def quit(self):
- raise StopIteration
+ def routine(self):
+ """:return: routine we are currently executing, or None if we have no task"""
+ return self._current_routine
#} END classes
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index dabd8a42..432d1736 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -1,8 +1,23 @@
"""Module with utilities related to async operations"""
+from threading import (
+ Lock,
+ _Condition,
+ _sleep,
+ _time,
+ )
+
+from Queue import (
+ Queue,
+ Empty,
+ )
+
+from collections import deque
import sys
import os
+#{ Routines
+
def cpu_count():
""":return:number of CPUs in the system
:note: inspired by multiprocessing"""
@@ -22,3 +37,94 @@ def cpu_count():
raise NotImplementedError('cannot determine number of cpus')
return num
+
+#} END routines
+
+
+class SyncQueue(deque):
+ """Adapter to allow using a deque like a queue, without locking"""
+ def get(self, block=True, timeout=None):
+ try:
+ return self.pop()
+ except IndexError:
+ raise Empty
+ # END raise empty
+
+ def empty(self):
+ return len(self) == 0
+
+ put = deque.append
+
+
+class HSCondition(_Condition):
+ """An attempt to make conditions less blocking, which gains performance
+ in return by sleeping less"""
+ delay = 0.00002 # reduces wait times, but increases overhead
+
+ def wait(self, timeout=None):
+ waiter = Lock()
+ waiter.acquire()
+ self.__dict__['_Condition__waiters'].append(waiter)
+ saved_state = self._release_save()
+ try: # restore state no matter what (e.g., KeyboardInterrupt)
+ if timeout is None:
+ waiter.acquire()
+ else:
+ # Balancing act: We can't afford a pure busy loop, so we
+ # have to sleep; but if we sleep the whole timeout time,
+ # we'll be unresponsive. The scheme here sleeps very
+ # little at first, longer as time goes on, but never longer
+ # than 20 times per second (or the timeout time remaining).
+ endtime = _time() + timeout
+ delay = self.delay
+ acquire = waiter.acquire
+ while True:
+ gotit = acquire(0)
+ if gotit:
+ break
+ remaining = endtime - _time()
+ if remaining <= 0:
+ break
+ delay = min(delay * 2, remaining, .05)
+ _sleep(delay)
+ # END endless loop
+ if not gotit:
+ try:
+ self.__dict__['_Condition__waiters'].remove(waiter)
+ except ValueError:
+ pass
+ # END didn't ever get it
+ finally:
+ self._acquire_restore(saved_state)
+
+ def notify(self, n=1):
+ __waiters = self.__dict__['_Condition__waiters']
+ if not __waiters:
+ return
+ if n == 1:
+ __waiters[0].release()
+ try:
+ __waiters.pop(0)
+ except IndexError:
+ pass
+ else:
+ waiters = __waiters[:n]
+ for waiter in waiters:
+ waiter.release()
+ try:
+ __waiters.remove(waiter)
+ except ValueError:
+ pass
+ # END handle n = 1 case faster
+
+class AsyncQueue(Queue):
+ """A queue using different condition objects to gain multithreading performance"""
+ def __init__(self, maxsize=0):
+ Queue.__init__(self, maxsize)
+
+ self.not_empty = HSCondition(self.mutex)
+ self.not_full = HSCondition(self.mutex)
+ self.all_tasks_done = HSCondition(self.mutex)
+
+
+#} END utilities