summaryrefslogtreecommitdiff
path: root/lib/git/async/util.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-07 19:12:44 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-07 19:12:44 +0200
commit583cd8807259a69fc01874b798f657c1f9ab7828 (patch)
tree046847c4dcd33f5b30c00ff65770039fc72dd148 /lib/git/async/util.py
parentedd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (diff)
downloadgitpython-583cd8807259a69fc01874b798f657c1f9ab7828.tar.gz
Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r--lib/git/async/util.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index dabd8a42..432d1736 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -1,8 +1,23 @@
"""Module with utilities related to async operations"""
+from threading import (
+ Lock,
+ _Condition,
+ _sleep,
+ _time,
+ )
+
+from Queue import (
+ Queue,
+ Empty,
+ )
+
+from collections import deque
import sys
import os
+#{ Routines
+
def cpu_count():
""":return:number of CPUs in the system
:note: inspired by multiprocessing"""
@@ -22,3 +37,94 @@ def cpu_count():
raise NotImplementedError('cannot determine number of cpus')
return num
+
+#} END routines
+
+
+class SyncQueue(deque):
+ """Adapter to allow using a deque like a queue, without locking"""
+ def get(self, block=True, timeout=None):
+ try:
+ return self.pop()
+ except IndexError:
+ raise Empty
+ # END raise empty
+
+ def empty(self):
+ return len(self) == 0
+
+ put = deque.append
+
+
+class HSCondition(_Condition):
+ """An attempt to make conditions less blocking, which gains performance
+ in return by sleeping less"""
+ delay = 0.00002 # reduces wait times, but increases overhead
+
+ def wait(self, timeout=None):
+ waiter = Lock()
+ waiter.acquire()
+ self.__dict__['_Condition__waiters'].append(waiter)
+ saved_state = self._release_save()
+ try: # restore state no matter what (e.g., KeyboardInterrupt)
+ if timeout is None:
+ waiter.acquire()
+ else:
+ # Balancing act: We can't afford a pure busy loop, so we
+ # have to sleep; but if we sleep the whole timeout time,
+ # we'll be unresponsive. The scheme here sleeps very
+ # little at first, longer as time goes on, but never longer
+ # than 20 times per second (or the timeout time remaining).
+ endtime = _time() + timeout
+ delay = self.delay
+ acquire = waiter.acquire
+ while True:
+ gotit = acquire(0)
+ if gotit:
+ break
+ remaining = endtime - _time()
+ if remaining <= 0:
+ break
+ delay = min(delay * 2, remaining, .05)
+ _sleep(delay)
+ # END endless loop
+ if not gotit:
+ try:
+ self.__dict__['_Condition__waiters'].remove(waiter)
+ except ValueError:
+ pass
+ # END didn't ever get it
+ finally:
+ self._acquire_restore(saved_state)
+
+ def notify(self, n=1):
+ __waiters = self.__dict__['_Condition__waiters']
+ if not __waiters:
+ return
+ if n == 1:
+ __waiters[0].release()
+ try:
+ __waiters.pop(0)
+ except IndexError:
+ pass
+ else:
+ waiters = __waiters[:n]
+ for waiter in waiters:
+ waiter.release()
+ try:
+ __waiters.remove(waiter)
+ except ValueError:
+ pass
+ # END handle n = 1 case faster
+
+class AsyncQueue(Queue):
+ """A queue using different condition objects to gain multithreading performance"""
+ def __init__(self, maxsize=0):
+ Queue.__init__(self, maxsize)
+
+ self.not_empty = HSCondition(self.mutex)
+ self.not_full = HSCondition(self.mutex)
+ self.all_tasks_done = HSCondition(self.mutex)
+
+
+#} END utilities