From ab59f78341f1dd188aaf4c30526f6295c63438b1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 20:03:09 +0200 Subject: Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock --- lib/git/async/channel.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 lib/git/async/channel.py (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py new file mode 100644 index 00000000..c9cbfb87 --- /dev/null +++ b/lib/git/async/channel.py @@ -0,0 +1,108 @@ +"""Contains a queue based channel implementation""" +from Queue import ( + Queue, + Empty, + Full + ) + +#{ Classes +class Channel(object): + """A channel is similar to a system pipe. It has a write end as well as one or + more read ends. If Data is in the channel, it can be read, if not the read operation + will block until data becomes available. + If the channel is closed, any read operation will result in an exception + + This base class is not instantiated directly, but instead serves as constructor + for RWChannel pairs. + + Create a new channel """ + __slots__ = tuple() + + def __new__(cls, *args): + if cls is Channel: + max_items = 0 + if len(args) == 1: + max_items = args[0] + if len(args) > 1: + raise ValueError("Specify not more than the number of items the channel should take") + wc = WChannel(max_items) + rc = RChannel(wc) + return wc, rc + # END constructor mode + return object.__new__(cls) + + +class WChannel(Channel): + """The write end of a channel""" + __slots__ = ('_closed', '_queue') + + def __init__(self, max_items=0): + """initialize this instance, able to hold max_items at once + Write calls will block if the channel is full, until someone reads from it""" + self._closed = False + self._queue = Queue(max_items) + + + #{ Interface + def write(self, item, block=True, timeout=None): + """Send an item into the channel, it can be read from the read end of the + channel accordingly + :param item: Item to send + :param block: If True, the call will block until there is free space in the + channel + :param timeout: timeout in seconds for blocking calls. + :raise IOError: when writing into closed file or when writing into a non-blocking + full channel + :note: may block if the channel has a limited capacity""" + if self._closed: + raise IOError("Cannot write to a closed channel") + + try: + self._queue.put(item, block, timeout) + except Full: + raise IOError("Capacity of the channel was exeeded") + # END exception handling + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + self._closed = True + + @property + def closed(self): + """:return: True if the channel was closed""" + return self._closed + #} END interface + + +class RChannel(Channel): + """The read-end of a corresponding write channel""" + __slots__ = '_wc' + + def __init__(self, wchannel): + """Initialize this instance from its parent write channel""" + self._wc = wchannel + + + #{ Interface + + def read(self, block=True, timeout=None): + """:return: an item read from the channel + :param block: if True, the call will block until an item is available + :param timeout: if positive and block is True, it will block only for the + given amount of seconds. + :raise IOError: When reading from an empty channel ( if non-blocking, or + if the channel is still empty after the timeout""" + # if the channel is closed for writing, we never block + if self._wc.closed: + block = False + + try: + return self._wc._queue.get(block, timeout) + except Empty: + raise IOError("Error reading from an empty channel") + # END handle reading + + #} END interface + +#} END classes -- cgit v1.2.1 From b72e2704022d889f116e49abf3e1e5d3e3192d3b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 01:00:12 +0200 Subject: Improved pool design and started rough implementation, top down to learn while going. Tests will be written soon for verification, its still quite theoretical --- lib/git/async/channel.py | 49 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 12 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index c9cbfb87..70daed24 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -7,7 +7,7 @@ from Queue import ( #{ Classes class Channel(object): - """A channel is similar to a system pipe. It has a write end as well as one or + """A channel is similar to a file like object. It has a write end as well as one or more read ends. If Data is in the channel, it can be read, if not the read operation will block until data becomes available. If the channel is closed, any read operation will result in an exception @@ -51,8 +51,8 @@ class WChannel(Channel): :param block: If True, the call will block until there is free space in the channel :param timeout: timeout in seconds for blocking calls. - :raise IOError: when writing into closed file or when writing into a non-blocking - full channel + :raise IOError: when writing into closed file + :raise EOFError: when writing into a non-blocking full channel :note: may block if the channel has a limited capacity""" if self._closed: raise IOError("Cannot write to a closed channel") @@ -60,9 +60,14 @@ class WChannel(Channel): try: self._queue.put(item, block, timeout) except Full: - raise IOError("Capacity of the channel was exeeded") + raise EOFError("Capacity of the channel was exeeded") # END exception handling + def size(self): + """:return: approximate number of items that could be read from the read-ends + of this channel""" + return self._queue.qsize() + def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" @@ -86,22 +91,42 @@ class RChannel(Channel): #{ Interface - def read(self, block=True, timeout=None): - """:return: an item read from the channel + def read(self, count=0, block=True, timeout=None): + """read a list of items read from the channel. The list, as a sequence + of items, is similar to the string of characters returned when reading from + file like objects. + :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the given amount of seconds. - :raise IOError: When reading from an empty channel ( if non-blocking, or - if the channel is still empty after the timeout""" + :return: single item in a list if count is 1, or a list of count items. + If the channel was empty and count was 1, an empty list will be returned. + If count was greater 1, a list with less than count items will be + returned. + If count was < 1, a list with all items that could be read will be + returned.""" # if the channel is closed for writing, we never block if self._wc.closed: block = False - + + out = list() try: - return self._wc._queue.get(block, timeout) + if count == 1: + out.append(self._wc._queue.get(block, timeout)) + elif count < 1: + while True: + out.append(self._wc._queue.get(block, timeout)) + # END for each item + return out + else: + for i in xrange(count): + out.append(self._wc._queue.get(block, timeout)) + # END for each item + # END handle count except Empty: - raise IOError("Error reading from an empty channel") - # END handle reading + pass + # END handle exceptions + return out #} END interface -- cgit v1.2.1 From 867129e2950458ab75523b920a5e227e3efa8bbc Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:08:06 +0200 Subject: channel.read: enhanced to be sure we don't run into non-atomicity issues related to our channel closed flag, which is the only way not to block forever on read(0) channels which were closed by a thread 'in the meanwhile' --- lib/git/async/channel.py | 89 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 17 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 70daed24..0a1db26b 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -5,6 +5,9 @@ from Queue import ( Full ) +from time import time +import sys + #{ Classes class Channel(object): """A channel is similar to a file like object. It has a write end as well as one or @@ -106,26 +109,78 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block - if self._wc.closed: + if self._wc.closed or timeout == 0: block = False - + + # in non-blocking mode, its all not a problem out = list() - try: - if count == 1: - out.append(self._wc._queue.get(block, timeout)) - elif count < 1: - while True: - out.append(self._wc._queue.get(block, timeout)) - # END for each item - return out - else: - for i in xrange(count): - out.append(self._wc._queue.get(block, timeout)) - # END for each item + queue = self._wc._queue + if not block: + # be as fast as possible in non-blocking mode, hence + # its a bit 'unrolled' + try: + if count == 1: + out.append(queue.get(False)) + elif count < 1: + while True: + out.append(queue.get(False)) + # END for each item + else: + for i in xrange(count): + out.append(queue.get(False)) + # END for each item + # END handle count + except Empty: + pass + # END handle exceptions + else: + # if we have really bad timing, the source of the channel + # marks itself closed, but before setting it, the thread + # switches to us. We read it, read False, and try to fetch + # something, and never return. The whole closed channel thing + # is not atomic ( of course ) + # This is why we never block for long, to get a chance to recheck + # for closed channels. + # We blend this into the timeout of the user + ourtimeout = 0.25 # the smaller, the more responsive, but the slower + wc = self._wc + timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it + assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe + if timeout and ourtimeout > timeout: + ourtimeout = timeout + # END setup timeout + + # to get everything into one loop, we set the count accordingly + if count == 0: + count = sys.maxint # END handle count - except Empty: - pass - # END handle exceptions + + for i in xrange(count): + have_timeout = False + st = time() + while True: + try: + if wc.closed: + have_timeout = True + break + # END don't continue on closed channels + + # END abort reading if it was closed ( in the meanwhile ) + out.append(queue.get(block, ourtimeout)) + break # breakout right away + except Empty: + if timeout - (time() - st) <= 0: + # hitting timeout + have_timeout = True + break + # END abort if the user wants no more time spent here + # END handle timeout + # END endless timer loop + if have_timeout: + break + # END stop on timeout + # END for each item + # END handle blocking return out #} END interface -- cgit v1.2.1 From 6a252661c3bf4202a4d571f9c41d2afa48d9d75f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:41:20 +0200 Subject: pool: First version which works as expected in async mode. Its just using a single task for now, but next up are dependent tasks --- lib/git/async/channel.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 0a1db26b..2add9478 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -162,7 +162,15 @@ class RChannel(Channel): try: if wc.closed: have_timeout = True - break + # its about the 'in the meanwhile' :) - get everything + # we can in non-blocking mode. This will raise + try: + while True: + out.append(queue.get(False)) + # END until it raises Empty + except Empty: + break + # END finally, out of here # END don't continue on closed channels # END abort reading if it was closed ( in the meanwhile ) -- cgit v1.2.1 From e825f8b69760e269218b1bf1991018baf3c16b04 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:38:08 +0200 Subject: Channel now uses the AsyncQueue, boosting performance by factor 4, its a start --- lib/git/async/channel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 2add9478..2d5ab79c 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -1,10 +1,10 @@ """Contains a queue based channel implementation""" from Queue import ( - Queue, Empty, Full ) +from util import AsyncQueue from time import time import sys @@ -43,7 +43,7 @@ class WChannel(Channel): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False - self._queue = Queue(max_items) + self._queue = AsyncQueue(max_items) #{ Interface -- cgit v1.2.1 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. --- lib/git/async/channel.py | 108 ++++++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 52 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 2d5ab79c..655024fe 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -4,7 +4,11 @@ from Queue import ( Full ) -from util import AsyncQueue +from util import ( + AsyncQueue, + DummyLock + ) + from time import time import sys @@ -23,12 +27,9 @@ class Channel(object): def __new__(cls, *args): if cls is Channel: - max_items = 0 - if len(args) == 1: - max_items = args[0] - if len(args) > 1: - raise ValueError("Specify not more than the number of items the channel should take") - wc = WChannel(max_items) + if len(args) > 0: + raise ValueError("Cannot take any arguments when creating a new channel") + wc = WChannel() rc = RChannel(wc) return wc, rc # END constructor mode @@ -39,11 +40,11 @@ class WChannel(Channel): """The write end of a channel""" __slots__ = ('_closed', '_queue') - def __init__(self, max_items=0): + def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False - self._queue = AsyncQueue(max_items) + self._queue = AsyncQueue() #{ Interface @@ -74,7 +75,21 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" + mutex = self._queue.mutex + mutex.acquire() + # this is atomic already, due to the GIL - no need to get the queue's mutex + print "channel.close()" self._closed = True + # now make sure that the people waiting for an item are released now + # As we it could be that some readers are already on their way to initiate + # a blocking get, we must make sure that locks never block before that happens + + # now we are the only one accessing the queue, so change it + self._queue.mutex = DummyLock() + print self._queue.not_empty._waiters + self._queue.not_empty.notify_all() + print self._queue.not_empty._waiters + mutex.release() @property def closed(self): @@ -134,58 +149,47 @@ class RChannel(Channel): pass # END handle exceptions else: - # if we have really bad timing, the source of the channel - # marks itself closed, but before setting it, the thread - # switches to us. We read it, read False, and try to fetch - # something, and never return. The whole closed channel thing - # is not atomic ( of course ) - # This is why we never block for long, to get a chance to recheck - # for closed channels. - # We blend this into the timeout of the user - ourtimeout = 0.25 # the smaller, the more responsive, but the slower - wc = self._wc - timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it - assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe - if timeout and ourtimeout > timeout: - ourtimeout = timeout - # END setup timeout - # to get everything into one loop, we set the count accordingly if count == 0: count = sys.maxint # END handle count + endtime = sys.maxint # allows timeout for whole operation + if timeout is not None: + endtime = time() + timeout + # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): - have_timeout = False - st = time() - while True: + try: + print "about to read", i, count, block, timeout + out.append(queue.get(block, timeout)) + print "got one" + except Empty: + pass + # END ignore empty + + # if we have been unblocked because the closed state changed + # in the meanwhile, stop trying + # NOTE: must NOT cache _wc + if self._wc.closed: + # its racing time - all threads waiting for the queue + # are awake now, and we actually can't be sure its empty + # Hence we pop it empty without blocking, getting as much + # as we can. This effectively lets us race ( with mutexes ) + # of the other threads. + print "stopped because it was closed" try: - if wc.closed: - have_timeout = True - # its about the 'in the meanwhile' :) - get everything - # we can in non-blocking mode. This will raise - try: - while True: - out.append(queue.get(False)) - # END until it raises Empty - except Empty: - break - # END finally, out of here - # END don't continue on closed channels - - # END abort reading if it was closed ( in the meanwhile ) - out.append(queue.get(block, ourtimeout)) - break # breakout right away + while True: + out.append(queue.get(False)) + # END pop it empty except Empty: - if timeout - (time() - st) <= 0: - # hitting timeout - have_timeout = True - break - # END abort if the user wants no more time spent here - # END handle timeout - # END endless timer loop - if have_timeout: + pass + # END ignore emptyness, we have all + break + # END handle cloased + + if time() >= endtime: + break # END stop on timeout # END for each item # END handle blocking -- cgit v1.2.1 From 53152a824f5186452504f0b68306d10ebebee416 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 14:23:58 +0200 Subject: queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable. --- lib/git/async/channel.py | 44 ++++++++++++++------------------------------ 1 file changed, 14 insertions(+), 30 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 655024fe..08323582 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,7 +6,6 @@ from Queue import ( from util import ( AsyncQueue, - DummyLock ) from time import time @@ -56,15 +55,13 @@ class WChannel(Channel): channel :param timeout: timeout in seconds for blocking calls. :raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel - :note: may block if the channel has a limited capacity""" - if self._closed: - raise IOError("Cannot write to a closed channel") - + :raise EOFError: when writing into a non-blocking full channel""" + # let the queue handle the 'closed' attribute, we write much more often + # to an open channel than to a closed one, saving a few cycles try: self._queue.put(item, block, timeout) - except Full: - raise EOFError("Capacity of the channel was exeeded") + except ReadOnly: + raise IOError("Cannot write to a closed channel") # END exception handling def size(self): @@ -75,21 +72,10 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - mutex = self._queue.mutex - mutex.acquire() - # this is atomic already, due to the GIL - no need to get the queue's mutex - print "channel.close()" + # yes, close it a little too early, better than having anyone put + # additional items self._closed = True - # now make sure that the people waiting for an item are released now - # As we it could be that some readers are already on their way to initiate - # a blocking get, we must make sure that locks never block before that happens - - # now we are the only one accessing the queue, so change it - self._queue.mutex = DummyLock() - print self._queue.not_empty._waiters - self._queue.not_empty.notify_all() - print self._queue.not_empty._waiters - mutex.release() + self._queue.set_writable(False) @property def closed(self): @@ -124,6 +110,7 @@ class RChannel(Channel): If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block + # NOTE: is handled by the queue if self._wc.closed or timeout == 0: block = False @@ -160,9 +147,7 @@ class RChannel(Channel): # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): try: - print "about to read", i, count, block, timeout out.append(queue.get(block, timeout)) - print "got one" except Empty: pass # END ignore empty @@ -176,7 +161,6 @@ class RChannel(Channel): # Hence we pop it empty without blocking, getting as much # as we can. This effectively lets us race ( with mutexes ) # of the other threads. - print "stopped because it was closed" try: while True: out.append(queue.get(False)) @@ -186,11 +170,11 @@ class RChannel(Channel): # END ignore emptyness, we have all break - # END handle cloased - - if time() >= endtime: - break - # END stop on timeout + # END handle channel cloased + + if time() >= endtime: + break + # END stop operation on timeout # END for each item # END handle blocking return out -- cgit v1.2.1 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: Its getting better already - intermediate commit before further chaning the task class --- lib/git/async/channel.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 08323582..5c52b1dc 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + ReadOnly ) from time import time @@ -59,6 +60,7 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: + print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -74,6 +76,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items + print "closing channel", self self._closed = True self._queue.set_writable(False) @@ -102,7 +105,7 @@ class RChannel(Channel): :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the - given amount of seconds. + given amount of seconds, returning the items it received so far. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be @@ -149,27 +152,29 @@ class RChannel(Channel): try: out.append(queue.get(block, timeout)) except Empty: - pass + # here we are only if there is nothing on the queue, + # and if we are blocking. If we are not blocking, this + # indiccates that the queue was set unwritable in the meanwhile. + # hence we can abort now to prevent reading (possibly) forever + # Besides, this is racy as all threads will rip on the channel + # without waiting until its empty + if not block: + break # END ignore empty # if we have been unblocked because the closed state changed # in the meanwhile, stop trying # NOTE: must NOT cache _wc if self._wc.closed: - # its racing time - all threads waiting for the queue - # are awake now, and we actually can't be sure its empty - # Hence we pop it empty without blocking, getting as much - # as we can. This effectively lets us race ( with mutexes ) - # of the other threads. - try: - while True: - out.append(queue.get(False)) - # END pop it empty - except Empty: - pass - # END ignore emptyness, we have all + # If we were closed, we drop out even if there might still + # be items. Now its time to get these items, according to + # our count. Just switch to unblocking mode. + # If we are to read unlimited items, this would run forever, + # but the EmptyException handler takes care of this + block = False - break + # we don't continue, but let the timer decide whether + # it wants to abort # END handle channel cloased if time() >= endtime: -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, with boosts performance into brigt green levels --- lib/git/async/channel.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 5c52b1dc..c05f7383 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -60,7 +60,6 @@ class WChannel(Channel): # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles try: - print "putting item", item, id(self._queue.queue) self._queue.put(item, block, timeout) except ReadOnly: raise IOError("Cannot write to a closed channel") @@ -76,7 +75,7 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - print "closing channel", self + # print "closing channel", self self._closed = True self._queue.set_writable(False) -- cgit v1.2.1 From 15941ca090a2c3c987324fc911bbc6f89e941c47 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 10:34:12 +0200 Subject: queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently. Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined --- lib/git/async/channel.py | 1 - 1 file changed, 1 deletion(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index c05f7383..58c35f96 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -75,7 +75,6 @@ class WChannel(Channel): an error""" # yes, close it a little too early, better than having anyone put # additional items - # print "closing channel", self self._closed = True self._queue.set_writable(False) -- cgit v1.2.1 From 0974f8737a3c56a7c076f9d0b757c6cb106324fb Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 14:47:41 +0200 Subject: Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode --- lib/git/async/channel.py | 85 +++++++++++++++++++++--------------------------- 1 file changed, 37 insertions(+), 48 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 58c35f96..3a277e7e 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -38,12 +38,11 @@ class Channel(object): class WChannel(Channel): """The write end of a channel""" - __slots__ = ('_closed', '_queue') + __slots__ = ('_queue') def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" - self._closed = False self._queue = AsyncQueue() @@ -55,15 +54,10 @@ class WChannel(Channel): :param block: If True, the call will block until there is free space in the channel :param timeout: timeout in seconds for blocking calls. - :raise IOError: when writing into closed file - :raise EOFError: when writing into a non-blocking full channel""" + :raise ReadOnly: when writing into closed channel""" # let the queue handle the 'closed' attribute, we write much more often # to an open channel than to a closed one, saving a few cycles - try: - self._queue.put(item, block, timeout) - except ReadOnly: - raise IOError("Cannot write to a closed channel") - # END exception handling + self._queue.put(item, block, timeout) def size(self): """:return: approximate number of items that could be read from the read-ends @@ -73,15 +67,11 @@ class WChannel(Channel): def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - # yes, close it a little too early, better than having anyone put - # additional items - self._closed = True self._queue.set_writable(False) - @property def closed(self): """:return: True if the channel was closed""" - return self._closed + return not self._queue.writable() #} END interface @@ -104,6 +94,7 @@ class RChannel(Channel): :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the given amount of seconds, returning the items it received so far. + The timeout is applied to each read item, not for the whole operation. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be @@ -112,9 +103,11 @@ class RChannel(Channel): returned.""" # if the channel is closed for writing, we never block # NOTE: is handled by the queue - if self._wc.closed or timeout == 0: - block = False - + # We don't check for a closed state here has it costs time - most of + # the time, it will not be closed, and will bail out automatically once + # it gets closed + + # in non-blocking mode, its all not a problem out = list() queue = self._wc._queue @@ -142,42 +135,38 @@ class RChannel(Channel): count = sys.maxint # END handle count - endtime = sys.maxint # allows timeout for whole operation - if timeout is not None: - endtime = time() + timeout - # could be improved by a separate: no-endtime branch, saving the time calls - for i in xrange(count): + i = 0 + while i < count: try: out.append(queue.get(block, timeout)) + i += 1 except Empty: - # here we are only if there is nothing on the queue, - # and if we are blocking. If we are not blocking, this - # indiccates that the queue was set unwritable in the meanwhile. - # hence we can abort now to prevent reading (possibly) forever - # Besides, this is racy as all threads will rip on the channel - # without waiting until its empty - if not block: - break - # END ignore empty - - # if we have been unblocked because the closed state changed - # in the meanwhile, stop trying - # NOTE: must NOT cache _wc - if self._wc.closed: - # If we were closed, we drop out even if there might still - # be items. Now its time to get these items, according to - # our count. Just switch to unblocking mode. - # If we are to read unlimited items, this would run forever, - # but the EmptyException handler takes care of this - block = False + # here we are only if + # someone woke us up to inform us about the queue that changed + # its writable state + # The following branch checks for closed channels, and pulls + # as many items as we need and as possible, before + # leaving the loop. + if not queue.writable(): + try: + while i < count: + out.append(queue.get(False, None)) + i += 1 + # END count loop + except Empty: + break # out of count loop + # END handle absolutely empty queue + # END handle closed channel - # we don't continue, but let the timer decide whether - # it wants to abort - # END handle channel cloased - - if time() >= endtime: + # if we are here, we woke up and the channel is not closed + # Either the queue became writable again, which currently shouldn't + # be able to happen in the channel, or someone read with a timeout + # that actually timed out. + # As it timed out, which is the only reason we are here, + # we have to abort break - # END stop operation on timeout + # END ignore empty + # END for each item # END handle blocking return out -- cgit v1.2.1 From 57a4e09294230a36cc874a6272c71757c48139f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 15:29:47 +0200 Subject: Channel: removed pseudoconstructor, which clearly improves the design and makes it easier to constomize pool: in serial mode, created channels will be serial-only, which brings 15% of performance --- lib/git/async/channel.py | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 3a277e7e..bb118f30 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + SyncQueue, ReadOnly ) @@ -24,27 +25,19 @@ class Channel(object): Create a new channel """ __slots__ = tuple() - - def __new__(cls, *args): - if cls is Channel: - if len(args) > 0: - raise ValueError("Cannot take any arguments when creating a new channel") - wc = WChannel() - rc = RChannel(wc) - return wc, rc - # END constructor mode - return object.__new__(cls) class WChannel(Channel): - """The write end of a channel""" + """The write end of a channel - it is thread-safe""" __slots__ = ('_queue') + # The queue to use to store the actual data + QueueCls = AsyncQueue + def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" - self._queue = AsyncQueue() - + self._queue = self.QueueCls() #{ Interface def write(self, item, block=True, timeout=None): @@ -75,6 +68,12 @@ class WChannel(Channel): #} END interface +class SerialWChannel(WChannel): + """A slightly faster version of a WChannel, which sacrificed thead-safety for + performance""" + QueueCls = SyncQueue + + class RChannel(Channel): """The read-end of a corresponding write channel""" __slots__ = '_wc' @@ -174,3 +173,14 @@ class RChannel(Channel): #} END interface #} END classes + +#{ Constructors +def mkchannel(wctype = WChannel, rctype = RChannel): + """Create a channel, which consists of one write end and one read end + :return: tuple(write_channel, read_channel) + :param wctype: The type of the write channel to instantiate + :param rctype: The type of the read channel to instantiate""" + wc = wctype() + rc = rctype(wc) + return wc, rc +#} END constructors -- cgit v1.2.1 From ea81f14dafbfb24d70373c74b5f8dabf3f2225d9 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 16:38:21 +0200 Subject: Channel: Callbacks reviewed - they are now part of Subclasses of the default channel implementation, one of which is used as base by the Pool Read channel, releasing it of the duty to call these itself. The write channel with callback subclass allows the transformation of the item to be written --- lib/git/async/channel.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index bb118f30..abb31035 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -68,6 +68,32 @@ class WChannel(Channel): #} END interface +class CallbackWChannel(WChannel): + """The write end of a channel which allows you to setup a callback to be + called after an item was written to the channel""" + __slots__ = ('_pre_cb') + + def __init__(self): + WChannel.__init__(self) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda item: item): + """Install a callback to be called before the given item is written. + It returns a possibly altered item which will be written to the channel + instead, making it useful for pre-write item conversions. + Providing None uninstalls the current method. + :return: the previously installed function or None + :note: Must be thread-safe if the channel is used in multiple threads""" + prev = self._pre_cb + self._pre_cb = fun + return prev + + def write(self, item, block=True, timeout=None): + if self._pre_cb: + item = self._pre_cb(item) + WChannel.write(self, item, block, timeout) + + class SerialWChannel(WChannel): """A slightly faster version of a WChannel, which sacrificed thead-safety for performance""" @@ -171,7 +197,32 @@ class RChannel(Channel): return out #} END interface + +class CallbackRChannel(RChannel): + """A channel which sends a callback before items are read from the channel""" + __slots__ = "_pre_cb" + + def __init__(self, wc): + RChannel.__init__(self, wc) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. + Exceptions will be propagated. + If a function is not provided, the call is effectively uninstalled. + :return: the previously installed callback or None + :note: The callback must be threadsafe if the channel is used by multiple threads.""" + prev = self._pre_cb + self._pre_cb = fun + return prev + def read(self, count=0, block=True, timeout=None): + if self._pre_cb: + self._pre_cb(count) + return RChannel.read(self, count, block, timeout) + + #} END classes #{ Constructors -- cgit v1.2.1 From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a ready is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefit of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now --- lib/git/async/channel.py | 102 ++++++++++++++++++++++------------------------- 1 file changed, 47 insertions(+), 55 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index abb31035..9b019707 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -21,61 +21,57 @@ class Channel(object): If the channel is closed, any read operation will result in an exception This base class is not instantiated directly, but instead serves as constructor - for RWChannel pairs. + for Rwriter pairs. Create a new channel """ - __slots__ = tuple() - - -class WChannel(Channel): - """The write end of a channel - it is thread-safe""" - __slots__ = ('_queue') + __slots__ = 'queue' # The queue to use to store the actual data QueueCls = AsyncQueue def __init__(self): - """initialize this instance, able to hold max_items at once - Write calls will block if the channel is full, until someone reads from it""" - self._queue = self.QueueCls() - - #{ Interface - def write(self, item, block=True, timeout=None): - """Send an item into the channel, it can be read from the read end of the - channel accordingly - :param item: Item to send - :param block: If True, the call will block until there is free space in the - channel - :param timeout: timeout in seconds for blocking calls. - :raise ReadOnly: when writing into closed channel""" - # let the queue handle the 'closed' attribute, we write much more often - # to an open channel than to a closed one, saving a few cycles - self._queue.put(item, block, timeout) - + """initialize this instance with a queue holding the channel contents""" + self.queue = self.QueueCls() + + +class SerialChannel(Channel): + """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" + QueueCls = SyncQueue + + +class Writer(object): + """The write end of a channel, a file-like interface for a channel""" + __slots__ = ('write', 'channel') + + def __init__(self, channel): + """Initialize the writer to use the given channel""" + self.channel = channel + self.write = channel.queue.put + + #{ Interface def size(self): - """:return: approximate number of items that could be read from the read-ends - of this channel""" - return self._queue.qsize() + return self.channel.queue.qsize() def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - self._queue.set_writable(False) + self.channel.queue.set_writable(False) def closed(self): """:return: True if the channel was closed""" - return not self._queue.writable() + return not self.channel.queue.writable() #} END interface -class CallbackWChannel(WChannel): +class CallbackWriter(Writer): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') - def __init__(self): - WChannel.__init__(self) + def __init__(self, channel): + Writer.__init__(self, channel) self._pre_cb = None + self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -88,25 +84,19 @@ class CallbackWChannel(WChannel): self._pre_cb = fun return prev - def write(self, item, block=True, timeout=None): + def _write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - WChannel.write(self, item, block, timeout) + self.channel.queue.put(item, block, timeout) - -class SerialWChannel(WChannel): - """A slightly faster version of a WChannel, which sacrificed thead-safety for - performance""" - QueueCls = SyncQueue - -class RChannel(Channel): - """The read-end of a corresponding write channel""" - __slots__ = '_wc' +class Reader(object): + """Allows reading from a channel""" + __slots__ = 'channel' - def __init__(self, wchannel): + def __init__(self, channel): """Initialize this instance from its parent write channel""" - self._wc = wchannel + self.channel = channel #{ Interface @@ -135,7 +125,7 @@ class RChannel(Channel): # in non-blocking mode, its all not a problem out = list() - queue = self._wc._queue + queue = self.channel.queue if not block: # be as fast as possible in non-blocking mode, hence # its a bit 'unrolled' @@ -198,12 +188,12 @@ class RChannel(Channel): #} END interface -class CallbackRChannel(RChannel): +class CallbackReader(Reader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" - def __init__(self, wc): - RChannel.__init__(self, wc) + def __init__(self, channel): + Reader.__init__(self, channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -220,18 +210,20 @@ class CallbackRChannel(RChannel): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return RChannel.read(self, count, block, timeout) + return Reader.read(self, count, block, timeout) #} END classes #{ Constructors -def mkchannel(wctype = WChannel, rctype = RChannel): - """Create a channel, which consists of one write end and one read end - :return: tuple(write_channel, read_channel) +def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): + """Create a channel, with a reader and a writer + :return: tuple(reader, writer) + :param ctype: Channel to instantiate :param wctype: The type of the write channel to instantiate :param rctype: The type of the read channel to instantiate""" - wc = wctype() - rc = rctype(wc) + c = ctype() + wc = wtype(c) + rc = rtype(c) return wc, rc #} END constructors -- cgit v1.2.1 From cac6e06cc9ef2903a15e594186445f3baa989a1a Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:58:44 +0200 Subject: test_task: fixed import error, made all modules from x import * safe --- lib/git/async/channel.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 9b019707..ae476cda 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -13,6 +13,9 @@ from util import ( from time import time import sys +__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader', + 'CallbackReader', 'mkchannel', 'ReadOnly') + #{ Classes class Channel(object): """A channel is similar to a file like object. It has a write end as well as one or -- cgit v1.2.1 From be8955a0fbb77d673587974b763f17c214904b57 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 11:19:18 +0200 Subject: Cleaned up channel design, Reader and Writer bases don't require a channel anymore, but are abstract. Added IteratorReader, implementing the reader interface from an iterator. The implementation moved from the TaskIterator to the channel --- lib/git/async/channel.py | 139 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 116 insertions(+), 23 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index ae476cda..79cb5294 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -11,10 +11,12 @@ from util import ( ) from time import time +import threading import sys -__all__ = ('Channel', 'SerialChannel', 'Writer', 'CallbackWriter', 'Reader', - 'CallbackReader', 'mkchannel', 'ReadOnly') +__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', + 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', + 'IteratorReader') #{ Classes class Channel(object): @@ -43,15 +45,50 @@ class SerialChannel(Channel): class Writer(object): + """A writer is an object providing write access to a possibly blocking reading device""" + __slots__ = tuple() + + #{ Interface + + def __init__(self, device): + """Initialize the instance with the device to write to""" + + def write(self, item, block=True, timeout=None): + """Write the given item into the device + :param block: True if the device may block until space for the item is available + :param timeout: The time in seconds to wait for the device to become ready + in blocking mode""" + raise NotImplementedError() + + def size(self): + """:return: number of items already in the device, they could be read with a reader""" + raise NotImplementedError() + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + raise NotImplementedError() + + def closed(self): + """:return: True if the channel was closed""" + raise NotImplementedError() + + #} END interface + + +class ChannelWriter(Writer): """The write end of a channel, a file-like interface for a channel""" - __slots__ = ('write', 'channel') + __slots__ = ('channel', '_put') def __init__(self, channel): """Initialize the writer to use the given channel""" self.channel = channel - self.write = channel.queue.put + self._put = self.channel.queue.put #{ Interface + def write(self, item, block=False, timeout=None): + return self._put(item, block, timeout) + def size(self): return self.channel.queue.qsize() @@ -66,15 +103,14 @@ class Writer(object): #} END interface -class CallbackWriter(Writer): +class CallbackChannelWriter(ChannelWriter): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') def __init__(self, channel): - Writer.__init__(self, channel) + super(CallbackChannelWriter, self).__init__(channel) self._pre_cb = None - self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -87,25 +123,22 @@ class CallbackWriter(Writer): self._pre_cb = fun return prev - def _write(self, item, block=True, timeout=None): + def write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - self.channel.queue.put(item, block, timeout) + super(CallbackChannelWriter, self).write(item, block, timeout) class Reader(object): - """Allows reading from a channel""" - __slots__ = 'channel' + """Allows reading from a device""" + __slots__ = tuple() - def __init__(self, channel): - """Initialize this instance from its parent write channel""" - self.channel = channel - - #{ Interface - + def __init__(self, device): + """Initialize the instance with the device to read from""" + def read(self, count=0, block=True, timeout=None): - """read a list of items read from the channel. The list, as a sequence + """read a list of items read from the device. The list, as a sequence of items, is similar to the string of characters returned when reading from file like objects. :param count: given amount of items to read. If < 1, all items will be read @@ -114,11 +147,25 @@ class Reader(object): given amount of seconds, returning the items it received so far. The timeout is applied to each read item, not for the whole operation. :return: single item in a list if count is 1, or a list of count items. - If the channel was empty and count was 1, an empty list will be returned. + If the device was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be returned. If count was < 1, a list with all items that could be read will be returned.""" + raise NotImplementedError() + + +class ChannelReader(Reader): + """Allows reading from a channel. The reader is thread-safe if the channel is as well""" + __slots__ = 'channel' + + def __init__(self, channel): + """Initialize this instance from its parent write channel""" + self.channel = channel + + #{ Interface + + def read(self, count=0, block=True, timeout=None): # if the channel is closed for writing, we never block # NOTE: is handled by the queue # We don't check for a closed state here has it costs time - most of @@ -191,12 +238,12 @@ class Reader(object): #} END interface -class CallbackReader(Reader): +class CallbackChannelReader(ChannelReader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" def __init__(self, channel): - Reader.__init__(self, channel) + super(CallbackChannelReader, self).__init__(channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -213,13 +260,59 @@ class CallbackReader(Reader): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return Reader.read(self, count, block, timeout) + return super(CallbackChannelReader, self).read(count, block, timeout) + +class IteratorReader(Reader): + """A Reader allowing to read items from an iterator, instead of a channel. + Reads will never block. Its thread-safe""" + __slots__ = ("_empty", '_iter', '_lock') + + # the type of the lock to use when reading from the iterator + lock_type = threading.Lock + + def __init__(self, iterator): + self._empty = False + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iter = iterator + self._lock = self.lock_type() + + def read(self, count=0, block=True, timeout=None): + """Non-Blocking implementation of read""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort + + self._lock.acquire() + try: + if count == 0: + self._empty = True + return list(self._iter) + else: + out = list() + it = self._iter + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + self._empty = True + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + #} END classes #{ Constructors -def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): +def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): """Create a channel, with a reader and a writer :return: tuple(reader, writer) :param ctype: Channel to instantiate -- cgit v1.2.1 From 1d8a577ffc6ad7ce1465001ddebdc157aecc1617 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 11:41:10 +0200 Subject: channel: cleaned up inheritance hierarchy, adding mixing for callback functionality - previously the callback functionality was bound to channel based readers/writers --- lib/git/async/channel.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) (limited to 'lib/git/async/channel.py') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 79cb5294..a29ff17c 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -103,13 +103,14 @@ class ChannelWriter(Writer): #} END interface -class CallbackChannelWriter(ChannelWriter): +class CallbackWriterMixin(object): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" - __slots__ = ('_pre_cb') + # slots don't work with mixin's :( + # __slots__ = ('_pre_cb') - def __init__(self, channel): - super(CallbackChannelWriter, self).__init__(channel) + def __init__(self, *args): + super(CallbackWriterMixin, self).__init__(*args) self._pre_cb = None def set_pre_cb(self, fun = lambda item: item): @@ -126,7 +127,12 @@ class CallbackChannelWriter(ChannelWriter): def write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - super(CallbackChannelWriter, self).write(item, block, timeout) + super(CallbackWriterMixin, self).write(item, block, timeout) + + +class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): + """Implements a channel writer with callback functionality""" + pass class Reader(object): @@ -238,12 +244,14 @@ class ChannelReader(Reader): #} END interface -class CallbackChannelReader(ChannelReader): + +class CallbackReaderMixin(object): """A channel which sends a callback before items are read from the channel""" - __slots__ = "_pre_cb" + # unfortunately, slots can only use direct inheritance, have to turn it off :( + # __slots__ = "_pre_cb" - def __init__(self, channel): - super(CallbackChannelReader, self).__init__(channel) + def __init__(self, *args): + super(CallbackReaderMixin, self).__init__(*args) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -260,7 +268,12 @@ class CallbackChannelReader(ChannelReader): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return super(CallbackChannelReader, self).read(count, block, timeout) + return super(CallbackReaderMixin, self).read(count, block, timeout) + + +class CallbackChannelReader(CallbackReaderMixin, ChannelReader): + """Implements a channel reader with callback functionality""" + pass class IteratorReader(Reader): -- cgit v1.2.1