From b3cde0ee162b8f0cb67da981311c8f9c16050a62 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 18:13:21 +0200 Subject: First step of testing the pool - tasks have been separated into a new module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support --- lib/git/async/util.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 lib/git/async/util.py (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py new file mode 100644 index 00000000..dabd8a42 --- /dev/null +++ b/lib/git/async/util.py @@ -0,0 +1,24 @@ +"""Module with utilities related to async operations""" + +import sys +import os + +def cpu_count(): + """:return:number of CPUs in the system + :note: inspired by multiprocessing""" + num = 0 + try: + if sys.platform == 'win32': + num = int(os.environ['NUMBER_OF_PROCESSORS']) + elif 'bsd' in sys.platform or sys.platform == 'darwin': + num = int(os.popen('sysctl -n hw.ncpu').read()) + else: + num = os.sysconf('SC_NPROCESSORS_ONLN') + except (ValueError, KeyError, OSError, AttributeError): + pass + # END exception handling + + if num == 0: + raise NotImplementedError('cannot determine number of cpus') + + return num -- cgit v1.2.1 From 583cd8807259a69fc01874b798f657c1f9ab7828 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 19:12:44 +0200 Subject: Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state --- lib/git/async/util.py | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dabd8a42..432d1736 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -1,8 +1,23 @@ """Module with utilities related to async operations""" +from threading import ( + Lock, + _Condition, + _sleep, + _time, + ) + +from Queue import ( + Queue, + Empty, + ) + +from collections import deque import sys import os +#{ Routines + def cpu_count(): """:return:number of CPUs in the system :note: inspired by multiprocessing""" @@ -22,3 +37,94 @@ def cpu_count(): raise NotImplementedError('cannot determine number of cpus') return num + +#} END routines + + +class SyncQueue(deque): + """Adapter to allow using a deque like a queue, without locking""" + def get(self, block=True, timeout=None): + try: + return self.pop() + except IndexError: + raise Empty + # END raise empty + + def empty(self): + return len(self) == 0 + + put = deque.append + + +class HSCondition(_Condition): + """An attempt to make conditions less blocking, which gains performance + in return by sleeping less""" + delay = 0.00002 # reduces wait times, but increases overhead + + def wait(self, timeout=None): + waiter = Lock() + waiter.acquire() + self.__dict__['_Condition__waiters'].append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). 
+ endtime = _time() + timeout + delay = self.delay + acquire = waiter.acquire + while True: + gotit = acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + # END endless loop + if not gotit: + try: + self.__dict__['_Condition__waiters'].remove(waiter) + except ValueError: + pass + # END didn't ever get it + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + __waiters = self.__dict__['_Condition__waiters'] + if not __waiters: + return + if n == 1: + __waiters[0].release() + try: + __waiters.pop(0) + except IndexError: + pass + else: + waiters = __waiters[:n] + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + # END handle n = 1 case faster + +class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +#} END utilities -- cgit v1.2.1 From be06e87433685b5ea9cfcc131ab89c56cf8292f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 22:00:47 +0200 Subject: improved testing to test the actual async handling of the pool. there are still inconsistencies that need to be fixed, but it already improved, especially the 4-thread performance which now is as fast as the dual-threaded performance --- lib/git/async/util.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 432d1736..85d44694 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -59,7 +59,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.00005 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() @@ -85,7 +85,9 @@ class HSCondition(_Condition): remaining = endtime - _time() if remaining <= 0: break - delay = min(delay * 2, remaining, .05) + # this makes 4 threads working as good as two, but of course + # it causes more frequent micro-sleeping + #delay = min(delay * 2, remaining, .05) _sleep(delay) # END endless loop if not gotit: -- cgit v1.2.1 From def0f73989047c4ddf9b11da05ad2c9c8e387331 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:20:37 +0200 Subject: introduced a new counter keeping track of the scheduled tasks - this prevent unnecessary tasks to be scheduled as we keep track of how many items will be produced for the task at hand. This introduces additional locking, but performns well in multithreaded mode. 
Performance of the master queue is still a huge issue, it's currently the limiting factor, as bypassing the master queue in serial mode gives 15x performance, which is what I would need --- lib/git/async/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 85d44694..6bd8a4e8 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -59,7 +59,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.00005 # reduces wait times, but increases overhead + delay = 0.0001 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() -- cgit v1.2.1 From 898d47d1711accdfded8ee470520fdb96fb12d46 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:47:06 +0200 Subject: Task scheduled items lock now uses a dummy lock in serial mode, improving its performance considerably. Channels now use the AsyncQueue, boosting their throughput to about 5k items / s - this is something one can work with, considering the runtime of each item should be large enough to keep the threads busy. This could be a basis, further testing needed --- lib/git/async/util.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 6bd8a4e8..55766579 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -41,6 +41,18 @@ def cpu_count(): #} END routines + +class DummyLock(object): + """An object providing a do-nothing lock interface for use in sync mode""" + __slots__ = tuple() + + def acquire(self): + pass + + def release(self): + pass + + class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): try: @@ -59,7 +71,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.0001 # reduces wait times, but increases overhead + delay = 0.00002 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() -- cgit v1.2.1 From 5d996892ac76199886ba3e2754ff9c9fac2456d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 00:32:33 +0200 Subject: test implementation of async-queue with everything stripped from it that didn't seem necessary - it's a failure, something is wrong - performance not much better than the original one, it depends on the condition performance actually, which I don't get faster --- lib/git/async/util.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 55766579..e3556c05 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -133,12 +133,55 @@ class HSCondition(_Condition): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" + __slots__ = ('mutex', 'not_empty', 'queue') + def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - + self.queue = deque() + self.mutex = Lock() self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - + def qsize(self): + self.mutex.acquire() + try: + return
len(self.queue) + finally: + self.mutex.release() + + def empty(self): + self.mutex.acquire() + try: + return not len(self.queue) + finally: + self.mutex.release() + + def put(self, item, block=True, timeout=None): + self.mutex.acquire() + self.queue.append(item) + self.mutex.release() + self.not_empty.notify() + + def get(self, block=True, timeout=None): + self.not_empty.acquire() + q = self.queue + try: + if not block: + if not len(q): + raise Empty + elif timeout is None: + while not len(q): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while not len(q): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + return q.popleft() + finally: + self.not_empty.release() + + #} END utilities -- cgit v1.2.1 From 09c3f39ceb545e1198ad7a3f470d4ec896ce1add Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 10:45:14 +0200 Subject: both versions of the async queue still have trouble in certain situations, at least with my totally overwritten version of the condition - the previous one was somewhat more stable it seems. Nonetheless, this is the fastest version so far --- lib/git/async/util.py | 77 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 21 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index e3556c05..fb63ccaa 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,6 +2,8 @@ from threading import ( Lock, + current_thread, + _allocate_lock, _Condition, _sleep, _time, @@ -57,7 +59,7 @@ class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): try: - return self.pop() + return self.popleft() except IndexError: raise Empty # END raise empty @@ -67,26 +69,45 @@ class SyncQueue(deque): put = deque.append - -class HSCondition(_Condition): + +class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" + __slots__ = ("acquire", "release", "_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead + def __init__(self, lock=None): + if lock is None: + lock = Lock() + self._lock = lock + self.acquire = lock.acquire + self.release = lock.release + self._waiters = list() + + def __release(self): + return self._lock.release() + + def __acquire(self, block=None): + if block is None: + self._lock.acquire() + else: + return self._lock.acquire(block) + def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() + waiter = _allocate_lock() + waiter.acquire() # get it the first time, no blocking + self._waiters.append(waiter) + + # in the momemnt we release our lock, someone else might actually resume + self.release() try: # restore state no matter what (e.g., KeyboardInterrupt) + # now we block, as we hold the lock already if timeout is None: waiter.acquire() else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). 
+ # Balancing act: We can't afford a pure busy loop, because of the + # GIL, so we have to sleep + # We try to sleep only tiny amounts of time though to be very responsive endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -104,34 +125,48 @@ class HSCondition(_Condition): # END endless loop if not gotit: try: - self.__dict__['_Condition__waiters'].remove(waiter) + self._waiters.remove(waiter) except ValueError: pass # END didn't ever get it finally: - self._acquire_restore(saved_state) + # reacquire the lock + self.acquire() def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: + if not self._waiters: return + waiters = self._waiters if n == 1: - __waiters[0].release() + waiters[0].release() try: - __waiters.pop(0) + waiters.pop(0) except IndexError: pass else: - waiters = __waiters[:n] - for waiter in waiters: + for waiter in waiters[:n]: waiter.release() try: - __waiters.remove(waiter) + waiters.remove(waiter) except ValueError: pass # END handle n = 1 case faster + def notify_all(self): + self.notify(len(self._waiters)) + + class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +class _AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') -- cgit v1.2.1 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. 
--- lib/git/async/util.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index fb63ccaa..01073f6d 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -73,21 +73,22 @@ class SyncQueue(deque): class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - __slots__ = ("acquire", "release", "_lock", '_waiters') + # __slots__ = ("acquire", "release", "_lock", '_waiters') + __slots__ = ("_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - self.acquire = lock.acquire - self.release = lock.release + #self.acquire = lock.acquire + #self.release = lock.release self._waiters = list() - def __release(self): + def release(self): return self._lock.release() - def __acquire(self, block=None): + def acquire(self, block=None): if block is None: self._lock.acquire() else: @@ -156,7 +157,7 @@ class HSCondition(object): self.notify(len(self._waiters)) -class AsyncQueue(Queue): +class _AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" def __init__(self, maxsize=0): Queue.__init__(self, maxsize) @@ -166,7 +167,7 @@ class AsyncQueue(Queue): self.all_tasks_done = HSCondition(self.mutex) -class _AsyncQueue(Queue): +class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') @@ -194,9 +195,9 @@ class _AsyncQueue(Queue): self.queue.append(item) self.mutex.release() self.not_empty.notify() - + def get(self, block=True, timeout=None): - self.not_empty.acquire() + self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: if not block: @@ -205,16 +206,23 @@ class _AsyncQueue(Queue): elif timeout is None: while not len(q): self.not_empty.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a positive number") else: + print "with timeout", timeout + import traceback + traceback.print_stack() endtime = _time() + timeout while not len(q): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - return q.popleft() + # END handle block + # can happen if someone else woke us up + try: + return q.popleft() + except IndexError: + raise Empty + # END handle unblocking reason finally: self.not_empty.release() -- cgit v1.2.1 From 53152a824f5186452504f0b68306d10ebebee416 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 14:23:58 +0200 Subject: queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable. 
--- lib/git/async/util.py | 68 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 20 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 01073f6d..51219cc4 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -166,15 +166,21 @@ class _AsyncQueue(Queue): self.not_full = HSCondition(self.mutex) self.all_tasks_done = HSCondition(self.mutex) - + +class ReadOnly(Exception): + """Thrown when trying to write to a read-only queue""" + class AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - __slots__ = ('mutex', 'not_empty', 'queue') + """A queue using different condition objects to gain multithreading performance. + Additionally it has a threadsafe writable flag, which will alert all readers + that there is nothing more to get here.""" + __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) + self._writable = True def qsize(self): self.mutex.acquire() @@ -183,6 +189,29 @@ class AsyncQueue(Queue): finally: self.mutex.release() + def writable(self): + self.mutex.acquire() + try: + return self._writable + finally: + self.mutex.release() + + def set_writable(self, state): + """Set the writable flag of this queue to True or False + :return: The previous state""" + self.mutex.acquire() + try: + old = self._writable + self._writable = state + return old + finally: + # if we won't receive anymore items, inform the getters + if not state: + self.not_empty.notify_all() + # END tell everyone + self.mutex.release() + # END handle locking + def empty(self): self.mutex.acquire() try: @@ -192,6 +221,9 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() + if not self._writable: + raise ReadOnly + # END handle read-only self.queue.append(item) self.mutex.release() self.not_empty.notify() @@ -200,24 +232,20 @@ class AsyncQueue(Queue): self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: - if not block: - if not len(q): - raise Empty - elif timeout is None: - while not len(q): - self.not_empty.wait() - else: - print "with timeout", timeout - import traceback - traceback.print_stack() - endtime = _time() + timeout - while not len(q): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) + if block: + if timeout is None: + while not len(q) and self._writable: + self.not_empty.wait() + else: + endtime = _time() + timeout + while not len(q) and self._writable: + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + # END handle timeout mode # END handle block - # can happen if someone else woke us up + # can happen if we woke up because we are not writable anymore try: return q.popleft() except IndexError: -- cgit v1.2.1 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: Its getting better already - intermediate commit before further chaning the task class --- lib/git/async/util.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 51219cc4..6d09de59 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -63,7 +63,7 @@ class SyncQueue(deque): except 
IndexError: raise Empty # END raise empty - + def empty(self): return len(self) == 0 @@ -86,13 +86,13 @@ class HSCondition(object): self._waiters = list() def release(self): - return self._lock.release() + self._lock.release() def acquire(self, block=None): if block is None: self._lock.acquire() else: - return self._lock.acquire(block) + self._lock.acquire(block) def wait(self, timeout=None): waiter = _allocate_lock() @@ -145,6 +145,7 @@ class HSCondition(object): except IndexError: pass else: + print "notify", waiters, n for waiter in waiters[:n]: waiter.release() try: @@ -156,16 +157,6 @@ class HSCondition(object): def notify_all(self): self.notify(len(self._waiters)) - -class _AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" @@ -205,11 +196,12 @@ class AsyncQueue(Queue): self._writable = state return old finally: + self.mutex.release() + # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() # END tell everyone - self.mutex.release() # END handle locking def empty(self): @@ -222,6 +214,7 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() if not self._writable: + self.mutex.release() raise ReadOnly # END handle read-only self.queue.append(item) @@ -245,7 +238,9 @@ class AsyncQueue(Queue): self.not_empty.wait(remaining) # END handle timeout mode # END handle block - # can happen if we woke up because we are not writable anymore + + # can throw if we woke up because we are not writable anymore + print len(q), id(q), current_thread() try: return q.popleft() except IndexError: -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, with boosts performance into brigt green levels --- lib/git/async/util.py | 1 - 1 file changed, 1 deletion(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 6d09de59..f3213ed6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -240,7 +240,6 @@ class AsyncQueue(Queue): # END handle block # can throw if we woke up because we are not writable anymore - print len(q), id(q), current_thread() try: return q.popleft() except IndexError: -- cgit v1.2.1 From 15941ca090a2c3c987324fc911bbc6f89e941c47 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 10:34:12 +0200 Subject: queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently. 
Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined --- lib/git/async/util.py | 50 +++++++++++++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 19 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index f3213ed6..dff38f58 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -71,18 +71,15 @@ class SyncQueue(deque): class HSCondition(object): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - # __slots__ = ("acquire", "release", "_lock", '_waiters') + """Cleaned up code of the original condition object in order + to make it run and respond faster.""" __slots__ = ("_lock", '_waiters') - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.0002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - #self.acquire = lock.acquire - #self.release = lock.release self._waiters = list() def release(self): @@ -109,6 +106,8 @@ class HSCondition(object): # Balancing act: We can't afford a pure busy loop, because of the # GIL, so we have to sleep # We try to sleep only tiny amounts of time though to be very responsive + # NOTE: this branch is not used by the async system anyway, but + # will be hit when the user reads with timeout endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -133,25 +132,36 @@ class HSCondition(object): finally: # reacquire the lock self.acquire() + # END assure release lock def notify(self, n=1): + """Its vital that this method is threadsafe - to be fast we don'd get a lock, + but instead rely on pseudo-atomic operations that come with the GIL. + Hence we use pop in the n=1 case to be truly atomic. + In the multi-notify case, we acquire a lock just for safety, as otherwise + we might pop too much of someone else notifies n waiters as well, which + would in the worst case lead to double-releases of locks.""" if not self._waiters: return - waiters = self._waiters if n == 1: - waiters[0].release() + # so here we assume this is thead-safe ! It wouldn't be in any other + # language, but python it is. try: - waiters.pop(0) + self._waiters.pop(0).release() except IndexError: pass else: - print "notify", waiters, n - for waiter in waiters[:n]: - waiter.release() - try: - waiters.remove(waiter) - except ValueError: - pass + self.acquire() + # once the waiter resumes, he will want to acquire the lock + # and waits again, but only until we are done, which is important + # to do that in a thread-safe fashion + try: + for i in range(min(n, len(self._waiters))): + self._waiters.pop(0).release() + # END for each waiter to resume + finally: + self.release() + # END assure we release our lock # END handle n = 1 case faster def notify_all(self): @@ -164,7 +174,8 @@ class ReadOnly(Exception): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers - that there is nothing more to get here.""" + that there is nothing more to get here. 
+ All default-queue code was cleaned up for performance.""" __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): @@ -222,7 +233,7 @@ class AsyncQueue(Queue): self.not_empty.notify() def get(self, block=True, timeout=None): - self.not_empty.acquire() # == self.mutex.acquire in that case + self.mutex.acquire() q = self.queue try: if block: @@ -246,7 +257,8 @@ class AsyncQueue(Queue): raise Empty # END handle unblocking reason finally: - self.not_empty.release() + self.mutex.release() + # END assure lock is released #} END utilities -- cgit v1.2.1 From f2c8d26d3b25b864ad48e6de018757266b59f708 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:28:37 +0200 Subject: thread: fixed initialization problem if an empty iterable was handed in queue: Queue now derives from deque directly, which safes one dict lookup as the queue does not need to be accessed through self anymore pool test improved to better verify threads are started correctly --- lib/git/async/util.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dff38f58..b5e1a0c0 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -10,7 +10,6 @@ from threading import ( ) from Queue import ( - Queue, Empty, ) @@ -171,15 +170,14 @@ class HSCondition(object): class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" -class AsyncQueue(Queue): +class AsyncQueue(deque): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers that there is nothing more to get here. All default-queue code was cleaned up for performance.""" - __slots__ = ('mutex', 'not_empty', 'queue', '_writable') + __slots__ = ('mutex', 'not_empty', '_writable') def __init__(self, maxsize=0): - self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) self._writable = True @@ -187,7 +185,7 @@ class AsyncQueue(Queue): def qsize(self): self.mutex.acquire() try: - return len(self.queue) + return len(self) finally: self.mutex.release() @@ -218,7 +216,7 @@ class AsyncQueue(Queue): def empty(self): self.mutex.acquire() try: - return not len(self.queue) + return not len(self) finally: self.mutex.release() @@ -228,21 +226,20 @@ class AsyncQueue(Queue): self.mutex.release() raise ReadOnly # END handle read-only - self.queue.append(item) + self.append(item) self.mutex.release() self.not_empty.notify() def get(self, block=True, timeout=None): self.mutex.acquire() - q = self.queue try: if block: if timeout is None: - while not len(q) and self._writable: + while not len(self) and self._writable: self.not_empty.wait() else: endtime = _time() + timeout - while not len(q) and self._writable: + while not len(self) and self._writable: remaining = endtime - _time() if remaining <= 0.0: raise Empty @@ -252,7 +249,7 @@ class AsyncQueue(Queue): # can throw if we woke up because we are not writable anymore try: - return q.popleft() + return self.popleft() except IndexError: raise Empty # END handle unblocking reason -- cgit v1.2.1 From 2054561da184955c4be4a92f0b4fa5c5c1c01350 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:31:24 +0200 Subject: HSCondition: using a deck to store waiters, for further speedup --- lib/git/async/util.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'lib/git/async/util.py') diff --git 
a/lib/git/async/util.py b/lib/git/async/util.py index b5e1a0c0..2c18a1b9 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -79,7 +79,7 @@ class HSCondition(object): if lock is None: lock = Lock() self._lock = lock - self._waiters = list() + self._waiters = deque() def release(self): self._lock.release() @@ -146,7 +146,7 @@ class HSCondition(object): # so here we assume this is thead-safe ! It wouldn't be in any other # language, but python it is. try: - self._waiters.pop(0).release() + self._waiters.popleft().release() except IndexError: pass else: @@ -156,7 +156,7 @@ class HSCondition(object): # to do that in a thread-safe fashion try: for i in range(min(n, len(self._waiters))): - self._waiters.pop(0).release() + self._waiters.popleft().release() # END for each waiter to resume finally: self.release() -- cgit v1.2.1 From 1090701721888474d34f8a4af28ee1bb1c3fdaaa Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:35:41 +0200 Subject: HSCondition: now deriving from deque, as the AsyncQueue does, to eliminate one more level of indirection. Clearly this is not good from a design standpoint, as a Condition is no Deque, but it helps speeding things up which is what this is about. Could make it a hidden class to indicate how 'special' it is --- lib/git/async/util.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 2c18a1b9..ffdb14a2 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -69,17 +69,16 @@ class SyncQueue(deque): put = deque.append -class HSCondition(object): +class HSCondition(deque): """Cleaned up code of the original condition object in order to make it run and respond faster.""" - __slots__ = ("_lock", '_waiters') + __slots__ = ("_lock") delay = 0.0002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - self._waiters = deque() def release(self): self._lock.release() @@ -93,7 +92,7 @@ class HSCondition(object): def wait(self, timeout=None): waiter = _allocate_lock() waiter.acquire() # get it the first time, no blocking - self._waiters.append(waiter) + self.append(waiter) # in the momemnt we release our lock, someone else might actually resume self.release() @@ -124,7 +123,7 @@ class HSCondition(object): # END endless loop if not gotit: try: - self._waiters.remove(waiter) + self.remove(waiter) except ValueError: pass # END didn't ever get it @@ -140,13 +139,13 @@ class HSCondition(object): In the multi-notify case, we acquire a lock just for safety, as otherwise we might pop too much of someone else notifies n waiters as well, which would in the worst case lead to double-releases of locks.""" - if not self._waiters: + if not self: return if n == 1: # so here we assume this is thead-safe ! It wouldn't be in any other # language, but python it is.
try: - self._waiters.popleft().release() + self.popleft().release() except IndexError: pass else: @@ -155,8 +154,8 @@ class HSCondition(object): # and waits again, but only until we are done, which is important # to do that in a thread-safe fashion try: - for i in range(min(n, len(self._waiters))): - self._waiters.popleft().release() + for i in range(min(n, len(self))): + self.popleft().release() # END for each waiter to resume finally: self.release() @@ -164,7 +163,7 @@ class HSCondition(object): # END handle n = 1 case faster def notify_all(self): - self.notify(len(self._waiters)) + self.notify(len(self)) class ReadOnly(Exception): -- cgit v1.2.1 From a988e6985849e4f6a561b4a5468d525c25ce74fe Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:45:25 +0200 Subject: HSCondition: now gets a lock even in the single-notify case, as it was required due to the non-atomiciy of the invovled operation. Removed one level of indirection for the lock, by refraining from calling my own 'wrapper' methods, which brought it back to the performance it had before the locking was introduced for the n==1 case --- lib/git/async/util.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index ffdb14a2..008e60a3 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -95,7 +95,7 @@ class HSCondition(deque): self.append(waiter) # in the momemnt we release our lock, someone else might actually resume - self.release() + self._lock.release() try: # restore state no matter what (e.g., KeyboardInterrupt) # now we block, as we hold the lock already if timeout is None: @@ -129,7 +129,7 @@ class HSCondition(deque): # END didn't ever get it finally: # reacquire the lock - self.acquire() + self._lock.acquire() # END assure release lock def notify(self, n=1): @@ -144,12 +144,23 @@ class HSCondition(deque): if n == 1: # so here we assume this is thead-safe ! It wouldn't be in any other # language, but python it is. + # But ... its two objects here - first the popleft, then the relasecall. + # If the timing is really really bad, and that happens if you let it + # run often enough ( its a matter of statistics ), this will fail, + # which is why we lock it. + # And yes, this causes some slow down, as single notifications happen + # alot + self._lock.acquire() try: - self.popleft().release() - except IndexError: - pass + try: + self.popleft().release() + except IndexError: + pass + finally: + self._lock.release() + # END assure lock is released else: - self.acquire() + self._lock.acquire() # once the waiter resumes, he will want to acquire the lock # and waits again, but only until we are done, which is important # to do that in a thread-safe fashion @@ -158,7 +169,7 @@ class HSCondition(deque): self.popleft().release() # END for each waiter to resume finally: - self.release() + self._lock.release() # END assure we release our lock # END handle n = 1 case faster -- cgit v1.2.1 From 4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 14:01:51 +0200 Subject: HSCondition: Fixed terrible bug which it inherited from its default python Condition implementation, related to the notify method not being treadsafe. Although I was aware of it, I missed the first check which tests for the size - the result could be incorrect if the whole method wasn't locked. Testing runs stable now, allowing to move on \! 
--- lib/git/async/util.py | 47 +++++++++++++++-------------------------------- 1 file changed, 15 insertions(+), 32 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 008e60a3..2f46d55f 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -133,45 +133,28 @@ class HSCondition(deque): # END assure release lock def notify(self, n=1): - """Its vital that this method is threadsafe - to be fast we don'd get a lock, - but instead rely on pseudo-atomic operations that come with the GIL. - Hence we use pop in the n=1 case to be truly atomic. - In the multi-notify case, we acquire a lock just for safety, as otherwise - we might pop too much of someone else notifies n waiters as well, which - would in the worst case lead to double-releases of locks.""" - if not self: - return - if n == 1: - # so here we assume this is thead-safe ! It wouldn't be in any other - # language, but python it is. - # But ... its two objects here - first the popleft, then the relasecall. - # If the timing is really really bad, and that happens if you let it - # run often enough ( its a matter of statistics ), this will fail, - # which is why we lock it. - # And yes, this causes some slow down, as single notifications happen - # alot - self._lock.acquire() - try: + """Its vital that this method is threadsafe - we absolutely have to + get a lock at the beginning of this method to be sure we get the + correct amount of waiters back. If we bail out, although a waiter + is about to be added, it will miss its wakeup notification, and block + forever (possibly)""" + self._lock.acquire() + try: + if not self: # len(self) == 0, but this should be faster + return + if n == 1: try: self.popleft().release() except IndexError: pass - finally: - self._lock.release() - # END assure lock is released - else: - self._lock.acquire() - # once the waiter resumes, he will want to acquire the lock - # and waits again, but only until we are done, which is important - # to do that in a thread-safe fashion - try: + else: for i in range(min(n, len(self))): self.popleft().release() # END for each waiter to resume - finally: - self._lock.release() - # END assure we release our lock - # END handle n = 1 case faster + # END handle n = 1 case faster + finally: + self._lock.release() + # END assure lock is released def notify_all(self): self.notify(len(self)) -- cgit v1.2.1 From 57a4e09294230a36cc874a6272c71757c48139f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 15:29:47 +0200 Subject: Channel: removed pseudoconstructor, which clearly improves the design and makes it easier to constomize pool: in serial mode, created channels will be serial-only, which brings 15% of performance --- lib/git/async/util.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 2f46d55f..00d0dbab 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -66,7 +66,14 @@ class SyncQueue(deque): def empty(self): return len(self) == 0 - put = deque.append + def set_writable(self, state): + pass + + def writable(self): + return True + + def put(self, item, block=True, timeout=None): + self.append(item) class HSCondition(deque): -- cgit v1.2.1 From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 00:24:49 +0200 Subject: messy first version of a properly working depth-first graph method, which allows the pool 
to work as expected. Many more tests need to be added, and there still is a problem with shutdown as sometimes it won't kill all threads, mainly because the process came up with worker threads started, which cannot be --- lib/git/async/util.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 00d0dbab..b7750b0b 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -206,7 +206,6 @@ class AsyncQueue(deque): return old finally: self.mutex.release() - # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() # END tell everyone @@ -222,6 +221,13 @@ class AsyncQueue(deque): def put(self, item, block=True, timeout=None): self.mutex.acquire() + # NOTE: we explicitly do NOT check for our writable state + # Its just used as a notification signal, and we need to be able + # to continue writing to prevent threads ( easily ) from failing + # to write their computed results, which we want in fact + # NO: we want them to fail and stop processing, as the one who caused + # the channel to close had a reason and wants the threads to + # stop on the task as soon as possible if not self._writable: self.mutex.release() raise ReadOnly -- cgit v1.2.1 From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 11:52:01 +0200 Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notified, staying in a beauty sleep. This glitch is probably caused by some detail not treated correctly in the python thread module, which is something we cannot fix. It works most of the time as expected though - maybe some cleanup is not done correctly which causes this --- lib/git/async/util.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index b7750b0b..11ab75a6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -101,10 +101,12 @@ class HSCondition(deque): waiter.acquire() # get it the first time, no blocking self.append(waiter) - # in the momemnt we release our lock, someone else might actually resume - self._lock.release() - try: # restore state no matter what (e.g., KeyboardInterrupt) + + try: + # restore state no matter what (e.g., KeyboardInterrupt) # now we block, as we hold the lock already + # in the momemnt we release our lock, someone else might actually resume + self._lock.release() if timeout is None: waiter.acquire() else: -- cgit v1.2.1 From e14e3f143e7260de9581aee27e5a9b2645db72de Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:42:09 +0200 Subject: Removed commented-out debug code and additional debug printing. Verified it works on py2.4, 2.5 and 2.6 --- lib/git/async/util.py | 1 - 1 file changed, 1 deletion(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 11ab75a6..4c4f3929 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,7 +2,6 @@ from threading import ( Lock, - current_thread, _allocate_lock, _Condition, _sleep, -- cgit v1.2.1
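What the series converges on is an AsyncQueue that can be 'closed' for writing: set_writable(False) wakes all blocked readers, get() raises Empty once the queue has been drained, and any further put() raises ReadOnly - which is how the worker threads mentioned above can receive their termination events purely through their input queue. The following is a minimal usage sketch, not one of the patches; it assumes the final revision of lib/git/async/util.py is importable as git.async.util and runs under the same Python 2 interpreters noted in the last commit:

from threading import Thread
from Queue import Empty

from git.async.util import AsyncQueue, ReadOnly	# assumed import path for lib/git/async/util.py

q = AsyncQueue()

def consume():
	# get() blocks until an item arrives, or until the queue was made
	# read-only and drained, in which case it raises Empty
	while True:
		try:
			item = q.get()
		except Empty:
			break
		# END handle closed and drained queue
		print "got", item
	# END consume loop

t = Thread(target=consume)
t.start()

for i in range(10):
	q.put(i)
# END produce items

q.set_writable(False)	# 'close' the queue, waking all blocked readers
t.join()

try:
	q.put('more')	# writing to a read-only queue fails
except ReadOnly:
	pass
# END demonstrate ReadOnly

Closing the queue this way, instead of pushing sentinel items, is what lets readers distinguish "no data yet" from "no data will ever come" without any out-of-band signalling.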