From b3cde0ee162b8f0cb67da981311c8f9c16050a62 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 18:13:21 +0200 Subject: First step of testing the pool - tasks have been separated into a new module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support --- lib/git/async/task.py | 144 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 lib/git/async/task.py (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py new file mode 100644 index 00000000..d2422773 --- /dev/null +++ b/lib/git/async/task.py @@ -0,0 +1,144 @@ +from graph import Node +import threading +import new + +class OutputChannelTask(Node): + """Abstracts a named task as part of a set of interdependent tasks, which contains + additional information on how the task should be queued and processed. + + Results of the item processing are sent to an output channel, which is to be + set by the creator""" + __slots__ = ( '_read', # method to yield items to process + '_out_wc', # output write channel + '_exc', # exception caught + 'fun', # function to call with items read + 'min_count', # minimum amount of items to produce, None means no override + 'max_chunksize', # maximium amount of items to process per process call + 'apply_single' # apply single items even if multiple where read + ) + + def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0): + Node.__init__(self, id) + self._read = None # to be set by subclasss + self._out_wc = None # to be set later + self._exc = None + self.fun = fun + self.min_count = None + self.max_chunksize = 0 # note set + self.apply_single = apply_single + + def is_done(self): + """:return: True if we are finished processing""" + return self._out_wc.closed + + def set_done(self): + """Set ourselves to being done, has we have completed the processing""" + self._out_wc.close() + + def error(self): + """:return: Exception caught during last processing or None""" + return self._exc + + def process(self, count=0): + """Process count items and send the result individually to the output channel""" + items = self._read(count) + + try: + if self.apply_single: + for item in items: + self._out_wc.write(self.fun(item)) + # END for each item + else: + self._out_wc.write(self.fun(items)) + # END handle single apply + except Exception, e: + self._exc = e + self.set_done() + # END exception handling + + # if we didn't get all demanded items, which is also the case if count is 0 + # we have depleted the input channel and are done + if len(items) != count: + self.set_done() + # END handle done state + #{ Configuration + + +class ThreadTaskBase(object): + """Describes tasks which can be used with theaded pools""" + pass + + +class InputIteratorTaskBase(OutputChannelTask): + """Implements a task which processes items from an iterable in a multi-processing + safe manner""" + __slots__ = ('_iterator', '_lock') + # the type of the lock to use when reading from the iterator + lock_type = None + + def __init__(self, iterator, *args, **kwargs): + OutputChannelTask.__init__(self, *args, **kwargs) + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iterator = iterator + self._lock = self.lock_type() + self._read = self.__read + + def __read(self, count=0): + """Read count items from the iterator, and return them""" + self._lock.acquire() + try: + if count == 0: + return 
list(self._iterator) + else: + out = list() + it = self._iterator + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + + +class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): + """An input iterator for threaded pools""" + lock_type = threading.Lock + + +class InputChannelTask(OutputChannelTask): + """Uses an input channel as source for reading items + For instantiation, it takes all arguments of its base, the first one needs + to be the input channel to read from though.""" + __slots__ = ( + 'in_rc', # channel to read items from + '_pool_ref' # to be set by Pool + ) + + def __init__(self, in_rc, *args, **kwargs): + OutputChannelTask.__init__(self, *args, **kwargs) + self._in_rc = in_rc + + def process(self, count=1): + """Verify our setup, and do some additional checking, before the + base implementation can permanently perform all operations""" + self._read = self._in_rc.read + # make sure we don't trigger the pool if we read from a pool channel which + # belongs to our own pool. Channels from different pools are fine though, + # there we want to trigger its computation + if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): + self._read = self._in_rc._read + + # permanently install our base for processing + self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self)) + + # and call it + return OutputChannelTask.process(self, count) + #{ Configuration + -- cgit v1.2.1 From 1b27292936c81637f6b9a7141dafaad1126f268e Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 21:15:13 +0200 Subject: Plenty of fixes in the chunking routine, made possible by a serialized chunking test. Next up, actual async processing --- lib/git/async/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d2422773..ec650237 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -58,7 +58,7 @@ class OutputChannelTask(Node): # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done - if len(items) != count: + if not items or len(items) != count: self.set_done() # END handle done state #{ Configuration -- cgit v1.2.1 From 6a252661c3bf4202a4d571f9c41d2afa48d9d75f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:41:20 +0200 Subject: pool: First version which works as expected in async mode. Its just using a single task for now, but next up are dependent tasks --- lib/git/async/task.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ec650237..3137746c 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -7,7 +7,13 @@ class OutputChannelTask(Node): additional information on how the task should be queued and processed. Results of the item processing are sent to an output channel, which is to be - set by the creator""" + set by the creator + + * **min_count** assures that not less than min_count items will be processed per call. + * **max_chunksize** assures that multi-threading is happening in smaller chunks. 
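A minimal illustration of that chunking rule, as a sketch outside the patch (the helper name is hypothetical, and count == 0, meaning "read everything", is handled separately in the real code):

    def chunk_sizes(count, max_chunksize=0):
        # split a positive item count into chunks no larger than max_chunksize
        if max_chunksize <= 0:
            yield count                     # no chunking configured: one big chunk
            return
        while count > 0:
            size = min(count, max_chunksize)
            yield size
            count -= size

    assert list(chunk_sizes(10, max_chunksize=4)) == [4, 4, 2]

With max_chunksize set, several workers can share one large read instead of a single worker swallowing it whole, which is exactly the granularity point the docstring makes.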
If + someone wants all items to be processed, using read(0), the whole task would go to + one worker, as well as dependent tasks. If you want finer granularity , you can + specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught @@ -42,7 +48,6 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - try: if self.apply_single: for item in items: @@ -58,6 +63,9 @@ class OutputChannelTask(Node): # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done + # We could check our output channel for how many items we have and put that + # into the equation, but whats important is that we were asked to produce + # count items. if not items or len(items) != count: self.set_done() # END handle done state -- cgit v1.2.1 From 583cd8807259a69fc01874b798f657c1f9ab7828 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 19:12:44 +0200 Subject: Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state --- lib/git/async/task.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 3137746c..f106c381 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,5 +1,7 @@ from graph import Node + import threading +import weakref import new class OutputChannelTask(Node): @@ -17,6 +19,7 @@ class OutputChannelTask(Node): __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught + '_done', # True if we are done 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -28,6 +31,7 @@ class OutputChannelTask(Node): self._read = None # to be set by subclasss self._out_wc = None # to be set later self._exc = None + self._done = False self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -35,12 +39,28 @@ class OutputChannelTask(Node): def is_done(self): """:return: True if we are finished processing""" - return self._out_wc.closed + return self._done def set_done(self): """Set ourselves to being done, has we have completed the processing""" + self._done = True + self.close() + + def set_wc(self, wc): + """Set the write channel to the given one + :note: resets it done state in order to allow proper queue handling""" + self._done = False + self._out_wc = wc + + def close(self): + """A closed task will close its channel to assure the readers will wake up + :note: its safe to call this method multiple times""" self._out_wc.close() + def is_closed(self): + """:return: True if the task's write channel is closed""" + return self._out_wc.closed + def error(self): """:return: Exception caught during last processing or None""" return self._exc @@ -148,5 +168,9 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) #{ Configuration -- cgit v1.2.1 From 654e54d200135e665e07e9f0097d913a77f169da Mon Sep 17 00:00:00 2001 From: Sebastian Thiel 
Date: Mon, 7 Jun 2010 20:01:02 +0200 Subject: task: Fixed incorrect handling of channel closure. Performance is alright for up to 2 threads, but 4 are killing the queue --- lib/git/async/task.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f106c381..b282e371 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -2,6 +2,7 @@ from graph import Node import threading import weakref +import sys import new class OutputChannelTask(Node): @@ -44,7 +45,6 @@ class OutputChannelTask(Node): def set_done(self): """Set ourselves to being done, has we have completed the processing""" self._done = True - self.close() def set_wc(self, wc): """Set the write channel to the given one @@ -69,17 +69,25 @@ class OutputChannelTask(Node): """Process count items and send the result individually to the output channel""" items = self._read(count) try: + # increase the ref-count - we use this to determine whether anyone else + # is currently handling our output channel. As this method runs asynchronously, + # we have to make sure that the channel is closed by the last finishing task, + # which is not necessarily the one which determines that he is done + # as he couldn't read anymore items. + # The refcount will be dropped in the moment we get out of here. + wc = self._out_wc if self.apply_single: for item in items: - self._out_wc.write(self.fun(item)) + wc.write(self.fun(item)) # END for each item else: - self._out_wc.write(self.fun(items)) + wc.write(self.fun(items)) # END handle single apply except Exception, e: self._exc = e self.set_done() # END exception handling + del(wc) # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done @@ -89,6 +97,16 @@ class OutputChannelTask(Node): if not items or len(items) != count: self.set_done() # END handle done state + + # If we appear to be the only one left with our output channel, and are + # closed ( this could have been set in another thread as well ), make + # sure to close the output channel. + # The count is: 1 = wc itself, 2 = first reader channel, and we have only + # one, 3 is ours + x for every thread having its copy on the stack + # + 1 for the instance we provide to refcount + if self.is_done() and sys.getrefcount(self._out_wc) < 5: + self.close() + # END handle channel closure #{ Configuration -- cgit v1.2.1 From be06e87433685b5ea9cfcc131ab89c56cf8292f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 22:00:47 +0200 Subject: improved testing to test the actual async handling of the pool. there are still inconsistencies that need to be fixed, but it already improved, especially the 4-thread performance which now is as fast as the dual-threaded performance --- lib/git/async/task.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index b282e371..4e8aef54 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -178,6 +178,17 @@ class InputChannelTask(OutputChannelTask): # make sure we don't trigger the pool if we read from a pool channel which # belongs to our own pool. 
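The refcount arithmetic the commit above relies on is easiest to see in isolation. A minimal sketch, with a stand-in writer class instead of the real channel; note that sys.getrefcount is a CPython implementation detail:

    import sys

    class Writer(object):
        # stand-in for the output write channel
        def __init__(self):
            self.closed = False
        def close(self):
            self.closed = True

    wc = Writer()
    base = sys.getrefcount(wc)          # counts 'wc' plus getrefcount's own argument
    alias = wc                          # e.g. another thread's copy on its stack
    assert sys.getrefcount(wc) == base + 1
    del alias
    if sys.getrefcount(wc) == base:     # nobody else holds the channel anymore
        wc.close()

Every extra stack frame or container holding the channel bumps the count, which is why the threshold in the patch has to enumerate each expected holder explicitly.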
Channels from different pools are fine though, # there we want to trigger its computation + # PROBLEM: if the user keeps an end, but decides to put the same end into + # a task of this pool, then all items might deplete without new ones being + # produced, causing a deadlock. Just triggering the pool would be better, + # but cost's more, unnecessarily if there is just one consumer, which is + # the user. + # * could encode usage in the channel type, and fail if the refcount on + # the read-pool channel is too high + # * maybe keep track of the elements that are requested or in-production + # for each task, which would allow to precisely determine whether + # the pool as to be triggered, and bail out early. Problem would + # be the if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read -- cgit v1.2.1 From def0f73989047c4ddf9b11da05ad2c9c8e387331 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:20:37 +0200 Subject: introduced a new counter keeping track of the scheduled tasks - this prevent unnecessary tasks to be scheduled as we keep track of how many items will be produced for the task at hand. This introduces additional locking, but performns well in multithreaded mode. Performance of the master queue is still a huge issue, its currently the limiting factor, as bypassing the master queue in serial moode gives 15x performance, wich is what I would need --- lib/git/async/task.py | 47 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 4e8aef54..cf486f48 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -21,6 +21,8 @@ class OutputChannelTask(Node): '_out_wc', # output write channel '_exc', # exception caught '_done', # True if we are done + '_scheduled_items', # amount of scheduled items that will be processed in total + '_slock', # lock for scheduled items 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -33,6 +35,8 @@ class OutputChannelTask(Node): self._out_wc = None # to be set later self._exc = None self._done = False + self._scheduled_items = 0 + self._slock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -50,6 +54,7 @@ class OutputChannelTask(Node): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" self._done = False + self._scheduled_items = 0 self._out_wc = wc def close(self): @@ -65,6 +70,21 @@ class OutputChannelTask(Node): """:return: Exception caught during last processing or None""" return self._exc + def add_scheduled_items(self, count): + """Add the given amount of scheduled items to this task""" + self._slock.acquire() + self._scheduled_items += count + self._slock.release() + + def scheduled_item_count(self): + """:return: amount of scheduled items for this task""" + self._slock.acquire() + try: + return self._scheduled_items + finally: + self._slock.release() + # END threadsafe return + def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -78,14 +98,33 @@ class OutputChannelTask(Node): wc = self._out_wc if self.apply_single: for item in items: - wc.write(self.fun(item)) + rval = self.fun(item) + # 
decrement afterwards, the its unscheduled once its produced + self._slock.acquire() + self._scheduled_items -= 1 + self._slock.release() + wc.write(rval) # END for each item else: - wc.write(self.fun(items)) + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + for rval in rvals: + wc.write(rval) # END handle single apply except Exception, e: self._exc = e self.set_done() + # unschedule all, we don't know how many have been produced actually + # but only if we don't apply single please + if not self.apply_single: + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + # END unschedule all # END exception handling del(wc) @@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask): # for each task, which would allow to precisely determine whether # the pool as to be triggered, and bail out early. Problem would # be the + # * Perhaps one shouldn't seek the perfect solution , but instead + # document whats working and what not, or under which conditions. + # The whole system is simple, but gets more complicated the + # smarter it wants to be. if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read -- cgit v1.2.1 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. --- lib/git/async/task.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index cf486f48..ce701c86 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -140,10 +140,10 @@ class OutputChannelTask(Node): # If we appear to be the only one left with our output channel, and are # closed ( this could have been set in another thread as well ), make # sure to close the output channel. 
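The scheduled-items bookkeeping introduced above reduces to a lock-guarded counter. The same idea as a compact sketch, with try/finally so the lock is released even if the update raises:

    import threading

    class ScheduledItems(object):
        def __init__(self):
            self._count = 0
            self._lock = threading.Lock()

        def add(self, n):
            self._lock.acquire()
            try:
                self._count += n
            finally:
                self._lock.release()

        def count(self):
            self._lock.acquire()
            try:
                return self._count
            finally:
                self._lock.release()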
- # The count is: 1 = wc itself, 2 = first reader channel, and we have only
- # one, 3 is ours + x for every thread having its copy on the stack
+ # The count is: 1 = wc itself, 2 = first reader channel, + x for every
+ # thread having its copy on the stack
 # + 1 for the instance we provide to refcount
- if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+ if self.is_done() and sys.getrefcount(self._out_wc) < 4:
 self.close()
 # END handle channel closure
 #{ Configuration
-- cgit v1.2.1

From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 16:47:48 +0200
Subject: It's getting better already - intermediate commit before further changing the task class

---
 lib/git/async/task.py | 2 ++
 1 file changed, 2 insertions(+)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index ce701c86..97521cae 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -88,6 +88,7 @@ class OutputChannelTask(Node):
 def process(self, count=0):
 """Process count items and send the result individually to the output channel"""
 items = self._read(count)
+ print "task read", len(items)
 try:
@@ -117,6 +118,7 @@ class OutputChannelTask(Node):
 # END handle single apply
 except Exception, e:
 self._exc = e
+ print str(e) # TODO: REMOVE DEBUG, or make it use logging
 self.set_done()
 # unschedule all, we don't know how many have been produced actually
 # but only if we don't apply single please
-- cgit v1.2.1

From 13dd59ba5b3228820841682b59bad6c22476ff66 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 17:25:43 +0200
Subject: task: now deletes itself once it's done - for the test this doesn't change a thing as the task deletes itself too late - it's time for a paradigm change, the task should be deleted with its RPoolChannel or explicitly by the user.
The test needs to adapt, and shouldn't assume anything unless the RPoolChannel is gone --- lib/git/async/task.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 97521cae..dc207c33 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -23,6 +23,7 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items + '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -84,6 +85,10 @@ class OutputChannelTask(Node): finally: self._slock.release() # END threadsafe return + + def set_pool(self, pool): + """Set our pool to the given one, it will be weakref'd""" + self._pool_ref = weakref.ref(pool) def process(self, count=0): """Process count items and send the result individually to the output channel""" @@ -147,6 +152,16 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount if self.is_done() and sys.getrefcount(self._out_wc) < 4: self.close() + # additionally, remove ourselves from the pool, this is thread-safe + # Previously the pool collected done tasks and removed them, + # but this could happen after a read finished, potentially + # leaving them on the queue until the read-handle was dropped. + # This should assure its more in-time. + # I don't like this back-ref. + pool = self._pool_ref() + if pool: + pool.del_task(self) + # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask): For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" __slots__ = ( - 'in_rc', # channel to read items from - '_pool_ref' # to be set by Pool + 'in_rc' # channel to read items from ) def __init__(self, in_rc, *args, **kwargs): @@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask): # and call it return OutputChannelTask.process(self, count) - - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) #{ Configuration -- cgit v1.2.1 From e5c0002d069382db1768349bf0c5ff40aafbf140 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 18:20:12 +0200 Subject: Revised task deletion works well, adjusted test to be creating new tasks all the time instead of reusing its own one, it was somewhat hard to manage its state over time and could cause bugs. 
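The pool back-reference these two commits add and then remove follows a common ownership pattern: the pool owns its tasks, so a task may only point back weakly. Sketched in isolation, with a stand-in pool:

    import weakref

    class Pool(object):                  # stand-in for the real pool
        def del_task(self, task):
            pass

    class Task(object):
        def set_pool(self, pool):
            # weak reference: must not keep the pool alive or form a cycle
            self._pool_ref = weakref.ref(pool)

        def notify_done(self):
            pool = self._pool_ref()      # None once the pool was collected
            if pool is not None:
                pool.del_task(self)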
It works okay, but it occasionally hangs, it appears to be an empty queue, have to gradually put certain things back in, although in the current mode of operation, it should never have empty queues from the pool to the user --- lib/git/async/task.py | 64 +++++++++------------------------------------------ 1 file changed, 11 insertions(+), 53 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index dc207c33..5edd40bb 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,10 +1,11 @@ from graph import Node import threading -import weakref import sys import new +getrefcount = sys.getrefcount + class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. @@ -23,7 +24,6 @@ class OutputChannelTask(Node): '_done', # True if we are done '_scheduled_items', # amount of scheduled items that will be processed in total '_slock', # lock for scheduled items - '_pool_ref', # to be set by Pool 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -54,7 +54,7 @@ class OutputChannelTask(Node): def set_wc(self, wc): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" - self._done = False + self._done = False # TODO : fix this, this is a side-effect self._scheduled_items = 0 self._out_wc = wc @@ -86,10 +86,6 @@ class OutputChannelTask(Node): self._slock.release() # END threadsafe return - def set_pool(self, pool): - """Set our pool to the given one, it will be weakref'd""" - self._pool_ref = weakref.ref(pool) - def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -123,7 +119,7 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: self._exc = e - print str(e) # TODO: REMOVE DEBUG, or make it use logging + print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please @@ -150,18 +146,8 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount - if self.is_done() and sys.getrefcount(self._out_wc) < 4: + if self.is_done() and getrefcount(self._out_wc) < 4: self.close() - # additionally, remove ourselves from the pool, this is thread-safe - # Previously the pool collected done tasks and removed them, - # but this could happen after a read finished, potentially - # leaving them on the queue until the read-handle was dropped. - # This should assure its more in-time. - # I don't like this back-ref. 
- pool = self._pool_ref() - if pool: - pool.del_task(self) - # END remove ourselves from the pool # END handle channel closure #{ Configuration @@ -218,43 +204,15 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" - __slots__ = ( - 'in_rc' # channel to read items from - ) def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._in_rc = in_rc - + self._read = in_rc.read + def process(self, count=1): - """Verify our setup, and do some additional checking, before the - base implementation can permanently perform all operations""" - self._read = self._in_rc.read - # make sure we don't trigger the pool if we read from a pool channel which - # belongs to our own pool. Channels from different pools are fine though, - # there we want to trigger its computation - # PROBLEM: if the user keeps an end, but decides to put the same end into - # a task of this pool, then all items might deplete without new ones being - # produced, causing a deadlock. Just triggering the pool would be better, - # but cost's more, unnecessarily if there is just one consumer, which is - # the user. - # * could encode usage in the channel type, and fail if the refcount on - # the read-pool channel is too high - # * maybe keep track of the elements that are requested or in-production - # for each task, which would allow to precisely determine whether - # the pool as to be triggered, and bail out early. Problem would - # be the - # * Perhaps one shouldn't seek the perfect solution , but instead - # document whats working and what not, or under which conditions. - # The whole system is simple, but gets more complicated the - # smarter it wants to be. - if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): - self._read = self._in_rc._read - - # permanently install our base for processing - self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self)) - - # and call it - return OutputChannelTask.process(self, count) + # for now, just blindly read our input, could trigger a pool, even + # ours, but why not ? It should be able to handle this + # TODO: remove this method + super(InputChannelTask, self).process(count) #{ Configuration -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, with boosts performance into brigt green levels --- lib/git/async/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 5edd40bb..f9536a45 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -89,7 +89,7 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - print "task read", len(items) + # print "task read", len(items) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. 
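The worker-thread scheme referenced above - termination events travelling through the same blocking queue as the work itself - can be sketched as follows; names are illustrative:

    import threading
    from Queue import Queue        # Python 2, matching this code base

    def worker(queue):
        while True:
            task = queue.get()     # blocks, no busy polling
            if task is None:       # the termination event arrives over the queue
                break
            task()

    queue = Queue()
    thread = threading.Thread(target=worker, args=(queue,))
    thread.start()
    queue.put(lambda: None)        # a unit of work
    queue.put(None)                # ask the worker to shut down
    thread.join()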
As this method runs asynchronously,
-- cgit v1.2.1

From 15941ca090a2c3c987324fc911bbc6f89e941c47 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 10:34:12 +0200
Subject: queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently. Now it's about putting previous features back in, and studying their results, before more complex task graphs can be examined

---
 lib/git/async/task.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f9536a45..f1448f96 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -89,7 +89,6 @@ class OutputChannelTask(Node):
 def process(self, count=0):
 """Process count items and send the result individually to the output channel"""
 items = self._read(count)
- # print "task read", len(items)
 try:
 # increase the ref-count - we use this to determine whether anyone else
 # is currently handling our output channel. As this method runs asynchronously,
@@ -119,7 +118,7 @@ class OutputChannelTask(Node):
 # END handle single apply
 except Exception, e:
 self._exc = e
- print "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
+ print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
 self.set_done()
-- cgit v1.2.1

From f2c8d26d3b25b864ad48e6de018757266b59f708 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 11:28:37 +0200
Subject: thread: fixed initialization problem if an empty iterable was handed in. queue: Queue now derives from deque directly, which saves one dict lookup as the queue does not need to be accessed through self anymore. pool test improved to better verify threads are started correctly

---
 lib/git/async/task.py | 23 ++++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f1448f96..dd2bd351 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,4 +1,5 @@
 from graph import Node
+from util import ReadOnly

 import threading
 import sys
@@ -117,8 +118,9 @@ class OutputChannelTask(Node):
 wc.write(rval)
 # END handle single apply
 except Exception, e:
- self._exc = e
 print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
+
+ # be sure our task is not scheduled again
 self.set_done()
 # unschedule all, we don't know how many have been produced actually
 # but only if we don't apply single please
@@ -127,6 +129,25 @@ class OutputChannelTask(Node):
 self._slock.release()
 # END unschedule all
+
+ # PROBLEM: We have failed to create at least one item, hence it's not
+ # guaranteed that enough items will be produced for a possibly blocking
+ # client on the other end. This is why we have no other choice but
+ # to close the channel, preventing the possibility of blocking.
+ # This implies that dependent tasks will go down with us, but that is
+ # just the right thing to do of course - one loose link in the chain ...
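The failure protocol that emerges from this commit: a failing chunk records the first real exception, marks the task done and closes the channel so blocked readers wake up, while ReadOnly merely signals that a sibling chunk already did so. On the consumer side this might be surfaced as sketched below (the reader and task wiring is assumed); the TODO about the debug print could be resolved the same way, via the standard logging module:

    import logging

    log = logging.getLogger('git.async.task')

    def drain(reader, task, count=10):
        # a closed channel ends the read early, returning what was produced
        items = reader.read(count)
        err = task.error()
        if err is not None:
            log.error("task failed: %s", err)    # instead of printing to stderr
            raise err
        return items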
+ # Other chunks of our kind currently being processed will then
+ # fail to write to the channel and fail as well
+ # self.close()
+
+ # If some other chunk of our Task had an error, the channel will be closed
+ # This is not an issue, just be sure we don't overwrite the original
+ # exception with the ReadOnly error that would be emitted in that case.
+ # We imply that ReadOnly is exclusive to us, as it won't be an error
+ # if the user emits it
+ if not isinstance(e, ReadOnly):
+ self._exc = e
+ # END set error flag
 # END exception handling
 del(wc)
-- cgit v1.2.1

From 4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 14:01:51 +0200
Subject: HSCondition: Fixed terrible bug which it inherited from its default Python Condition implementation, related to the notify method not being thread-safe. Although I was aware of it, I missed the first check which tests for the size - the result could be incorrect if the whole method wasn't locked. Testing runs stable now, allowing us to move on!

---
 lib/git/async/task.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index dd2bd351..d18cedca 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -138,7 +138,7 @@ class OutputChannelTask(Node):
 # just the right thing to do of course - one loose link in the chain ...
 # Other chunks of our kind currently being processed will then
 # fail to write to the channel and fail as well
- # self.close()
+ self.close()
-- cgit v1.2.1

From 0974f8737a3c56a7c076f9d0b757c6cb106324fb Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 14:47:41 +0200
Subject: Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode

---
 lib/git/async/task.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index d18cedca..539b240f 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -66,7 +66,7 @@ class OutputChannelTask(Node):

 def is_closed(self):
 """:return: True if the task's write channel is closed"""
- return self._out_wc.closed
+ return self._out_wc.closed()
-- cgit v1.2.1

From 07996a1a1e53ffdd2680d4bfbc2f4059687859a5 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Wed, 9 Jun 2010 15:40:51 +0200
Subject: task: removed scheduled task support, which at some point was introduced to improve performance, but which now hinders performance, besides being unnecessary ;)

---
 lib/git/async/task.py | 33 ---------------------------------
 1 file changed, 33 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 539b240f..be02cfe8 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -23,8 +23,6 @@ class OutputChannelTask(Node):
 '_out_wc', # output write channel
 '_exc', # exception caught
 '_done', # True if we are done
- '_scheduled_items', # amount of scheduled items that will be processed in total
- '_slock', # lock for scheduled items
 'fun', # function to call with items read
 'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call @@ -37,8 +35,6 @@ class OutputChannelTask(Node): self._out_wc = None # to be set later self._exc = None self._done = False - self._scheduled_items = 0 - self._slock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -72,21 +68,6 @@ class OutputChannelTask(Node): """:return: Exception caught during last processing or None""" return self._exc - def add_scheduled_items(self, count): - """Add the given amount of scheduled items to this task""" - self._slock.acquire() - self._scheduled_items += count - self._slock.release() - - def scheduled_item_count(self): - """:return: amount of scheduled items for this task""" - self._slock.acquire() - try: - return self._scheduled_items - finally: - self._slock.release() - # END threadsafe return - def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -101,19 +82,12 @@ class OutputChannelTask(Node): if self.apply_single: for item in items: rval = self.fun(item) - # decrement afterwards, the its unscheduled once its produced - self._slock.acquire() - self._scheduled_items -= 1 - self._slock.release() wc.write(rval) # END for each item else: # shouldn't apply single be the default anyway ? # The task designers should chunk them up in advance rvals = self.fun(items) - self._slock.acquire() - self._scheduled_items -= len(items) - self._slock.release() for rval in rvals: wc.write(rval) # END handle single apply @@ -122,13 +96,6 @@ class OutputChannelTask(Node): # be sure our task is not scheduled again self.set_done() - # unschedule all, we don't know how many have been produced actually - # but only if we don't apply single please - if not self.apply_single: - self._slock.acquire() - self._scheduled_items -= len(items) - self._slock.release() - # END unschedule all # PROBLEM: We have failed to create at least one item, hence its not # garantueed that enough items will be produced for a possibly blocking -- cgit v1.2.1 From 365fb14ced88a5571d3287ff1698582ceacd80d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 16:59:17 +0200 Subject: task: redesigned write channel access to allow the task creator to set own write channels, possibly some with callbacks installed etc.. Pool.add_task will respect the users choice now, but provide defaults which are optimized for performance --- lib/git/async/task.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index be02cfe8..f98336b2 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,4 +1,5 @@ from graph import Node +from channel import WChannel from util import ReadOnly import threading @@ -11,8 +12,8 @@ class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. - Results of the item processing are sent to an output channel, which is to be - set by the creator + Results of the item processing are sent to a write channel, which is to be + set by the creator using the ``set_wchannel`` method. * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. 
If @@ -29,10 +30,11 @@ class OutputChannelTask(Node): 'apply_single' # apply single items even if multiple where read ) - def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0): + def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, + wchannel=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = None # to be set later + self._out_wc = wchannel # to be set later self._exc = None self._done = False self.fun = fun @@ -48,13 +50,21 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wc(self, wc): - """Set the write channel to the given one - :note: resets it done state in order to allow proper queue handling""" - self._done = False # TODO : fix this, this is a side-effect - self._scheduled_items = 0 + def set_wchannel(self, wc): + """Set the write channel to the given one""" self._out_wc = wc + def wchannel(self): + """:return: a proxy to our write channel or None if non is set + :note: you must not hold a reference to our write channel when the + task is being processed. This would cause the write channel never + to be closed as the task will think there is still another instance + being processed which can close the channel once it is done. + In the worst case, this will block your reads.""" + if self._out_wc is None: + return None + return self._out_wc + def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" @@ -128,8 +138,10 @@ class OutputChannelTask(Node): # END handle done state # If we appear to be the only one left with our output channel, and are - # closed ( this could have been set in another thread as well ), make + # done ( this could have been set in another thread as well ), make # sure to close the output channel. + # Waiting with this to be the last one helps to keep the + # write-channel writable longer # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount @@ -196,10 +208,5 @@ class InputChannelTask(OutputChannelTask): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read - def process(self, count=1): - # for now, just blindly read our input, could trigger a pool, even - # ours, but why not ? 
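One shape such a creator-supplied channel could take is a wrapper that fires a callback per written item - a sketch only, assuming the channel interface is just write() and close():

    class CallbackWriter(object):
        """Wrap any write channel and observe every item written to it."""
        def __init__(self, writer, callback):
            self._writer = writer
            self._callback = callback

        def write(self, item):
            self._callback(item)
            self._writer.write(item)

        def close(self):
            self._writer.close()

Passing CallbackWriter(wc, on_item) to set_wchannel would then report progress without touching the task's processing loop.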
It should be able to handle this - # TODO: remove this method - super(InputChannelTask, self).process(count) #{ Configuration -- cgit v1.2.1 From 257a8a9441fca9a9bc384f673ba86ef5c3f1715d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 21:19:54 +0200 Subject: test: prepared task dependency test, which already helped to find bug in the reference counting mechanism, causing references to the pool to be kepts via cycles --- lib/git/async/task.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f98336b2..03b40492 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -208,5 +208,8 @@ class InputChannelTask(OutputChannelTask): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read - #{ Configuration - + def rchannel(self): + """:return: input channel from which we read""" + # the instance is bound in its instance method - lets use this to keep + # the refcount at one ( per consumer ) + return self._read.im_self -- cgit v1.2.1 From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 00:24:49 +0200 Subject: messy first version of a properly working depth-first graph method, which allows the pool to work as expected. Many more tests need to be added, and there still is a problem with shutdown as sometimes it won't kill all threads, mainly because the process came up with worker threads started, which cannot be --- lib/git/async/task.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 03b40492..57dd285d 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -80,7 +80,9 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" + print "%r: reading %i" % (self.id, count) items = self._read(count) + print "%r: done reading" % self.id try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. 
As this method runs asynchronously, @@ -102,7 +104,7 @@ class OutputChannelTask(Node): wc.write(rval) # END handle single apply except Exception, e: - print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again self.set_done() @@ -146,6 +148,7 @@ class OutputChannelTask(Node): # thread having its copy on the stack # + 1 for the instance we provide to refcount if self.is_done() and getrefcount(self._out_wc) < 4: + print "Closing channel of %r" % self.id self.close() # END handle channel closure #{ Configuration -- cgit v1.2.1 From cfb278d74ad01f3f1edf5e0ad113974a9555038d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 10:14:32 +0200 Subject: InputChannelTask now has interface for properly handling the reading from the same and different pools --- lib/git/async/task.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 57dd285d..d5b45609 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,8 +1,12 @@ from graph import Node -from channel import WChannel from util import ReadOnly +from channel import ( + WChannel, + CallbackRChannel + ) import threading +import weakref import sys import new @@ -147,6 +151,7 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount + # Soft close, so others can continue writing their results if self.is_done() and getrefcount(self._out_wc) < 4: print "Closing channel of %r" % self.id self.close() @@ -206,13 +211,32 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" + __slots__ = "_pool_ref" def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read + self._pool_ref = None + + #{ Internal Interface def rchannel(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) return self._read.im_self + + def set_read(self, read): + """Adjust the read method to the given one""" + self._read = read + + def set_pool(self, pool): + self._pool_ref = weakref.ref(pool) + + def pool(self): + """:return: pool we are attached to, or None""" + if self._pool_ref is None: + return None + return self._pool_ref() + + #} END intenral interface -- cgit v1.2.1 From 01eac1a959c1fa5894a86bf11e6b92f96762bdd8 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 12:06:16 +0200 Subject: Added more dependency task tests, especially the single-reads are not yet fully deterministic as tasks still run into the problem that they try to write into a closed channel, it was closed by one of their task-mates who didn't know someone else was still computing --- lib/git/async/task.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d5b45609..0b1d0666 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -84,9 +84,9 @@ class OutputChannelTask(Node): def 
process(self, count=0): """Process count items and send the result individually to the output channel""" - print "%r: reading %i" % (self.id, count) + # print "%r: reading %i" % (self.id, count) items = self._read(count) - print "%r: done reading" % self.id + # print "%r: done reading %i items" % (self.id, len(items)) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. As this method runs asynchronously, @@ -109,7 +109,6 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging - # be sure our task is not scheduled again self.set_done() @@ -153,7 +152,7 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results if self.is_done() and getrefcount(self._out_wc) < 4: - print "Closing channel of %r" % self.id + # print "Closing channel of %r" % self.id self.close() # END handle channel closure #{ Configuration -- cgit v1.2.1 From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a ready is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefit of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now --- lib/git/async/task.py | 54 ++++++++++++++++++++++----------------------------- 1 file changed, 23 insertions(+), 31 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 0b1d0666..5a6c1e95 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,23 +1,17 @@ from graph import Node from util import ReadOnly -from channel import ( - WChannel, - CallbackRChannel - ) import threading import weakref import sys import new -getrefcount = sys.getrefcount - class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. Results of the item processing are sent to a write channel, which is to be - set by the creator using the ``set_wchannel`` method. + set by the creator using the ``set_writer`` method. * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If @@ -25,9 +19,11 @@ class OutputChannelTask(Node): one worker, as well as dependent tasks. 
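The channel model this redesign moves toward - any number of writers and any number of readers, decoupled from one another - reduces to a thread-safe queue plus a closed flag. A toy version, assuming Python 2's Queue module (the real implementation must additionally make close-versus-write atomic):

    from Queue import Queue, Empty

    class ToyChannel(object):
        # any thread may write, any thread may read
        def __init__(self):
            self._queue = Queue()
            self._closed = False

        def write(self, item):
            if self._closed:
                raise IOError("channel closed")
            self._queue.put(item)

        def close(self):
            self._closed = True

        def read(self, count):
            out = list()
            while len(out) < count:
                try:
                    out.append(self._queue.get(timeout=0.05))
                except Empty:
                    if self._closed:
                        break          # no more writers, return what we have
            return out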
If you want finer granularity , you can specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process - '_out_wc', # output write channel + '_out_writer', # output write channel '_exc', # exception caught '_done', # True if we are done + '_num_writers', # number of concurrent writers + '_wlock', # lock for the above 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -35,12 +31,14 @@ class OutputChannelTask(Node): ) def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, - wchannel=None): + writer=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = wchannel # to be set later + self._out_writer = writer self._exc = None self._done = False + self._num_writers = 0 + self._wlock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -54,29 +52,29 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wchannel(self, wc): + def set_writer(self, writer): """Set the write channel to the given one""" - self._out_wc = wc + self._out_writer = writer - def wchannel(self): + def writer(self): """:return: a proxy to our write channel or None if non is set :note: you must not hold a reference to our write channel when the task is being processed. This would cause the write channel never to be closed as the task will think there is still another instance being processed which can close the channel once it is done. In the worst case, this will block your reads.""" - if self._out_wc is None: + if self._out_writer is None: return None - return self._out_wc + return self._out_writer def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" - self._out_wc.close() + self._out_writer.close() def is_closed(self): """:return: True if the task's write channel is closed""" - return self._out_wc.closed() + return self._out_writer.closed() def error(self): """:return: Exception caught during last processing or None""" @@ -88,24 +86,18 @@ class OutputChannelTask(Node): items = self._read(count) # print "%r: done reading %i items" % (self.id, len(items)) try: - # increase the ref-count - we use this to determine whether anyone else - # is currently handling our output channel. As this method runs asynchronously, - # we have to make sure that the channel is closed by the last finishing task, - # which is not necessarily the one which determines that he is done - # as he couldn't read anymore items. - # The refcount will be dropped in the moment we get out of here. - wc = self._out_wc + write = self._out_writer.write if self.apply_single: for item in items: rval = self.fun(item) - wc.write(rval) + write(rval) # END for each item else: # shouldn't apply single be the default anyway ? 
# The task designers should chunk them up in advance rvals = self.fun(items) for rval in rvals: - wc.write(rval) + write(rval) # END handle single apply except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging @@ -131,7 +123,7 @@ class OutputChannelTask(Node): self._exc = e # END set error flag # END exception handling - del(wc) + # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done @@ -151,7 +143,7 @@ class OutputChannelTask(Node): # thread having its copy on the stack # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results - if self.is_done() and getrefcount(self._out_wc) < 4: + if self.is_done(): # print "Closing channel of %r" % self.id self.close() # END handle channel closure @@ -212,14 +204,14 @@ class InputChannelTask(OutputChannelTask): to be the input channel to read from though.""" __slots__ = "_pool_ref" - def __init__(self, in_rc, *args, **kwargs): + def __init__(self, in_reader, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._read = in_rc.read + self._read = in_reader.read self._pool_ref = None #{ Internal Interface - def rchannel(self): + def reader(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) -- cgit v1.2.1 From 7c36f3648e39ace752c67c71867693ce1eee52a3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:38:40 +0200 Subject: Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working, and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. Its still not perfect though ... --- lib/git/async/task.py | 64 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 19 deletions(-) (limited to 'lib/git/async/task.py') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 5a6c1e95..ae2532d9 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -82,23 +82,36 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" - # print "%r: reading %i" % (self.id, count) + # first thing: increment the writer count + self._wlock.acquire() + self._num_writers += 1 + self._wlock.release() + + #print "%r: reading %i" % (self.id, count) + #if hasattr(self, 'reader'): + # print "from", self.reader().channel items = self._read(count) - # print "%r: done reading %i items" % (self.id, len(items)) + #print "%r: done reading %i items" % (self.id, len(items)) try: - write = self._out_writer.write - if self.apply_single: - for item in items: - rval = self.fun(item) - write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - write(rval) - # END handle single apply + try: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? 
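The writer-count protocol being introduced here is worth isolating: every chunk registers as a writer before reading, deregisters when done, and only the last writer out of a finished task closes the channel. As a reusable sketch:

    import threading

    class WriterGuard(object):
        # count writers in and out; the last one leaving a 'done' task
        # triggers the close callback
        def __init__(self, close):
            self._lock = threading.Lock()
            self._count = 0
            self._close = close
            self.done = False

        def __enter__(self):
            self._lock.acquire()
            self._count += 1
            self._lock.release()

        def __exit__(self, exc_type, exc_value, tb):
            self._lock.acquire()
            self._count -= 1
            last = (self._count == 0)
            self._lock.release()
            if self.done and last:
                self._close()

Used as 'with guard: read and write results', this is the same accounting the patch spells out inline with _wlock and _num_writers.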
From c34343d0b714d2c4657972020afea034a167a682 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 15:52:32 +0200
Subject: tasks can now terminate faster when no items were read, without
 neglecting their duty to close the channel if required. Code is a little
 less maintainable now, but faster, it appears

---
 lib/git/async/task.py | 43 ++++++++++++++++++++++++-------------------
 1 file changed, 24 insertions(+), 19 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index ae2532d9..a8ba5ac6 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -92,21 +92,24 @@ class OutputChannelTask(Node):
         #   print "from", self.reader().channel
         items = self._read(count)
         #print "%r: done reading %i items" % (self.id, len(items))
+        
         try:
             try:
-                write = self._out_writer.write
-                if self.apply_single:
-                    for item in items:
-                        rval = self.fun(item)
-                        write(rval)
-                    # END for each item
-                else:
-                    # shouldn't apply single be the default anyway ?
-                    # The task designers should chunk them up in advance
-                    rvals = self.fun(items)
-                    for rval in rvals:
-                        write(rval)
-                # END handle single apply
+                if items:
+                    write = self._out_writer.write
+                    if self.apply_single:
+                        for item in items:
+                            rval = self.fun(item)
+                            write(rval)
+                        # END for each item
+                    else:
+                        # shouldn't apply single be the default anyway ?
+                        # The task designers should chunk them up in advance
+                        rvals = self.fun(items)
+                        for rval in rvals:
+                            write(rval)
+                    # END handle single apply
+                # END if there is anything to do
             finally:
                 self._wlock.acquire()
                 self._num_writers -= 1
@@ -158,12 +161,14 @@ class OutputChannelTask(Node):
             # Soft close, so others can continue writing their results
             if self.is_done():
                 self._wlock.acquire()
-                if self._num_writers == 0:
-                    #if not self.is_closed():   # DEBUG
-                    #   print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
-                    self.close()
-                # END handle writers
-                self._wlock.release()
+                try:
+                    if self._num_writers == 0:
+                        # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
+                        self.close()
+                    # END handle writers
+                finally:
+                    self._wlock.release()
+                # END assure lock release
             # END handle channel closure
 
 #{ Configuration
-- 
cgit v1.2.1
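Note the nested try/try/finally above instead of a single try/except/finally: the unified form only arrived in Python 2.5, and the series is stated two patches later to run on py2.4 as well, so the nesting is presumably deliberate. The structure guarantees the decrement runs on every path, including when fun() raises, while the new `if items:` guard skips the write machinery entirely on an empty read. A condensed, self-contained illustration of that control flow (toy names, not the task API):

    import threading

    _lock = threading.Lock()
    _num_writers = [1]                  # pretend process() already registered us

    def risky(item):
        if item == 3:
            raise ValueError(item)
        return item * 2

    items = [1, 2, 3]
    try:
        try:
            if items:                   # nothing at all to do on an empty read
                results = [risky(i) for i in items]
        finally:
            _lock.acquire()
            _num_writers[0] -= 1        # balanced on success and on error alike
            _lock.release()
    except ValueError, e:
        print "recorded:", e            # the real task stores e and marks itself done

    assert _num_writers[0] == 0
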
From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Thu, 10 Jun 2010 23:55:50 +0200
Subject: Added dependency-task tests, and fixed plenty of ref-count related
 bugs, as well as concurrency issues. Now it works okay, but the
 thread-shutdown is still an issue, as it causes incorrect behaviour making
 the tests fail. It's good, as it hints at additional issues that need to be
 solved. There is just a little more left on the feature side, but it's
 nearly there

---
 lib/git/async/task.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index a8ba5ac6..49e7e7cf 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -82,7 +82,8 @@ class OutputChannelTask(Node):
 
     def process(self, count=0):
         """Process count items and send the result individually to the output channel"""
-        # first thing: increment the writer count
+        # first thing: increment the writer count - other tasks must be able
+        # to respond properly ( even if it turns out we don't need it later )
         self._wlock.acquire()
         self._num_writers += 1
         self._wlock.release()
@@ -191,7 +192,11 @@ class InputIteratorTaskBase(OutputChannelTask):
             raise ValueError("Iterator %r needs a next() function" % iterator)
         self._iterator = iterator
         self._lock = self.lock_type()
-        self._read = self.__read
+        
+        # this is necessary to prevent a cyclic ref, preventing us from
+        # getting deleted ( and collected )
+        weakself = weakref.ref(self)
+        self._read = lambda count: weakself().__read(count)
         self._empty = False
 
@@ -201,6 +206,7 @@ class InputIteratorTaskBase(OutputChannelTask):
         if self._empty:
             return list()
         # END early abort
+        
         self._lock.acquire()
         try:
             if count == 0:
-- 
cgit v1.2.1
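The weakref fix above breaks the cycle instance -> self._read (bound method) -> instance, which would otherwise keep the task alive until the cycle collector runs and defeat any refcount-based cleanup. A self-contained illustration of the same trick:

    import weakref

    class Producer(object):
        """Stores a reader callable without creating a reference cycle."""
        def __init__(self, iterator):
            self._iterator = iterator
            # self._read = self.__read would make the instance reference itself;
            # going through a weak reference keeps it collectable
            weakself = weakref.ref(self)
            self._read = lambda count: weakself().__read(count)

        def __read(self, count):
            out = list()
            for i in xrange(count):
                try:
                    out.append(self._iterator.next())
                except StopIteration:
                    break
            return out

    p = Producer(iter(xrange(5)))
    print p._read(3)    # [0, 1, 2]
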
From e14e3f143e7260de9581aee27e5a9b2645db72de Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Fri, 11 Jun 2010 16:42:09 +0200
Subject: Removed commented-out debug code and additional debug printings.
 Verified it works on py2.4, 2.5 and 2.6

---
 lib/git/async/task.py | 6 ------
 1 file changed, 6 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 49e7e7cf..10b22649 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -88,11 +88,7 @@ class OutputChannelTask(Node):
         self._num_writers += 1
         self._wlock.release()
 
-        #print "%r: reading %i" % (self.id, count)
-        #if hasattr(self, 'reader'):
-        #   print "from", self.reader().channel
         items = self._read(count)
-        #print "%r: done reading %i items" % (self.id, len(items))
 
         try:
             try:
@@ -117,7 +113,6 @@ class OutputChannelTask(Node):
             # END handle writer count
         except Exception, e:
-            print >> sys.stderr, "task %s error:" % self.id, type(e), str(e)   # TODO: REMOVE DEBUG, or make it use logging
             # be sure our task is not scheduled again
             self.set_done()
@@ -164,7 +159,6 @@ class OutputChannelTask(Node):
             self._wlock.acquire()
             try:
                 if self._num_writers == 0:
-                    # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel
                     self.close()
                 # END handle writers
             finally:
-- 
cgit v1.2.1

From cac6e06cc9ef2903a15e594186445f3baa989a1a Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Fri, 11 Jun 2010 16:58:44 +0200
Subject: test_task: fixed import error, made all modules from x import * safe

---
 lib/git/async/task.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 10b22649..d7f331b7 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -6,8 +6,11 @@ import weakref
 import sys
 import new
 
+__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase',
+           'InputIteratorThreadTask', 'InputChannelTask')
+
 class OutputChannelTask(Node):
-    """Abstracts a named task as part of a set of interdependent tasks, which contains
+    """Abstracts a named task, which contains
     additional information on how the task should be queued and processed.
 
     Results of the item processing are sent to a write channel, which is to be
-- 
cgit v1.2.1

From a28942bdf01f4ddb9d0b5a0489bd6f4e101dd775 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Fri, 11 Jun 2010 20:13:21 +0200
Subject: Added performance test, improved iterator task which will now be
 usable by default. It shows that there must be the notion of a producer,
 which can work if there are no items read

---
 lib/git/async/task.py | 3 +++
 1 file changed, 3 insertions(+)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index d7f331b7..0eb4527c 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -196,6 +196,9 @@ class InputIteratorTaskBase(OutputChannelTask):
         self._read = lambda count: weakself().__read(count)
         self._empty = False
 
+        # defaults to returning our items unchanged
+        self.fun = lambda item: item
+
     def __read(self, count=0):
         """Read count items from the iterator, and return them"""
         # not threadsafe, but worst thing that could happen is that
-- 
cgit v1.2.1
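The __all__ tuple added in cac6e06c is what makes `from git.async.task import *` safe: without it, a star import would also drag in the module-level imports (threading, weakref, sys, new). A standalone demonstration of the mechanism, building a throwaway module on the fly:

    import sys
    import types

    mod = types.ModuleType('demo')
    source = (
        "import threading\n"
        "__all__ = ('visible',)\n"
        "def visible(): return 1\n"
        "def hidden(): return 2\n"
    )
    exec source in mod.__dict__
    sys.modules['demo'] = mod

    ns = dict()
    exec "from demo import *" in ns
    assert 'visible' in ns              # exported
    assert 'hidden' not in ns           # filtered out by __all__
    assert 'threading' not in ns        # imports no longer leak
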
From be8955a0fbb77d673587974b763f17c214904b57 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sat, 12 Jun 2010 11:19:18 +0200
Subject: Cleaned up channel design, Reader and Writer bases don't require a
 channel anymore, but are abstract. Added IteratorReader, implementing the
 reader interface from an iterator. The implementation moved from the
 TaskIterator to the channel

---
 lib/git/async/task.py | 51 ++++++---------------------------------------------
 1 file changed, 6 insertions(+), 45 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 0eb4527c..b7b5e699 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,5 +1,7 @@
 from graph import Node
 from util import ReadOnly
+from channel import IteratorReader
+
 
 import threading
 import weakref
@@ -179,56 +181,15 @@ class ThreadTaskBase(object):
 class InputIteratorTaskBase(OutputChannelTask):
     """Implements a task which processes items from an iterable in a multi-processing
     safe manner"""
-    __slots__ = ('_iterator', '_lock', '_empty')
-    # the type of the lock to use when reading from the iterator
-    lock_type = None
+    __slots__ = tuple()
 
     def __init__(self, iterator, *args, **kwargs):
         OutputChannelTask.__init__(self, *args, **kwargs)
-        if not hasattr(iterator, 'next'):
-            raise ValueError("Iterator %r needs a next() function" % iterator)
-        self._iterator = iterator
-        self._lock = self.lock_type()
-        
-        # this is necessary to prevent a cyclic ref, preventing us from
-        # getting deleted ( and collected )
-        weakself = weakref.ref(self)
-        self._read = lambda count: weakself().__read(count)
-        self._empty = False
-        
+        self._read = IteratorReader(iterator).read
         # defaults to returning our items unchanged
         self.fun = lambda item: item
-
-    def __read(self, count=0):
-        """Read count items from the iterator, and return them"""
-        # not threadsafe, but worst thing that could happen is that
-        # we try to get items one more time
-        if self._empty:
-            return list()
-        # END early abort
-        
-        self._lock.acquire()
-        try:
-            if count == 0:
-                self._empty = True
-                return list(self._iterator)
-            else:
-                out = list()
-                it = self._iterator
-                for i in xrange(count):
-                    try:
-                        out.append(it.next())
-                    except StopIteration:
-                        self._empty = True
-                        break
-                    # END handle empty iterator
-                # END for each item to take
-                return out
-            # END handle count
-        finally:
-            self._lock.release()
-        # END handle locking
-
+        
 
 class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
     """An input iterator for threaded pools"""
-- 
cgit v1.2.1
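The reading logic removed above did not disappear; per the commit message it moved into channel.py as IteratorReader, and its shape can be inferred almost verbatim from the deleted lines. A sketch of what such a reader looks like (the actual channel.py implementation is not shown in this file and may differ):

    import threading

    class IteratorReader(object):
        """Reader interface over an iterator; thread-safe via a plain lock."""
        def __init__(self, iterator):
            if not hasattr(iterator, 'next'):
                raise ValueError("Iterator %r needs a next() function" % iterator)
            self._iter = iterator
            self._lock = threading.Lock()
            self._empty = False

        def read(self, count=0):
            # once depleted, skip the lock entirely - worst case we race and
            # take the slow path one extra time
            if self._empty:
                return list()
            self._lock.acquire()
            try:
                if count == 0:
                    self._empty = True
                    return list(self._iter)    # drain everything at once
                out = list()
                for i in xrange(count):
                    try:
                        out.append(self._iter.next())
                    except StopIteration:
                        self._empty = True
                        break
                return out
            finally:
                self._lock.release()
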
From 7a0b79ee574999ecbc76696506352e4a5a0d7159 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Sat, 12 Jun 2010 12:38:02 +0200
Subject: task: improved naming of task types, improved pool test to be less
 dependent on starting with just the main thread

---
 lib/git/async/task.py | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index b7b5e699..ac948dc0 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -8,21 +8,27 @@ import weakref
 import sys
 import new
 
-__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase',
-           'InputIteratorThreadTask', 'InputChannelTask')
+__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',
+           'IteratorThreadTask', 'ChannelThreadTask')
 
-class OutputChannelTask(Node):
+class Task(Node):
     """Abstracts a named task, which contains
     additional information on how the task should be queued and processed.
 
-    Results of the item processing are sent to a write channel, which is to be
+    Results of the item processing are sent to a writer, which is to be
     set by the creator using the ``set_writer`` method.
+
+    Items are read using the internal ``_read`` callable, subclasses are meant to
+    set this to a callable that supports the Reader interface's read function.
 
     * **min_count** assures that not less than min_count items will be processed per call.
     * **max_chunksize** assures that multi-threading is happening in smaller chunks. If someone
      wants all items to be processed, using read(0), the whole task would go to
      one worker, as well as dependent tasks. If you want finer granularity, you can
-     specify this here, causing chunks to be no larger than max_chunksize"""
+     specify this here, causing chunks to be no larger than max_chunksize
+    * **apply_single** if True, default True, individual items will be given to the
+     worker function. If False, a list of possibly multiple items will be passed
+     instead."""
     __slots__ = (   '_read',            # method to yield items to process
                     '_out_writer',      # output write channel
                     '_exc',             # exception caught
@@ -178,32 +184,32 @@ class ThreadTaskBase(object):
     pass
 
 
-class InputIteratorTaskBase(OutputChannelTask):
+class IteratorTaskBase(Task):
     """Implements a task which processes items from an iterable in a multi-processing
     safe manner"""
     __slots__ = tuple()
 
     def __init__(self, iterator, *args, **kwargs):
-        OutputChannelTask.__init__(self, *args, **kwargs)
+        Task.__init__(self, *args, **kwargs)
         self._read = IteratorReader(iterator).read
         # defaults to returning our items unchanged
         self.fun = lambda item: item
 
 
-class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
+class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
     """An input iterator for threaded pools"""
     lock_type = threading.Lock
 
 
-class InputChannelTask(OutputChannelTask, ThreadTaskBase):
+class ChannelThreadTask(Task, ThreadTaskBase):
     """Uses an input channel as source for reading items
     For instantiation, it takes all arguments of its base, the first one needs
     to be the input channel to read from though."""
     __slots__ = "_pool_ref"
 
     def __init__(self, in_reader, *args, **kwargs):
-        OutputChannelTask.__init__(self, *args, **kwargs)
+        Task.__init__(self, *args, **kwargs)
         self._read = in_reader.read
         self._pool_ref = None
-- 
cgit v1.2.1
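With the final names in place, a pipeline is built by handing a task to a pool and reading results from the channel the pool returns. The pool API below (ThreadPool, add_task returning a reader) is inferred from the commit messages and the tests they mention, not from this file, so treat it as a hypothetical usage sketch:

    from git.async.pool import ThreadPool
    from git.async.task import IteratorThreadTask

    pool = ThreadPool(2)                    # two worker threads (size assumed)

    # note: at this revision IteratorTaskBase installs an identity fun in its
    # constructor, so the processing function is assigned afterwards
    task = IteratorThreadTask(iter(xrange(100)), "square", None)
    task.fun = lambda n: n * n

    reader = pool.add_task(task)            # read channel for the results
    print len(reader.read())                # expect 100 once the workers drain the input
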