From 5d996892ac76199886ba3e2754ff9c9fac2456d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 00:32:33 +0200 Subject: test implementation of async-queue with everything stripped from it that didn't seem necessary - it's a failure, something is wrong - performance is not much better than the original one; it actually depends on the condition's performance, which I can't make any faster --- lib/git/async/util.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 55766579..e3556c05 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -133,12 +133,55 @@ class HSCondition(_Condition): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" + __slots__ = ('mutex', 'not_empty', 'queue') + def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - + self.queue = deque() + self.mutex = Lock() self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - + def qsize(self): + self.mutex.acquire() + try: + return len(self.queue) + finally: + self.mutex.release() + + def empty(self): + self.mutex.acquire() + try: + return not len(self.queue) + finally: + self.mutex.release() + + def put(self, item, block=True, timeout=None): + self.mutex.acquire() + self.queue.append(item) + self.mutex.release() + self.not_empty.notify() + + def get(self, block=True, timeout=None): + self.not_empty.acquire() + q = self.queue + try: + if not block: + if not len(q): + raise Empty + elif timeout is None: + while not len(q): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while not len(q): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + return q.popleft() + finally: + self.not_empty.release() + + #} END utilities -- cgit v1.2.1 From 09c3f39ceb545e1198ad7a3f470d4ec896ce1add Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 10:45:14 +0200 Subject: both versions of the async queue still have trouble in certain situations, at least with my completely rewritten version of the condition - the previous one seems to have been somewhat more stable.
Nonetheless, this is the fastest version so far --- lib/git/async/util.py | 77 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 21 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/util.py b/lib/git/async/util.py index e3556c05..fb63ccaa 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,6 +2,8 @@ from threading import ( Lock, + current_thread, + _allocate_lock, _Condition, _sleep, _time, @@ -57,7 +59,7 @@ class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): try: - return self.pop() + return self.popleft() except IndexError: raise Empty # END raise empty @@ -67,26 +69,45 @@ class SyncQueue(deque): put = deque.append - -class HSCondition(_Condition): + +class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" + __slots__ = ("acquire", "release", "_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead + def __init__(self, lock=None): + if lock is None: + lock = Lock() + self._lock = lock + self.acquire = lock.acquire + self.release = lock.release + self._waiters = list() + + def __release(self): + return self._lock.release() + + def __acquire(self, block=None): + if block is None: + self._lock.acquire() + else: + return self._lock.acquire(block) + def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() + waiter = _allocate_lock() + waiter.acquire() # get it the first time, no blocking + self._waiters.append(waiter) + + # in the momemnt we release our lock, someone else might actually resume + self.release() try: # restore state no matter what (e.g., KeyboardInterrupt) + # now we block, as we hold the lock already if timeout is None: waiter.acquire() else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). 
+ # Balancing act: We can't afford a pure busy loop, because of the + # GIL, so we have to sleep + # We try to sleep only tiny amounts of time though to be very responsive endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -104,34 +125,48 @@ class HSCondition(_Condition): # END endless loop if not gotit: try: - self.__dict__['_Condition__waiters'].remove(waiter) + self._waiters.remove(waiter) except ValueError: pass # END didn't ever get it finally: - self._acquire_restore(saved_state) + # reacquire the lock + self.acquire() def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: + if not self._waiters: return + waiters = self._waiters if n == 1: - __waiters[0].release() + waiters[0].release() try: - __waiters.pop(0) + waiters.pop(0) except IndexError: pass else: - waiters = __waiters[:n] - for waiter in waiters: + for waiter in waiters[:n]: waiter.release() try: - __waiters.remove(waiter) + waiters.remove(waiter) except ValueError: pass # END handle n = 1 case faster + def notify_all(self): + self.notify(len(self._waiters)) + + class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +class _AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') -- cgit v1.2.1 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channel design actually works, but it also shows that it's located at the wrong spot. The channel is nothing more than an adapter that allows reading multiple items from a thread-safe queue; the queue itself, though, must be 'closable' for writing, or needs something like a writable flag. --- lib/git/async/channel.py | 108 ++++++++++++++++++++++++----------------------- lib/git/async/task.py | 6 +-- lib/git/async/util.py | 32 ++++++++------ 3 files changed, 79 insertions(+), 67 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 2d5ab79c..655024fe 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -4,7 +4,11 @@ from Queue import ( Full ) -from util import AsyncQueue +from util import ( + AsyncQueue, + DummyLock + ) + from time import time import sys @@ -23,12 +27,9 @@ class Channel(object): def __new__(cls, *args): if cls is Channel: - max_items = 0 - if len(args) == 1: - max_items = args[0] - if len(args) > 1: - raise ValueError("Specify not more than the number of items the channel should take") - wc = WChannel(max_items) + if len(args) > 0: + raise ValueError("Cannot take any arguments when creating a new channel") + wc = WChannel() rc = RChannel(wc) return wc, rc # END constructor mode @@ -39,11 +40,11 @@ class WChannel(Channel): """The write end of a channel""" __slots__ = ('_closed', '_queue') - def __init__(self, max_items=0): + def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False - self._queue = AsyncQueue(max_items) + self._queue = AsyncQueue() #{ Interface @@ -74,7 +75,21 @@ class WChannel(Channel): def close(self): """Close the channel. 
Multiple close calls on a closed channel are no an error""" + mutex = self._queue.mutex + mutex.acquire() + # this is atomic already, due to the GIL - no need to get the queue's mutex + print "channel.close()" self._closed = True + # now make sure that the people waiting for an item are released now + # As we it could be that some readers are already on their way to initiate + # a blocking get, we must make sure that locks never block before that happens + + # now we are the only one accessing the queue, so change it + self._queue.mutex = DummyLock() + print self._queue.not_empty._waiters + self._queue.not_empty.notify_all() + print self._queue.not_empty._waiters + mutex.release() @property def closed(self): @@ -134,58 +149,47 @@ class RChannel(Channel): pass # END handle exceptions else: - # if we have really bad timing, the source of the channel - # marks itself closed, but before setting it, the thread - # switches to us. We read it, read False, and try to fetch - # something, and never return. The whole closed channel thing - # is not atomic ( of course ) - # This is why we never block for long, to get a chance to recheck - # for closed channels. - # We blend this into the timeout of the user - ourtimeout = 0.25 # the smaller, the more responsive, but the slower - wc = self._wc - timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it - assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe - if timeout and ourtimeout > timeout: - ourtimeout = timeout - # END setup timeout - # to get everything into one loop, we set the count accordingly if count == 0: count = sys.maxint # END handle count + endtime = sys.maxint # allows timeout for whole operation + if timeout is not None: + endtime = time() + timeout + # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): - have_timeout = False - st = time() - while True: + try: + print "about to read", i, count, block, timeout + out.append(queue.get(block, timeout)) + print "got one" + except Empty: + pass + # END ignore empty + + # if we have been unblocked because the closed state changed + # in the meanwhile, stop trying + # NOTE: must NOT cache _wc + if self._wc.closed: + # its racing time - all threads waiting for the queue + # are awake now, and we actually can't be sure its empty + # Hence we pop it empty without blocking, getting as much + # as we can. This effectively lets us race ( with mutexes ) + # of the other threads. + print "stopped because it was closed" try: - if wc.closed: - have_timeout = True - # its about the 'in the meanwhile' :) - get everything - # we can in non-blocking mode. 
This will raise - try: - while True: - out.append(queue.get(False)) - # END until it raises Empty - except Empty: - break - # END finally, out of here - # END don't continue on closed channels - - # END abort reading if it was closed ( in the meanwhile ) - out.append(queue.get(block, ourtimeout)) - break # breakout right away + while True: + out.append(queue.get(False)) + # END pop it empty except Empty: - if timeout - (time() - st) <= 0: - # hitting timeout - have_timeout = True - break - # END abort if the user wants no more time spent here - # END handle timeout - # END endless timer loop - if have_timeout: + pass + # END ignore emptyness, we have all + break + # END handle cloased + + if time() >= endtime: + break # END stop on timeout # END for each item # END handle blocking diff --git a/lib/git/async/task.py b/lib/git/async/task.py index cf486f48..ce701c86 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -140,10 +140,10 @@ class OutputChannelTask(Node): # If we appear to be the only one left with our output channel, and are # closed ( this could have been set in another thread as well ), make # sure to close the output channel. - # The count is: 1 = wc itself, 2 = first reader channel, and we have only - # one, 3 is ours + x for every thread having its copy on the stack + # The count is: 1 = wc itself, 2 = first reader channel, + x for every + # thread having its copy on the stack # + 1 for the instance we provide to refcount - if self.is_done() and sys.getrefcount(self._out_wc) < 5: + if self.is_done() and sys.getrefcount(self._out_wc) < 4: self.close() # END handle channel closure #{ Configuration diff --git a/lib/git/async/util.py b/lib/git/async/util.py index fb63ccaa..01073f6d 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -73,21 +73,22 @@ class SyncQueue(deque): class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - __slots__ = ("acquire", "release", "_lock", '_waiters') + # __slots__ = ("acquire", "release", "_lock", '_waiters') + __slots__ = ("_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - self.acquire = lock.acquire - self.release = lock.release + #self.acquire = lock.acquire + #self.release = lock.release self._waiters = list() - def __release(self): + def release(self): return self._lock.release() - def __acquire(self, block=None): + def acquire(self, block=None): if block is None: self._lock.acquire() else: @@ -156,7 +157,7 @@ class HSCondition(object): self.notify(len(self._waiters)) -class AsyncQueue(Queue): +class _AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" def __init__(self, maxsize=0): Queue.__init__(self, maxsize) @@ -166,7 +167,7 @@ class AsyncQueue(Queue): self.all_tasks_done = HSCondition(self.mutex) -class _AsyncQueue(Queue): +class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" __slots__ = ('mutex', 'not_empty', 'queue') @@ -194,9 +195,9 @@ class _AsyncQueue(Queue): self.queue.append(item) self.mutex.release() self.not_empty.notify() - + def get(self, block=True, timeout=None): - self.not_empty.acquire() + self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: if not block: @@ -205,16 +206,23 @@ class _AsyncQueue(Queue): elif timeout is None: while not len(q): self.not_empty.wait() - elif timeout < 
0: - raise ValueError("'timeout' must be a positive number") else: + print "with timeout", timeout + import traceback + traceback.print_stack() endtime = _time() + timeout while not len(q): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - return q.popleft() + # END handle block + # can happen if someone else woke us up + try: + return q.popleft() + except IndexError: + raise Empty + # END handle unblocking reason finally: self.not_empty.release() -- cgit v1.2.1 From 53152a824f5186452504f0b68306d10ebebee416 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 14:23:58 +0200 Subject: queue: adjusted queue to be closable (without testing of its own yet, except for the pool which runs it) - it's not yet stable, but the problems should be solvable. --- lib/git/async/channel.py | 44 ++++++++++--------------------- lib/git/async/util.py | 68 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 62 insertions(+), 50 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 655024fe..08323582 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,7 +6,6 @@ from Queue import ( from util import ( AsyncQueue, - DummyLock ) from time import time @@ -56,15 +55,13 @@ class WChannel(Channel): channel :param timeout: timeout in seconds for blocking calls. :raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel - :note: may block if the channel has a limited capacity""" - if self._closed: - raise IOError("Cannot write to a closed channel") - + :raise EOFError: when writing into a non-blocking full channel""" + # let the queue handle the 'closed' attribute, we write much more often + # to an open channel than to a closed one, saving a few cycles try: self._queue.put(item, block, timeout) - except Full: - raise EOFError("Capacity of the channel was exeeded") + except ReadOnly: + raise IOError("Cannot write to a closed channel") # END exception handling def size(self): @@ -75,21 +72,10 @@ class WChannel(Channel): def close(self): """Close the channel. 
Multiple close calls on a closed channel are no an error""" - mutex = self._queue.mutex - mutex.acquire() - # this is atomic already, due to the GIL - no need to get the queue's mutex - print "channel.close()" + # yes, close it a little too early, better than having anyone put + # additional items self._closed = True - # now make sure that the people waiting for an item are released now - # As we it could be that some readers are already on their way to initiate - # a blocking get, we must make sure that locks never block before that happens - - # now we are the only one accessing the queue, so change it - self._queue.mutex = DummyLock() - print self._queue.not_empty._waiters - self._queue.not_empty.notify_all() - print self._queue.not_empty._waiters - mutex.release() + self._queue.set_writable(False) @property def closed(self): @@ -124,6 +110,7 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block + # NOTE: is handled by the queue if self._wc.closed or timeout == 0: block = False @@ -160,9 +147,7 @@ class RChannel(Channel): # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): try: - print "about to read", i, count, block, timeout out.append(queue.get(block, timeout)) - print "got one" except Empty: pass # END ignore empty @@ -176,7 +161,6 @@ class RChannel(Channel): # Hence we pop it empty without blocking, getting as much # as we can. This effectively lets us race ( with mutexes ) # of the other threads. - print "stopped because it was closed" try: while True: out.append(queue.get(False)) @@ -186,11 +170,11 @@ class RChannel(Channel): # END ignore emptyness, we have all break - # END handle cloased - - if time() >= endtime: - break - # END stop on timeout + # END handle channel cloased + + if time() >= endtime: + break + # END stop operation on timeout # END for each item # END handle blocking return out diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 01073f6d..51219cc4 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -166,15 +166,21 @@ class _AsyncQueue(Queue): self.not_full = HSCondition(self.mutex) self.all_tasks_done = HSCondition(self.mutex) - + +class ReadOnly(Exception): + """Thrown when trying to write to a read-only queue""" + class AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - __slots__ = ('mutex', 'not_empty', 'queue') + """A queue using different condition objects to gain multithreading performance. 
+ Additionally it has a threadsafe writable flag, which will alert all readers + that there is nothing more to get here.""" + __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) + self._writable = True def qsize(self): self.mutex.acquire() @@ -183,6 +189,29 @@ class AsyncQueue(Queue): finally: self.mutex.release() + def writable(self): + self.mutex.acquire() + try: + return self._writable + finally: + self.mutex.release() + + def set_writable(self, state): + """Set the writable flag of this queue to True or False + :return: The previous state""" + self.mutex.acquire() + try: + old = self._writable + self._writable = state + return old + finally: + # if we won't receive anymore items, inform the getters + if not state: + self.not_empty.notify_all() + # END tell everyone + self.mutex.release() + # END handle locking + def empty(self): self.mutex.acquire() try: @@ -192,6 +221,9 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() + if not self._writable: + raise ReadOnly + # END handle read-only self.queue.append(item) self.mutex.release() self.not_empty.notify() @@ -200,24 +232,20 @@ class AsyncQueue(Queue): self.not_empty.acquire() # == self.mutex.acquire in that case q = self.queue try: - if not block: - if not len(q): - raise Empty - elif timeout is None: - while not len(q): - self.not_empty.wait() - else: - print "with timeout", timeout - import traceback - traceback.print_stack() - endtime = _time() + timeout - while not len(q): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) + if block: + if timeout is None: + while not len(q) and self._writable: + self.not_empty.wait() + else: + endtime = _time() + timeout + while not len(q) and self._writable: + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + # END handle timeout mode + # END handle block - # can happen if someone else woke us up + # can happen if we woke up because we are not writable anymore try: return q.popleft() except IndexError: -- cgit v1.2.1 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: It's getting better already - intermediate commit before further changing the task class --- lib/git/async/channel.py | 35 ++++++++++++++++++++--------------- lib/git/async/pool.py | 7 ++++--- lib/git/async/task.py | 2 ++ lib/git/async/thread.py | 3 ++- lib/git/async/util.py | 25 ++++++++++--------------- 5 files changed, 38 insertions(+), 34 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 08323582..5c52b1dc 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + ReadOnly ) from time import time @@ -59,6 +60,7 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: + print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -74,6 +76,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items + print "closing channel", self self._closed = True 
self._queue.set_writable(False) @@ -102,7 +105,7 @@ class RChannel(Channel): :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the - given amount of seconds. + given amount of seconds, returning the items it received so far. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be @@ -149,27 +152,29 @@ class RChannel(Channel): try: out.append(queue.get(block, timeout)) except Empty: - pass + # here we are only if there is nothing on the queue, + # and if we are blocking. If we are not blocking, this + # indiccates that the queue was set unwritable in the meanwhile. + # hence we can abort now to prevent reading (possibly) forever + # Besides, this is racy as all threads will rip on the channel + # without waiting until its empty + if not block: + break # END ignore empty # if we have been unblocked because the closed state changed # in the meanwhile, stop trying # NOTE: must NOT cache _wc if self._wc.closed: - # its racing time - all threads waiting for the queue - # are awake now, and we actually can't be sure its empty - # Hence we pop it empty without blocking, getting as much - # as we can. This effectively lets us race ( with mutexes ) - # of the other threads. - try: - while True: - out.append(queue.get(False)) - # END pop it empty - except Empty: - pass - # END ignore emptyness, we have all + # If we were closed, we drop out even if there might still + # be items. Now its time to get these items, according to + # our count. Just switch to unblocking mode. + # If we are to read unlimited items, this would run forever, + # but the EmptyException handler takes care of this + block = False - break + # we don't continue, but let the timer decide whether + # it wants to abort # END handle channel cloased if time() >= endtime: diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index d6b5711d..cf1c2199 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,8 +80,8 @@ class RPoolChannel(RChannel): # * make no assumptions if there are multiple consumers # * have_enough = False - if count > 0: - have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + #if count > 0: + # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count # END ########## prepare ############################## @@ -319,6 +319,7 @@ class Pool(object): def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" + # 1 as its stored on the task, 1 for the getrefcount call if sys.getrefcount(task._out_wc) < 3: self.del_task(task) #} END internal @@ -403,7 +404,7 @@ class Pool(object): if not task in self._tasks.nodes: return self # END early abort - + print "deleting ", id(task) # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. 
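Note on the refcount arithmetic used by _del_task_if_orphaned above (and by the task.py hunks below): it is easy to misread, so here is a tiny standalone illustration with hypothetical names. This is an assumption-laden sketch, CPython-specific, relying on the fact that sys.getrefcount includes the temporary reference held by its own argument:

import sys

class _Channel(object):
    pass

class _Task(object):
    def __init__(self):
        self._out_wc = _Channel()   # write end of the task's output channel

def is_orphaned(task):
    # 1 reference from task._out_wc itself, +1 for getrefcount's argument:
    # a count below 3 means no reader channel holds the write end anymore
    return sys.getrefcount(task._out_wc) < 3

task = _Task()
print(is_orphaned(task))    # True - only the task references the channel
reader = task._out_wc       # a reader keeps the write end alive
print(is_orphaned(task))    # False

Every additional stack frame or reader channel adds one reference, which is why the thresholds in the real code shift (3, 4, then 6 in the __del__ below) depending on where the check runs.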
diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ce701c86..97521cae 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -88,6 +88,7 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) + print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. As this method runs asynchronously, @@ -117,6 +118,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e + print str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 4240a664..5faad4f8 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -124,6 +124,7 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" + gettask = self.inq.get while True: self._current_routine = None if self._should_terminate(): @@ -132,7 +133,7 @@ class WorkerThread(TerminatableThread): # don't wait too long, instead check for the termination request more often try: - tasktuple = self.inq.get(True, 1) + tasktuple = gettask(True, 0.25) except Queue.Empty: continue # END get task with timeout diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 51219cc4..6d09de59 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -63,7 +63,7 @@ class SyncQueue(deque): except IndexError: raise Empty # END raise empty - + def empty(self): return len(self) == 0 @@ -86,13 +86,13 @@ class HSCondition(object): self._waiters = list() def release(self): - return self._lock.release() + self._lock.release() def acquire(self, block=None): if block is None: self._lock.acquire() else: - return self._lock.acquire(block) + self._lock.acquire(block) def wait(self, timeout=None): waiter = _allocate_lock() @@ -145,6 +145,7 @@ class HSCondition(object): except IndexError: pass else: + print "notify", waiters, n for waiter in waiters[:n]: waiter.release() try: @@ -156,16 +157,6 @@ class HSCondition(object): def notify_all(self): self.notify(len(self._waiters)) - -class _AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - - self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" @@ -205,11 +196,12 @@ class AsyncQueue(Queue): self._writable = state return old finally: + self.mutex.release() + # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() # END tell everyone - self.mutex.release() # END handle locking def empty(self): @@ -222,6 +214,7 @@ class AsyncQueue(Queue): def put(self, item, block=True, timeout=None): self.mutex.acquire() if not self._writable: + self.mutex.release() raise ReadOnly # END handle read-only self.queue.append(item) @@ -245,7 +238,9 @@ class AsyncQueue(Queue): self.not_empty.wait(remaining) # END handle timeout mode # END handle block - # can happen if we woke up because we are not writable anymore + + # can throw if we woke up because we are not writable anymore + print len(q), id(q), current_thread() 
try: return q.popleft() except IndexError: raise Empty -- cgit v1.2.1 From 13dd59ba5b3228820841682b59bad6c22476ff66 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 17:25:43 +0200 Subject: task: now deletes itself once it's done - for the test this doesn't change a thing, as the task deletes itself too late - it's time for a paradigm change: the task should be deleted with its RPoolChannel or explicitly by the user. The test needs to adapt, and shouldn't assume anything unless the RPoolChannel is gone --- lib/git/async/pool.py | 57 ++++++++++++++------------------------------------- lib/git/async/task.py | 22 ++++++++++++++------ 2 files changed, 31 insertions(+), 48 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index cf1c2199..fce5e424 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -78,14 +78,17 @@ class RPoolChannel(RChannel): # have an item, but its currently being produced by some worker. # This is why we: # * make no assumptions if there are multiple consumers - # * - have_enough = False + # * + + # if the user tries to use us to read from a done task, we will never + # compute as all produced items are already in the channel + skip_compute = self._task.is_done() or self._task.error() #if count > 0: - # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count # END ########## prepare ############################## - if not have_enough: + if not skip_compute: self._pool._prepare_channel_read(self._task, count) # END prepare pool scheduling @@ -134,7 +137,6 @@ class Pool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks '_taskorder_cache', # map task id -> ordered dependent tasks @@ -157,7 +159,6 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = None self._workers = list() self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() @@ -224,8 +225,10 @@ class Pool(object): # requested one last for task in dfirst_tasks: if task.error() or task.is_done(): - self._consumed_tasks.put(task) - continue + # in theory, the should never be consumed task in the pool, right ? + # They delete themselves once they are done. 
+ raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") + #continue # END skip processing # if the task does not have the required output on its queue, schedule @@ -297,26 +300,8 @@ class Pool(object): def _post_channel_read(self, task): """Called after we processed a read to cleanup""" - # check whether we consumed the task, and schedule it for deletion - # This could have happend after the read returned ( even though the pre-read - # checks it as well ) - if task.error() or task.is_done(): - self._consumed_tasks.put(task) - # END handle consumption - - self._handle_consumed_tasks() - - def _handle_consumed_tasks(self): - """Remove all consumed tasks from our queue by deleting them""" - try: - while True: - ct = self._consumed_tasks.get(False) - self.del_task(ct) - # END for each task to delete - except Empty: - pass - # END pop queue empty - + pass + def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" # 1 as its stored on the task, 1 for the getrefcount call @@ -347,11 +332,6 @@ class Pool(object): # ourselves cur_count = len(self._workers) if cur_count < size: - # make sure we have a real queue, and can store our consumed tasks properly - if not isinstance(self._consumed_tasks, self.TaskQueueCls): - self._consumed_tasks = Queue() - # END init queue - for i in range(size - cur_count): worker = self.WorkerCls(self._queue) worker.start() @@ -377,9 +357,6 @@ class Pool(object): continue # END while there are tasks on the queue - if self._consumed_tasks and not self._consumed_tasks.empty(): - self._handle_consumed_tasks() - # END assure consumed tasks are empty self._consumed_tasks = SyncQueue() # END process queue return self @@ -437,11 +414,7 @@ class Pool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task.set_wc(wc) - - has_input_channel = isinstance(task, InputChannelTask) - if has_input_channel: - task.set_pool(self) - # END init input channel task + task.set_pool(self) self._taskgraph_lock.acquire() try: @@ -452,7 +425,7 @@ class Pool(object): # END sync task addition # If the input channel is one of our read channels, we add the relation - if has_input_channel: + if isinstance(task, InputChannelTask): ic = task.in_rc if isinstance(ic, RPoolChannel) and ic._pool is self: self._taskgraph_lock.acquire() diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 97521cae..dc207c33 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -23,6 +23,7 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items + '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -84,6 +85,10 @@ class OutputChannelTask(Node): finally: self._slock.release() # END threadsafe return + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) def process(self, count=0): """Process count items and send the result individually to the output channel""" @@ -147,6 +152,16 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount if self.is_done() and sys.getrefcount(self._out_wc) < 4: self.close() + # additionally, remove ourselves from the pool, this is thread-safe + # Previously the pool collected done tasks and 
removed them, + # but this could happen after a read finished, potentially + # leaving them on the queue until the read-handle was dropped. + # This should assure its more in-time. + # I don't like this back-ref. + pool = self._pool_ref() + if pool: + pool.del_task(self) + # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask): For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" __slots__ = ( - 'in_rc', # channel to read items from - '_pool_ref' # to be set by Pool + 'in_rc' # channel to read items from ) def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) self._in_rc = in_rc @@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) - - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) #{ Configuration -- cgit v1.2.1 From e5c0002d069382db1768349bf0c5ff40aafbf140 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 18:20:12 +0200 Subject: Revised task deletion works well; adjusted the test to create new tasks all the time instead of reusing its own one, as it was somewhat hard to manage its state over time and could cause bugs. It works okay, but it occasionally hangs - it appears to be an empty queue; I have to gradually put certain things back in, although in the current mode of operation it should never have empty queues from the pool to the user --- lib/git/async/pool.py | 25 ++++++++++++++++---- lib/git/async/task.py | 64 +++++++++------------------------------------------ 2 files changed, 31 insertions(+), 58 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index fce5e424..a915f7b0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -41,8 +41,18 @@ class RPoolChannel(RChannel): def __del__(self): """Assures that our task will be deleted if we were the last reader""" - del(self._wc) # decrement ref-count - self._pool._del_task_if_orphaned(self._task) + del(self._wc) # decrement ref-count early + # now, if this is the last reader to the wc we just handled, there + # is no way anyone will ever read from the task again. If so, + # delete the task in question, it will take care of itself and orphans + # it might leave + # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which + # I can't explain, but appears to be normal in the destructor + # On the caller side, getrefcount returns 2, as expected + if sys.getrefcount(self) < 6: + print "__del__" + self._pool.del_task(self._task) + print "done" def set_pre_cb(self, fun = lambda count: None): """Install a callback to call with the item count to be read before any @@ -105,7 +115,7 @@ class RPoolChannel(RChannel): ####### Finalize ######## self._pool._post_channel_read(self._task) - + return items #{ Internal @@ -227,6 +237,7 @@ class Pool(object): if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? # They delete themselves once they are done. 
+ # TODO: remove this check for performance later raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") #continue # END skip processing @@ -363,7 +374,11 @@ class Pool(object): def num_tasks(self): """:return: amount of tasks""" - return len(self._tasks.nodes) + self._taskgraph_lock.acquire() + try: + return len(self._tasks.nodes) + finally: + self._taskgraph_lock.release() def del_task(self, task): """Delete the task @@ -374,6 +389,7 @@ class Pool(object): This method blocks until all tasks to be removed have been processed, if they are currently being processed. :return: self""" + print "del_task: getting lock" self._taskgraph_lock.acquire() try: # it can be that the task is already deleted, but its chunk was on the @@ -414,7 +430,6 @@ class Pool(object): wc, rc = Channel() rc = RPoolChannel(wc, task, self) task.set_wc(wc) - task.set_pool(self) self._taskgraph_lock.acquire() try: diff --git a/lib/git/async/task.py b/lib/git/async/task.py index dc207c33..5edd40bb 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,10 +1,11 @@ from graph import Node import threading -import weakref import sys import new +getrefcount = sys.getrefcount + class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. @@ -23,7 +24,6 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items - '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -54,7 +54,7 @@ class OutputChannelTask(Node): def set_wc(self, wc): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" - self._done = False + self._done = False # TODO : fix this, this is a side-effect self._scheduled_items = 0 self._out_wc = wc @@ -86,10 +86,6 @@ class OutputChannelTask(Node): self._slock.release() # END threadsafe return - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) - def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -123,7 +119,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e - print str(e) # TODO: REMOVE DEBUG, or make it use logging + print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please @@ -150,18 +146,8 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount - if self.is_done() and sys.getrefcount(self._out_wc) < 4: + if self.is_done() and getrefcount(self._out_wc) < 4: self.close() - # additionally, remove ourselves from the pool, this is thread-safe - # Previously the pool collected done tasks and removed them, - # but this could happen after a read finished, potentially - # leaving them on the queue until the read-handle was dropped. - # This should assure its more in-time. - # I don't like this back-ref. 
- pool = self._pool_ref() - if pool: - pool.del_task(self) - # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -218,43 +204,15 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" - __slots__ = ( - 'in_rc' # channel to read items from - ) def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._in_rc = in_rc - + self._read = in_rc.read + def process(self, count=1): - """Verify our setup, and do some additional checking, before the - base implementation can permanently perform all operations""" - self._read = self._in_rc.read - # make sure we don't trigger the pool if we read from a pool channel which - # belongs to our own pool. Channels from different pools are fine though, - # there we want to trigger its computation - # PROBLEM: if the user keeps an end, but decides to put the same end into - # a task of this pool, then all items might deplete without new ones being - # produced, causing a deadlock. Just triggering the pool would be better, - # but cost's more, unnecessarily if there is just one consumer, which is - # the user. - # * could encode usage in the channel type, and fail if the refcount on - # the read-pool channel is too high - # * maybe keep track of the elements that are requested or in-production - # for each task, which would allow to precisely determine whether - # the pool as to be triggered, and bail out early. Problem would - # be the - # * Perhaps one shouldn't seek the perfect solution , but instead - # document whats working and what not, or under which conditions. - # The whole system is simple, but gets more complicated the - # smarter it wants to be. - if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): - self._read = self._in_rc._read - - # permanently install our base for processing - self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self)) - - # and call it - return OutputChannelTask.process(self, count) + # for now, just blindly read our input, could trigger a pool, even + # ours, but why not ? 
It should be able to handle this + # TODO: remove this method + super(InputChannelTask, self).process(count) #{ Configuration -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue; it will receive termination events only via its queue, which boosts performance into bright green levels --- lib/git/async/channel.py | 3 +-- lib/git/async/pool.py | 56 ++++++++++++++++++++++++++++-------------------- lib/git/async/task.py | 2 +- lib/git/async/thread.py | 24 +++++++++++++++------ lib/git/async/util.py | 1 - 5 files changed, 52 insertions(+), 34 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 5c52b1dc..c05f7383 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -60,7 +60,6 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: - print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -76,7 +75,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - print "closing channel", self + # print "closing channel", self self._closed = True self._queue.set_writable(False) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index a915f7b0..1767c61c 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,5 +1,8 @@ """Implementation of a thread-pool working with channels""" -from thread import WorkerThread +from thread import ( + WorkerThread, + StopProcessing, + ) from threading import Lock from util import ( @@ -147,7 +150,7 @@ class Pool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_workers', # list of worker threads + '_num_workers', # list of workers '_queue', # master queue for tasks '_taskorder_cache', # map task id -> ordered dependent tasks '_taskgraph_lock', # lock for accessing the task graph @@ -169,7 +172,7 @@ class Pool(object): def __init__(self, size=0): self._tasks = Graph() - self._workers = list() + self._num_workers = 0 self._queue = self.TaskQueueCls() self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() @@ -270,7 +273,7 @@ class Pool(object): # into the loop would be less code, but ... slower # DEBUG # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() - if self._workers: + if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task queue = self._queue @@ -323,7 +326,7 @@ class Pool(object): #{ Interface def size(self): """:return: amount of workers in the pool""" - return len(self._workers) + return self._num_workers def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, @@ -341,34 +344,41 @@ class Pool(object): # either start new threads, or kill existing ones. 
# If we end up with no threads, we process the remaining chunks on the queue # ourselves - cur_count = len(self._workers) + cur_count = self._num_workers if cur_count < size: - for i in range(size - cur_count): - worker = self.WorkerCls(self._queue) - worker.start() - self._workers.append(worker) - # END for each new worker to create - elif cur_count > size: # we can safely increase the size, even from serial mode, as we would # only be able to do this if the serial ( sync ) mode finished processing. # Just adding more workers is not a problem at all. + add_count = size - cur_count + for i in range(add_count): + print "Add worker" + self.WorkerCls(self._queue).start() + # END for each new worker to create + self._num_workers += add_count + elif cur_count > size: + # We don't care which thread exactly gets hit by our stop request + # On their way, they will consume remaining tasks, but new ones + # could be added as we speak. del_count = cur_count - size for i in range(del_count): - self._workers[i].stop_and_join() + print "stop worker" + self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter # END for each thread to stop - del(self._workers[:del_count]) + self._num_workers -= del_count # END handle count if size == 0: - while not self._queue.empty(): - try: - taskmethod, count = self._queue.get(False) - taskmethod(count) - except Queue.Empty: - continue - # END while there are tasks on the queue - - self._consumed_tasks = SyncQueue() + # NOTE: we do not preocess any tasks still on the queue, as we ill + # naturally do that once we read the next time, only on the tasks + # that are actually required. The queue will keep the tasks, + # and once we are deleted, they will vanish without additional + # time spend on them. If there shouldn't be any consumers anyway. + # If we should reenable some workers again, they will continue on the + # remaining tasks, probably with nothing to do. + # We can't clear the task queue if we have removed workers + # as they will receive the termination signal through it, and if + # we had added workers, we wouldn't be here ;). + pass # END process queue return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 5edd40bb..f9536a45 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -89,7 +89,7 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - print "task read", len(items) + # print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. As this method runs asynchronously, diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 5faad4f8..556b7e92 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -80,6 +80,10 @@ class TerminatableThread(threading.Thread): self._terminated() #} END interface + +class StopProcessing(Exception): + """If thrown in a function processed by a WorkerThread, it will terminate""" + class WorkerThread(TerminatableThread): """ This base allows to call functions on class instances natively. 
@@ -122,6 +126,11 @@ class WorkerThread(TerminatableThread): self.inq = inq or Queue.Queue() self._current_routine = None # routine we execute right now + @classmethod + def stop(cls, *args): + """If send via the inq of the thread, it will stop once it processed the function""" + raise StopProcessing + def run(self): """Process input tasks until we receive the quit signal""" gettask = self.inq.get @@ -131,12 +140,8 @@ class WorkerThread(TerminatableThread): break # END check for stop request - # don't wait too long, instead check for the termination request more often - try: - tasktuple = gettask(True, 0.25) - except Queue.Empty: - continue - # END get task with timeout + # we wait and block - to terminate, send the 'stop' method + tasktuple = gettask() # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" @@ -158,6 +163,8 @@ class WorkerThread(TerminatableThread): print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call + except StopProcessing: + break except Exception,e: print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) break # abort ... @@ -168,5 +175,8 @@ class WorkerThread(TerminatableThread): """:return: routine we are currently executing, or None if we have no task""" return self._current_routine - + def stop_and_join(self): + """Send stop message to ourselves""" + self.inq.put((self.stop, None)) + super(WorkerThread, self).stop_and_join() #} END classes diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 6d09de59..f3213ed6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -240,7 +240,6 @@ class AsyncQueue(Queue): # END handle block # can throw if we woke up because we are not writable anymore - print len(q), id(q), current_thread() try: return q.popleft() except IndexError: -- cgit v1.2.1
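Taken together, the series converges on two cooperating mechanisms: a queue whose writable flag doubles as a close signal that wakes every blocked reader, and a worker thread that terminates via a StopProcessing "poison pill" sent through its own input queue instead of polling with a get() timeout. The following is a minimal, self-contained sketch of that final design - hypothetical names (ClosableQueue, worker), Python 3 stdlib names, and the standard threading.Condition in place of the hand-rolled HSCondition:

import threading
from collections import deque
from queue import Empty

class ReadOnly(Exception):
    """Raised when putting into a queue that is closed for writing."""

class ClosableQueue(object):
    """Unbounded queue whose writable flag doubles as a close signal."""
    def __init__(self):
        self._queue = deque()
        self._mutex = threading.Lock()
        self._not_empty = threading.Condition(self._mutex)
        self._writable = True

    def set_writable(self, state):
        with self._mutex:
            old, self._writable = self._writable, state
            if not state:
                # wake every blocked reader so it can notice the close
                self._not_empty.notify_all()
            return old

    def put(self, item):
        with self._mutex:
            if not self._writable:
                raise ReadOnly
            self._queue.append(item)
            self._not_empty.notify()

    def get(self, block=True):
        with self._not_empty:
            while block and not self._queue and self._writable:
                self._not_empty.wait()   # woken by put() or by closing
            try:
                return self._queue.popleft()
            except IndexError:
                # woken without an item: the queue was closed for writing
                raise Empty

class StopProcessing(Exception):
    """Poison pill: stops the worker that processes it."""

def worker(inq):
    """Block on the queue forever; termination arrives as a task."""
    while True:
        func, arg = inq.get()
        try:
            func(arg)
        except StopProcessing:
            break

def _stop(_):
    raise StopProcessing

q = ClosableQueue()
t = threading.Thread(target=worker, args=(q,))
t.start()
q.put((print, "processed"))   # ordinary task
q.put((_stop, None))          # termination travels the same path as work
t.join()

Because the stop request rides the same queue as regular work, the worker can block indefinitely in get() with no wake-up interval, which is exactly the change the last commit describes; closing the queue with set_writable(False) would likewise release any reader blocked in get(), which is the race the channel.close() changes above are wrestling with.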