From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. --- lib/git/async/channel.py | 108 ++++++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 52 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 2d5ab79c..655024fe 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -4,7 +4,11 @@ from Queue import ( Full ) -from util import AsyncQueue +from util import ( + AsyncQueue, + DummyLock + ) + from time import time import sys @@ -23,12 +27,9 @@ class Channel(object): def __new__(cls, *args): if cls is Channel: - max_items = 0 - if len(args) == 1: - max_items = args[0] - if len(args) > 1: - raise ValueError("Specify not more than the number of items the channel should take") - wc = WChannel(max_items) + if len(args) > 0: + raise ValueError("Cannot take any arguments when creating a new channel") + wc = WChannel() rc = RChannel(wc) return wc, rc # END constructor mode @@ -39,11 +40,11 @@ class WChannel(Channel): """The write end of a channel""" __slots__ = ('_closed', '_queue') - def __init__(self, max_items=0): + def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False - self._queue = AsyncQueue(max_items) + self._queue = AsyncQueue() #{ Interface @@ -74,7 +75,21 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" + mutex = self._queue.mutex + mutex.acquire() + # this is atomic already, due to the GIL - no need to get the queue's mutex + print "channel.close()" self._closed = True + # now make sure that the people waiting for an item are released now + # As we it could be that some readers are already on their way to initiate + # a blocking get, we must make sure that locks never block before that happens + + # now we are the only one accessing the queue, so change it + self._queue.mutex = DummyLock() + print self._queue.not_empty._waiters + self._queue.not_empty.notify_all() + print self._queue.not_empty._waiters + mutex.release() @property def closed(self): @@ -134,58 +149,47 @@ class RChannel(Channel): pass # END handle exceptions else: - # if we have really bad timing, the source of the channel - # marks itself closed, but before setting it, the thread - # switches to us. We read it, read False, and try to fetch - # something, and never return. The whole closed channel thing - # is not atomic ( of course ) - # This is why we never block for long, to get a chance to recheck - # for closed channels. - # We blend this into the timeout of the user - ourtimeout = 0.25 # the smaller, the more responsive, but the slower - wc = self._wc - timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it - assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe - if timeout and ourtimeout > timeout: - ourtimeout = timeout - # END setup timeout - # to get everything into one loop, we set the count accordingly if count == 0: count = sys.maxint # END handle count + endtime = sys.maxint # allows timeout for whole operation + if timeout is not None: + endtime = time() + timeout + # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): - have_timeout = False - st = time() - while True: + try: + print "about to read", i, count, block, timeout + out.append(queue.get(block, timeout)) + print "got one" + except Empty: + pass + # END ignore empty + + # if we have been unblocked because the closed state changed + # in the meanwhile, stop trying + # NOTE: must NOT cache _wc + if self._wc.closed: + # its racing time - all threads waiting for the queue + # are awake now, and we actually can't be sure its empty + # Hence we pop it empty without blocking, getting as much + # as we can. This effectively lets us race ( with mutexes ) + # of the other threads. + print "stopped because it was closed" try: - if wc.closed: - have_timeout = True - # its about the 'in the meanwhile' :) - get everything - # we can in non-blocking mode. This will raise - try: - while True: - out.append(queue.get(False)) - # END until it raises Empty - except Empty: - break - # END finally, out of here - # END don't continue on closed channels - - # END abort reading if it was closed ( in the meanwhile ) - out.append(queue.get(block, ourtimeout)) - break # breakout right away + while True: + out.append(queue.get(False)) + # END pop it empty except Empty: - if timeout - (time() - st) <= 0: - # hitting timeout - have_timeout = True - break - # END abort if the user wants no more time spent here - # END handle timeout - # END endless timer loop - if have_timeout: + pass + # END ignore emptyness, we have all + break + # END handle cloased + + if time() >= endtime: + break # END stop on timeout # END for each item # END handle blocking -- cgit v1.2.1 From 53152a824f5186452504f0b68306d10ebebee416 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 14:23:58 +0200 Subject: queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable. --- lib/git/async/channel.py | 44 ++++++++++++++------------------------------ 1 file changed, 14 insertions(+), 30 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 655024fe..08323582 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,7 +6,6 @@ from Queue import ( from util import ( AsyncQueue, - DummyLock ) from time import time @@ -56,15 +55,13 @@ class WChannel(Channel): channel :param timeout: timeout in seconds for blocking calls. :raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel - :note: may block if the channel has a limited capacity""" - if self._closed: - raise IOError("Cannot write to a closed channel") - + :raise EOFError: when writing into a non-blocking full channel""" + # let the queue handle the 'closed' attribute, we write much more often + # to an open channel than to a closed one, saving a few cycles try: self._queue.put(item, block, timeout) - except Full: - raise EOFError("Capacity of the channel was exeeded") + except ReadOnly: + raise IOError("Cannot write to a closed channel") # END exception handling def size(self): @@ -75,21 +72,10 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - mutex = self._queue.mutex - mutex.acquire() - # this is atomic already, due to the GIL - no need to get the queue's mutex - print "channel.close()" + # yes, close it a little too early, better than having anyone put + # additional items self._closed = True - # now make sure that the people waiting for an item are released now - # As we it could be that some readers are already on their way to initiate - # a blocking get, we must make sure that locks never block before that happens - - # now we are the only one accessing the queue, so change it - self._queue.mutex = DummyLock() - print self._queue.not_empty._waiters - self._queue.not_empty.notify_all() - print self._queue.not_empty._waiters - mutex.release() + self._queue.set_writable(False) @property def closed(self): @@ -124,6 +110,7 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block + # NOTE: is handled by the queue if self._wc.closed or timeout == 0: block = False @@ -160,9 +147,7 @@ class RChannel(Channel): # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): try: - print "about to read", i, count, block, timeout out.append(queue.get(block, timeout)) - print "got one" except Empty: pass # END ignore empty @@ -176,7 +161,6 @@ class RChannel(Channel): # Hence we pop it empty without blocking, getting as much # as we can. This effectively lets us race ( with mutexes ) # of the other threads. - print "stopped because it was closed" try: while True: out.append(queue.get(False)) @@ -186,11 +170,11 @@ class RChannel(Channel): # END ignore emptyness, we have all break - # END handle cloased - - if time() >= endtime: - break - # END stop on timeout + # END handle channel cloased + + if time() >= endtime: + break + # END stop operation on timeout # END for each item # END handle blocking return out -- cgit v1.2.1 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: Its getting better already - intermediate commit before further chaning the task class --- lib/git/async/channel.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 08323582..5c52b1dc 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + ReadOnly ) from time import time @@ -59,6 +60,7 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: + print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -74,6 +76,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items + print "closing channel", self self._closed = True self._queue.set_writable(False) @@ -102,7 +105,7 @@ class RChannel(Channel): :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the - given amount of seconds. + given amount of seconds, returning the items it received so far. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be @@ -149,27 +152,29 @@ class RChannel(Channel): try: out.append(queue.get(block, timeout)) except Empty: - pass + # here we are only if there is nothing on the queue, + # and if we are blocking. If we are not blocking, this + # indiccates that the queue was set unwritable in the meanwhile. + # hence we can abort now to prevent reading (possibly) forever + # Besides, this is racy as all threads will rip on the channel + # without waiting until its empty + if not block: + break # END ignore empty # if we have been unblocked because the closed state changed # in the meanwhile, stop trying # NOTE: must NOT cache _wc if self._wc.closed: - # its racing time - all threads waiting for the queue - # are awake now, and we actually can't be sure its empty - # Hence we pop it empty without blocking, getting as much - # as we can. This effectively lets us race ( with mutexes ) - # of the other threads. - try: - while True: - out.append(queue.get(False)) - # END pop it empty - except Empty: - pass - # END ignore emptyness, we have all + # If we were closed, we drop out even if there might still + # be items. Now its time to get these items, according to + # our count. Just switch to unblocking mode. + # If we are to read unlimited items, this would run forever, + # but the EmptyException handler takes care of this + block = False - break + # we don't continue, but let the timer decide whether + # it wants to abort # END handle channel cloased if time() >= endtime: -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, with boosts performance into brigt green levels --- lib/git/async/channel.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 5c52b1dc..c05f7383 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -60,7 +60,6 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: - print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -76,7 +75,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - print "closing channel", self + # print "closing channel", self self._closed = True self._queue.set_writable(False) -- cgit v1.2.1