From 5d996892ac76199886ba3e2754ff9c9fac2456d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 00:32:33 +0200 Subject: test implementation of async-queue with everything stripped from it that didn't seem necessary - its a failure, something is wrong - performance not much better than the original one, its depending on the condition performance actually, which I don't get faster --- lib/git/async/util.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 55766579..e3556c05 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -133,12 +133,55 @@ class HSCondition(_Condition): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" + __slots__ = ('mutex', 'not_empty', 'queue') + def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - + self.queue = deque() + self.mutex = Lock() self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - + def qsize(self): + self.mutex.acquire() + try: + return len(self.queue) + finally: + self.mutex.release() + + def empty(self): + self.mutex.acquire() + try: + return not len(self.queue) + finally: + self.mutex.release() + + def put(self, item, block=True, timeout=None): + self.mutex.acquire() + self.queue.append(item) + self.mutex.release() + self.not_empty.notify() + + def get(self, block=True, timeout=None): + self.not_empty.acquire() + q = self.queue + try: + if not block: + if not len(q): + raise Empty + elif timeout is None: + while not len(q): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while not len(q): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + return q.popleft() + finally: + self.not_empty.release() + + #} END utilities -- cgit v1.2.1 From 09c3f39ceb545e1198ad7a3f470d4ec896ce1add Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 10:45:14 +0200 Subject: both versions of the async queue still have trouble in certain situations, at least with my totally overwritten version of the condition - the previous one was somewhat more stable it seems. Nonetheless, this is the fastest version so far --- lib/git/async/util.py | 77 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 21 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index e3556c05..fb63ccaa 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,6 +2,8 @@ from threading import ( Lock, + current_thread, + _allocate_lock, _Condition, _sleep, _time, @@ -57,7 +59,7 @@ class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): try: - return self.pop() + return self.popleft() except IndexError: raise Empty # END raise empty @@ -67,26 +69,45 @@ class SyncQueue(deque): put = deque.append - -class HSCondition(_Condition): + +class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" + __slots__ = ("acquire", "release", "_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead + def __init__(self, lock=None): + if lock is None: + lock = Lock() + self._lock = lock + self.acquire = lock.acquire + self.release = lock.release + self._waiters = list() + + def __release(self): + return self._lock.release() + + def __acquire(self, block=None): + if block is None: + self._lock.acquire() + else: + return self._lock.acquire(block) + def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() + waiter = _allocate_lock() + waiter.acquire() # get it the first time, no blocking + self._waiters.append(waiter) + + # in the momemnt we release our lock, someone else might actually resume + self.release() try: # restore state no matter what (e.g., KeyboardInterrupt) + # now we block, as we hold the lock already if timeout is None: waiter.acquire() else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). + # Balancing act: We can't afford a pure busy loop, because of the + # GIL, so we have to sleep + # We try to sleep only tiny amounts of time though to be very responsive endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -104,34 +125,48 @@ class HSCondition(_Condition): # END endless loop if not gotit: try: - self.__dict__['_Condition__waiters'].remove(waiter) + self._waiters.remove(waiter) except ValueError: pass # END didn't ever get it finally: - self._acquire_restore(saved_state) + # reacquire the lock + self.acquire() def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: + if not self._waiters: return + waiters = self._waiters if n == 1: - __waiters[0].release() + waiters[0].release() try: - __waiters.pop(0) + waiters.pop(0) except IndexError: pass else: - waiters = __waiters[:n] - for waiter in waiters: + for waiter in waiters[:n]: waiter.release() try: - __waiters.remove(waiter) + waiters.remove(waiter) except ValueError: pass # END handle n = 1 case faster + def notify_all(self): + self.notify(len(self._waiters)) + + class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +class _AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') -- cgit v1.2.1 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. --- lib/git/async/util.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index fb63ccaa..01073f6d 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -73,21 +73,22 @@ class SyncQueue(deque): class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - __slots__ = ("acquire", "release", "_lock", '_waiters') + # __slots__ = ("acquire", "release", "_lock", '_waiters') + __slots__ = ("_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - self.acquire = lock.acquire - self.release = lock.release + #self.acquire = lock.acquire + #self.release = lock.release self._waiters = list() - def __release(self): + def release(self): return self._lock.release() - def __acquire(self, block=None): + def acquire(self, block=None): if block is None: self._lock.acquire() else: @@ -156,7 +157,7 @@ class HSCondition(object): self.notify(len(self._waiters)) -class AsyncQueue(Queue): +class _AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" def __init__(self, maxsize=0): Queue.__init__(self, maxsize) @@ -166,7 +167,7 @@ class AsyncQueue(Queue): self.all_tasks_done = HSCondition(self.mutex) -class _AsyncQueue(Queue): +class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') @@ -194,9 +195,9 @@ class _AsyncQueue(Queue): self.queue.append(item) self.mutex.release() self.not_empty.notify() - + def get(self, block=True, timeout=None): - self.not_empty.acquire() + self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: if not block: @@ -205,16 +206,23 @@ class _AsyncQueue(Queue): elif timeout is None: while not len(q): self.not_empty.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a positive number") else: + print "with timeout", timeout + import traceback + traceback.print_stack() endtime = _time() + timeout while not len(q): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - return q.popleft() + # END handle block + # can happen if someone else woke us up + try: + return q.popleft() + except IndexError: + raise Empty + # END handle unblocking reason finally: self.not_empty.release() -- cgit v1.2.1 From 53152a824f5186452504f0b68306d10ebebee416 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 14:23:58 +0200 Subject: queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable. --- lib/git/async/util.py | 68 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 20 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 01073f6d..51219cc4 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -166,15 +166,21 @@ class _AsyncQueue(Queue): self.not_full = HSCondition(self.mutex) self.all_tasks_done = HSCondition(self.mutex) - + +class ReadOnly(Exception): + """Thrown when trying to write to a read-only queue""" + class AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - __slots__ = ('mutex', 'not_empty', 'queue') + """A queue using different condition objects to gain multithreading performance. + Additionally it has a threadsafe writable flag, which will alert all readers + that there is nothing more to get here.""" + __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) + self._writable = True def qsize(self): self.mutex.acquire() @@ -183,6 +189,29 @@ class AsyncQueue(Queue): finally: self.mutex.release() + def writable(self): + self.mutex.acquire() + try: + return self._writable + finally: + self.mutex.release() + + def set_writable(self, state): + """Set the writable flag of this queue to True or False + :return: The previous state""" + self.mutex.acquire() + try: + old = self._writable + self._writable = state + return old + finally: + # if we won't receive anymore items, inform the getters + if not state: + self.not_empty.notify_all() + # END tell everyone + self.mutex.release() + # END handle locking + def empty(self): self.mutex.acquire() try: @@ -192,6 +221,9 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() + if not self._writable: + raise ReadOnly + # END handle read-only self.queue.append(item) self.mutex.release() self.not_empty.notify() @@ -200,24 +232,20 @@ class AsyncQueue(Queue): self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: - if not block: - if not len(q): - raise Empty - elif timeout is None: - while not len(q): - self.not_empty.wait() - else: - print "with timeout", timeout - import traceback - traceback.print_stack() - endtime = _time() + timeout - while not len(q): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) + if block: + if timeout is None: + while not len(q) and self._writable: + self.not_empty.wait() + else: + endtime = _time() + timeout + while not len(q) and self._writable: + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + # END handle timeout mode # END handle block - # can happen if someone else woke us up + # can happen if we woke up because we are not writable anymore try: return q.popleft() except IndexError: -- cgit v1.2.1 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: Its getting better already - intermediate commit before further chaning the task class --- lib/git/async/util.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 51219cc4..6d09de59 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -63,7 +63,7 @@ class SyncQueue(deque): except IndexError: raise Empty # END raise empty - + def empty(self): return len(self) == 0 @@ -86,13 +86,13 @@ class HSCondition(object): self._waiters = list() def release(self): - return self._lock.release() + self._lock.release() def acquire(self, block=None): if block is None: self._lock.acquire() else: - return self._lock.acquire(block) + self._lock.acquire(block) def wait(self, timeout=None): waiter = _allocate_lock() @@ -145,6 +145,7 @@ class HSCondition(object): except IndexError: pass else: + print "notify", waiters, n for waiter in waiters[:n]: waiter.release() try: @@ -156,16 +157,6 @@ class HSCondition(object): def notify_all(self): self.notify(len(self._waiters)) - -class _AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" @@ -205,11 +196,12 @@ class AsyncQueue(Queue): self._writable = state return old finally: + self.mutex.release() + # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() # END tell everyone - self.mutex.release() # END handle locking def empty(self): @@ -222,6 +214,7 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() if not self._writable: + self.mutex.release() raise ReadOnly # END handle read-only self.queue.append(item) @@ -245,7 +238,9 @@ class AsyncQueue(Queue): self.not_empty.wait(remaining) # END handle timeout mode # END handle block - # can happen if we woke up because we are not writable anymore + + # can throw if we woke up because we are not writable anymore + print len(q), id(q), current_thread() try: return q.popleft() except IndexError: -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, with boosts performance into brigt green levels --- lib/git/async/util.py | 1 - 1 file changed, 1 deletion(-) (limited to 'lib/git/async/util.py') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 6d09de59..f3213ed6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -240,7 +240,6 @@ class AsyncQueue(Queue): # END handle block # can throw if we woke up because we are not writable anymore - print len(q), id(q), current_thread() try: return q.popleft() except IndexError: -- cgit v1.2.1