From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 00:24:49 +0200 Subject: messy first version of a properly working depth-first graph method, which allows the pool to work as expected. Many more tests need to be added, and there still is a problem with shutdown as sometimes it won't kill all threads, mainly because the process came up with worker threads started, which cannot be --- lib/git/async/graph.py | 23 ++++++++++++----------- lib/git/async/pool.py | 6 +++--- lib/git/async/task.py | 5 ++++- lib/git/async/util.py | 8 +++++++- 4 files changed, 26 insertions(+), 16 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index 6386cbaa..e3999cdc 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -87,25 +87,26 @@ class Graph(object): return self - def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch - It will return the actual input node in the end !""" - nodes = node.in_nodes[:] + def input_inclusive_dfirst_reversed(self, node): + """Return all input nodes of the given node, depth first, + It will return the actual input node last, as it is required + like that by the pool""" + stack = [node] seen = set() # depth first - while nodes: - n = nodes.pop() + out = list() + while stack: + n = stack.pop() if n in seen: continue seen.add(n) + out.append(n) # only proceed in that direction if visitor is fine with it - if visitor(n): - nodes.extend(n.in_nodes) + stack.extend(n.in_nodes) # END call visitor # END while walking - visitor(node) + out.reverse() + return out diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 2ec18f1a..5ebc3655 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -182,14 +182,13 @@ class Pool(object): dfirst_tasks = self._taskorder_cache[id(task)] except KeyError: # have to retrieve the list from the graph - dfirst_tasks = list() - self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task) self._taskorder_cache[id(task)] = dfirst_tasks # END handle cached order retrieval finally: self._taskgraph_lock.release() # END handle locking - + print dfirst_tasks # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks # The actual_count is used when chunking tasks up for the queue, whereas @@ -309,6 +308,7 @@ class Pool(object): threadsafe to optimize item throughput. :note: currently NOT threadsafe !""" + print "set_size", size assert size > -1, "Size cannot be negative" # either start new threads, or kill existing ones. diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 03b40492..57dd285d 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -80,7 +80,9 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" + print "%r: reading %i" % (self.id, count) items = self._read(count) + print "%r: done reading" % self.id try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. As this method runs asynchronously, @@ -102,7 +104,7 @@ class OutputChannelTask(Node): wc.write(rval) # END handle single apply except Exception, e: - print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again self.set_done() @@ -146,6 +148,7 @@ class OutputChannelTask(Node): # thread having its copy on the stack # + 1 for the instance we provide to refcount if self.is_done() and getrefcount(self._out_wc) < 4: + print "Closing channel of %r" % self.id self.close() # END handle channel closure #{ Configuration diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 00d0dbab..b7750b0b 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -206,7 +206,6 @@ class AsyncQueue(deque): return old finally: self.mutex.release() - # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() @@ -222,6 +221,13 @@ class AsyncQueue(deque): def put(self, item, block=True, timeout=None): self.mutex.acquire() + # NOTE: we explicitly do NOT check for our writable state + # Its just used as a notification signal, and we need to be able + # to continue writing to prevent threads ( easily ) from failing + # to write their computed results, which we want in fact + # NO: we want them to fail and stop processing, as the one who caused + # the channel to close had a reason and wants the threads to + # stop on the task as soon as possible if not self._writable: self.mutex.release() raise ReadOnly -- cgit v1.2.1 From cfb278d74ad01f3f1edf5e0ad113974a9555038d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 10:14:32 +0200 Subject: InputChannelTask now has interface for properly handling the reading from the same and different pools --- lib/git/async/pool.py | 43 +++++++++++++++++++++++++++++++++++++++---- lib/git/async/task.py | 26 +++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 5 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 5ebc3655..1b3c2748 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -10,7 +10,6 @@ from util import ( DummyLock ) -from task import InputChannelTask from Queue import ( Queue, Empty @@ -66,6 +65,24 @@ class RPoolChannel(CallbackRChannel): if sys.getrefcount(self) < 6: pool.remove_task(task, _from_destructor_=True) # END handle refcount based removal of task + + #{ Internal + def _read(self, count=0, block=True, timeout=None): + """Direct read, bypassing the pool handling""" + return CallbackRChannel.read(self, count, block, timeout) + #} END internal + + #{ Interface + + def pool_ref(self): + """:return: reference to the pool we belong to""" + return self._pool_ref + + def task_ref(self): + """:return: reference to the task producing our items""" + return self._task_ref + + #} END interface def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads @@ -188,7 +205,7 @@ class Pool(object): finally: self._taskgraph_lock.release() # END handle locking - print dfirst_tasks + # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks # The actual_count is used when chunking tasks up for the queue, whereas @@ -406,6 +423,18 @@ class Pool(object): # create a write channel for it wctype = WChannel + # adjust the task with our pool ref, if it has the slot and is empty + # For now, we don't allow tasks to be used in multiple pools, except + # for by their channels + if hasattr(task, 'pool'): + their_pool = task.pool() + if their_pool is None: + task.set_pool(self) + elif their_pool is not self: + raise ValueError("Task %r is already registered to another pool" % task.id) + # END handle pool exclusivity + # END handle pool aware tasks + self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() @@ -431,12 +460,18 @@ class Pool(object): # END sync task addition # If the input channel is one of our read channels, we add the relation - if isinstance(task, InputChannelTask): + if hasattr(task, 'rchannel'): ic = task.rchannel() - if isinstance(ic, RPoolChannel) and ic._pool_ref() is self: + if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: self._taskgraph_lock.acquire() try: self._tasks.add_edge(ic._task_ref(), task) + + # additionally, bypass ourselves when reading from the + # task, if possible + if hasattr(ic, '_read'): + task.set_read(ic._read) + # END handle read bypass finally: self._taskgraph_lock.release() # END handle edge-adding diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 57dd285d..d5b45609 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,8 +1,12 @@ from graph import Node -from channel import WChannel from util import ReadOnly +from channel import ( + WChannel, + CallbackRChannel + ) import threading +import weakref import sys import new @@ -147,6 +151,7 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount + # Soft close, so others can continue writing their results if self.is_done() and getrefcount(self._out_wc) < 4: print "Closing channel of %r" % self.id self.close() @@ -206,13 +211,32 @@ class InputChannelTask(OutputChannelTask): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" + __slots__ = "_pool_ref" def __init__(self, in_rc, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read + self._pool_ref = None + + #{ Internal Interface def rchannel(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) return self._read.im_self + + def set_read(self, read): + """Adjust the read method to the given one""" + self._read = read + + def set_pool(self, pool): + self._pool_ref = weakref.ref(pool) + + def pool(self): + """:return: pool we are attached to, or None""" + if self._pool_ref is None: + return None + return self._pool_ref() + + #} END intenral interface -- cgit v1.2.1 From 01eac1a959c1fa5894a86bf11e6b92f96762bdd8 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 12:06:16 +0200 Subject: Added more dependency task tests, especially the single-reads are not yet fully deterministic as tasks still run into the problem that they try to write into a closed channel, it was closed by one of their task-mates who didn't know someone else was still computing --- lib/git/async/task.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d5b45609..0b1d0666 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -84,9 +84,9 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" - print "%r: reading %i" % (self.id, count) + # print "%r: reading %i" % (self.id, count) items = self._read(count) - print "%r: done reading" % self.id + # print "%r: done reading %i items" % (self.id, len(items)) try: # increase the ref-count - we use this to determine whether anyone else # is currently handling our output channel. As this method runs asynchronously, @@ -109,7 +109,6 @@ class OutputChannelTask(Node): # END handle single apply except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging - # be sure our task is not scheduled again self.set_done() @@ -153,7 +152,7 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results if self.is_done() and getrefcount(self._out_wc) < 4: - print "Closing channel of %r" % self.id + # print "Closing channel of %r" % self.id self.close() # END handle channel closure #{ Configuration -- cgit v1.2.1 From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a ready is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefit of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now --- lib/git/async/channel.py | 102 ++++++++++++++++++++++------------------------- lib/git/async/pool.py | 76 +++++++++++++++++------------------ lib/git/async/task.py | 54 +++++++++++-------------- 3 files changed, 107 insertions(+), 125 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index abb31035..9b019707 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -21,61 +21,57 @@ class Channel(object): If the channel is closed, any read operation will result in an exception This base class is not instantiated directly, but instead serves as constructor - for RWChannel pairs. + for Rwriter pairs. Create a new channel """ - __slots__ = tuple() - - -class WChannel(Channel): - """The write end of a channel - it is thread-safe""" - __slots__ = ('_queue') + __slots__ = 'queue' # The queue to use to store the actual data QueueCls = AsyncQueue def __init__(self): - """initialize this instance, able to hold max_items at once - Write calls will block if the channel is full, until someone reads from it""" - self._queue = self.QueueCls() - - #{ Interface - def write(self, item, block=True, timeout=None): - """Send an item into the channel, it can be read from the read end of the - channel accordingly - :param item: Item to send - :param block: If True, the call will block until there is free space in the - channel - :param timeout: timeout in seconds for blocking calls. - :raise ReadOnly: when writing into closed channel""" - # let the queue handle the 'closed' attribute, we write much more often - # to an open channel than to a closed one, saving a few cycles - self._queue.put(item, block, timeout) - + """initialize this instance with a queue holding the channel contents""" + self.queue = self.QueueCls() + + +class SerialChannel(Channel): + """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" + QueueCls = SyncQueue + + +class Writer(object): + """The write end of a channel, a file-like interface for a channel""" + __slots__ = ('write', 'channel') + + def __init__(self, channel): + """Initialize the writer to use the given channel""" + self.channel = channel + self.write = channel.queue.put + + #{ Interface def size(self): - """:return: approximate number of items that could be read from the read-ends - of this channel""" - return self._queue.qsize() + return self.channel.queue.qsize() def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - self._queue.set_writable(False) + self.channel.queue.set_writable(False) def closed(self): """:return: True if the channel was closed""" - return not self._queue.writable() + return not self.channel.queue.writable() #} END interface -class CallbackWChannel(WChannel): +class CallbackWriter(Writer): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') - def __init__(self): - WChannel.__init__(self) + def __init__(self, channel): + Writer.__init__(self, channel) self._pre_cb = None + self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -88,25 +84,19 @@ class CallbackWChannel(WChannel): self._pre_cb = fun return prev - def write(self, item, block=True, timeout=None): + def _write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - WChannel.write(self, item, block, timeout) + self.channel.queue.put(item, block, timeout) - -class SerialWChannel(WChannel): - """A slightly faster version of a WChannel, which sacrificed thead-safety for - performance""" - QueueCls = SyncQueue - -class RChannel(Channel): - """The read-end of a corresponding write channel""" - __slots__ = '_wc' +class Reader(object): + """Allows reading from a channel""" + __slots__ = 'channel' - def __init__(self, wchannel): + def __init__(self, channel): """Initialize this instance from its parent write channel""" - self._wc = wchannel + self.channel = channel #{ Interface @@ -135,7 +125,7 @@ class RChannel(Channel): # in non-blocking mode, its all not a problem out = list() - queue = self._wc._queue + queue = self.channel.queue if not block: # be as fast as possible in non-blocking mode, hence # its a bit 'unrolled' @@ -198,12 +188,12 @@ class RChannel(Channel): #} END interface -class CallbackRChannel(RChannel): +class CallbackReader(Reader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" - def __init__(self, wc): - RChannel.__init__(self, wc) + def __init__(self, channel): + Reader.__init__(self, channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -220,18 +210,20 @@ class CallbackRChannel(RChannel): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return RChannel.read(self, count, block, timeout) + return Reader.read(self, count, block, timeout) #} END classes #{ Constructors -def mkchannel(wctype = WChannel, rctype = RChannel): - """Create a channel, which consists of one write end and one read end - :return: tuple(write_channel, read_channel) +def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): + """Create a channel, with a reader and a writer + :return: tuple(reader, writer) + :param ctype: Channel to instantiate :param wctype: The type of the write channel to instantiate :param rctype: The type of the read channel to instantiate""" - wc = wctype() - rc = rctype(wc) + c = ctype() + wc = wtype(c) + rc = rtype(c) return wc, rc #} END constructors diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 1b3c2748..68551ea3 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -18,27 +18,28 @@ from Queue import ( from graph import Graph from channel import ( mkchannel, - WChannel, - SerialWChannel, - CallbackRChannel + Writer, + Channel, + SerialChannel, + CallbackReader ) import sys import weakref from time import sleep +import new -class RPoolChannel(CallbackRChannel): - """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call - before and after an item is to be read. - +class PoolReader(CallbackReader): + """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task_ref', '_pool_ref') + __slots__ = ('_task_ref', '_pool_ref', '_read') - def __init__(self, wchannel, task, pool): - CallbackRChannel.__init__(self, wchannel) + def __init__(self, channel, task, pool): + CallbackReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) + self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader) def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -63,15 +64,9 @@ class RPoolChannel(CallbackRChannel): # okay for now # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - pool.remove_task(task, _from_destructor_=True) + pool.remove_task(task) # END handle refcount based removal of task - #{ Internal - def _read(self, count=0, block=True, timeout=None): - """Direct read, bypassing the pool handling""" - return CallbackRChannel.read(self, count, block, timeout) - #} END internal - #{ Interface def pool_ref(self): @@ -118,7 +113,7 @@ class RPoolChannel(CallbackRChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackRChannel.read(self, count, block, timeout) + items = CallbackReader.read(self, count, block, timeout) ########################## @@ -262,21 +257,21 @@ class Pool(object): # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + # print actual_count, numchunks, chunksize, remainder, task._out_writer.size() if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task - queue = self._queue + qput = self._queue if numchunks > 1: for i in xrange(numchunks): - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END for each chunk to put else: - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END try efficient looping if remainder: - queue.put((task.process, remainder)) + qput((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves @@ -295,16 +290,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task, from_destructor): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" - # 1 as its stored on the task, 1 for the getrefcount call + # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader # If we are getting here from the destructor of an RPool channel, # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls - max_ref_count = 3 + from_destructor - if sys.getrefcount(task.wchannel()) < max_ref_count: - self.remove_task(task, from_destructor) + max_ref_count = 3 + if sys.getrefcount(task.writer().channel) < max_ref_count: + self.remove_task(task) #} END internal #{ Interface @@ -375,7 +370,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task, _from_destructor_=False): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -410,7 +405,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t, _from_destructor_) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self @@ -421,7 +416,7 @@ class Pool(object): the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it - wctype = WChannel + ctype = Channel # adjust the task with our pool ref, if it has the slot and is empty # For now, we don't allow tasks to be used in multiple pools, except @@ -442,26 +437,29 @@ class Pool(object): # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety - # when reading from multiple threads. if self.size() == 0: - wctype = SerialWChannel + ctype = SerialChannel # END improve locks # setup the tasks channel - respect the task creators choice though # if it is set. - wc = task.wchannel() + wc = task.writer() + ch = None if wc is None: - wc = wctype() + ch = ctype() + wc = Writer(ch) + task.set_writer(wc) + else: + ch = wc.channel # END create write channel ifunset - rc = RPoolChannel(wc, task, self) - task.set_wchannel(wc) + rc = PoolReader(ch, task, self) finally: self._taskgraph_lock.release() # END sync task addition # If the input channel is one of our read channels, we add the relation - if hasattr(task, 'rchannel'): - ic = task.rchannel() + if hasattr(task, 'reader'): + ic = task.reader() if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: self._taskgraph_lock.acquire() try: diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 0b1d0666..5a6c1e95 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,23 +1,17 @@ from graph import Node from util import ReadOnly -from channel import ( - WChannel, - CallbackRChannel - ) import threading import weakref import sys import new -getrefcount = sys.getrefcount - class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. Results of the item processing are sent to a write channel, which is to be - set by the creator using the ``set_wchannel`` method. + set by the creator using the ``set_writer`` method. * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If @@ -25,9 +19,11 @@ class OutputChannelTask(Node): one worker, as well as dependent tasks. If you want finer granularity , you can specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process - '_out_wc', # output write channel + '_out_writer', # output write channel '_exc', # exception caught '_done', # True if we are done + '_num_writers', # number of concurrent writers + '_wlock', # lock for the above 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -35,12 +31,14 @@ class OutputChannelTask(Node): ) def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, - wchannel=None): + writer=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = wchannel # to be set later + self._out_writer = writer self._exc = None self._done = False + self._num_writers = 0 + self._wlock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -54,29 +52,29 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wchannel(self, wc): + def set_writer(self, writer): """Set the write channel to the given one""" - self._out_wc = wc + self._out_writer = writer - def wchannel(self): + def writer(self): """:return: a proxy to our write channel or None if non is set :note: you must not hold a reference to our write channel when the task is being processed. This would cause the write channel never to be closed as the task will think there is still another instance being processed which can close the channel once it is done. In the worst case, this will block your reads.""" - if self._out_wc is None: + if self._out_writer is None: return None - return self._out_wc + return self._out_writer def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" - self._out_wc.close() + self._out_writer.close() def is_closed(self): """:return: True if the task's write channel is closed""" - return self._out_wc.closed() + return self._out_writer.closed() def error(self): """:return: Exception caught during last processing or None""" @@ -88,24 +86,18 @@ class OutputChannelTask(Node): items = self._read(count) # print "%r: done reading %i items" % (self.id, len(items)) try: - # increase the ref-count - we use this to determine whether anyone else - # is currently handling our output channel. As this method runs asynchronously, - # we have to make sure that the channel is closed by the last finishing task, - # which is not necessarily the one which determines that he is done - # as he couldn't read anymore items. - # The refcount will be dropped in the moment we get out of here. - wc = self._out_wc + write = self._out_writer.write if self.apply_single: for item in items: rval = self.fun(item) - wc.write(rval) + write(rval) # END for each item else: # shouldn't apply single be the default anyway ? # The task designers should chunk them up in advance rvals = self.fun(items) for rval in rvals: - wc.write(rval) + write(rval) # END handle single apply except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging @@ -131,7 +123,7 @@ class OutputChannelTask(Node): self._exc = e # END set error flag # END exception handling - del(wc) + # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done @@ -151,7 +143,7 @@ class OutputChannelTask(Node): # thread having its copy on the stack # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results - if self.is_done() and getrefcount(self._out_wc) < 4: + if self.is_done(): # print "Closing channel of %r" % self.id self.close() # END handle channel closure @@ -212,14 +204,14 @@ class InputChannelTask(OutputChannelTask): to be the input channel to read from though.""" __slots__ = "_pool_ref" - def __init__(self, in_rc, *args, **kwargs): + def __init__(self, in_reader, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._read = in_rc.read + self._read = in_reader.read self._pool_ref = None #{ Internal Interface - def rchannel(self): + def reader(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) -- cgit v1.2.1 From 7c36f3648e39ace752c67c71867693ce1eee52a3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:38:40 +0200 Subject: Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working, and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. Its still not perfect though ... --- lib/git/async/pool.py | 24 +++++++++++-------- lib/git/async/task.py | 64 ++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 59 insertions(+), 29 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 68551ea3..3fd99c7b 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -33,13 +33,12 @@ import new class PoolReader(CallbackReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task_ref', '_pool_ref', '_read') + __slots__ = ('_task_ref', '_pool_ref') def __init__(self, channel, task, pool): CallbackReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) - self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader) def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -62,11 +61,16 @@ class PoolReader(CallbackReader): # it has no way of knowing that the write channel is about to diminsh. # which is why we pass the info as a private kwarg - not nice, but # okay for now - # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - pool.remove_task(task) + pool.remove_task(task, _from_destructor_ = True) # END handle refcount based removal of task + #{ Internal + def _read(self, count=0, block=True, timeout=None): + return CallbackReader.read(self, count, block, timeout) + + #} END internal + #{ Interface def pool_ref(self): @@ -261,7 +265,7 @@ class Pool(object): if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task - qput = self._queue + qput = self._queue.put if numchunks > 1: for i in xrange(numchunks): qput((task.process, chunksize)) @@ -290,16 +294,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task): + def _remove_task_if_orphaned(self, task, from_destructor): """Check the task, and delete it if it is orphaned""" # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader # If we are getting here from the destructor of an RPool channel, # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls - max_ref_count = 3 + max_ref_count = 3 + from_destructor if sys.getrefcount(task.writer().channel) < max_ref_count: - self.remove_task(task) + self.remove_task(task, from_destructor) #} END internal #{ Interface @@ -370,7 +374,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task): + def remove_task(self, task, _from_destructor_ = False): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -405,7 +409,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t) + self._remove_task_if_orphaned(t, _from_destructor_) # END handle orphans recursively return self diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 5a6c1e95..ae2532d9 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -82,23 +82,36 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" - # print "%r: reading %i" % (self.id, count) + # first thing: increment the writer count + self._wlock.acquire() + self._num_writers += 1 + self._wlock.release() + + #print "%r: reading %i" % (self.id, count) + #if hasattr(self, 'reader'): + # print "from", self.reader().channel items = self._read(count) - # print "%r: done reading %i items" % (self.id, len(items)) + #print "%r: done reading %i items" % (self.id, len(items)) try: - write = self._out_writer.write - if self.apply_single: - for item in items: - rval = self.fun(item) - write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - write(rval) - # END handle single apply + try: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + for rval in rvals: + write(rval) + # END handle single apply + finally: + self._wlock.acquire() + self._num_writers -= 1 + self._wlock.release() + # END handle writer count except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again @@ -144,8 +157,13 @@ class OutputChannelTask(Node): # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results if self.is_done(): - # print "Closing channel of %r" % self.id - self.close() + self._wlock.acquire() + if self._num_writers == 0: + #if not self.is_closed(): # DEBUG + # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel + self.close() + # END handle writers + self._wlock.release() # END handle channel closure #{ Configuration @@ -158,7 +176,7 @@ class ThreadTaskBase(object): class InputIteratorTaskBase(OutputChannelTask): """Implements a task which processes items from an iterable in a multi-processing safe manner""" - __slots__ = ('_iterator', '_lock') + __slots__ = ('_iterator', '_lock', '_empty') # the type of the lock to use when reading from the iterator lock_type = None @@ -169,12 +187,19 @@ class InputIteratorTaskBase(OutputChannelTask): self._iterator = iterator self._lock = self.lock_type() self._read = self.__read + self._empty = False def __read(self, count=0): """Read count items from the iterator, and return them""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort self._lock.acquire() try: if count == 0: + self._empty = True return list(self._iterator) else: out = list() @@ -183,6 +208,7 @@ class InputIteratorTaskBase(OutputChannelTask): try: out.append(it.next()) except StopIteration: + self._empty = True break # END handle empty iterator # END for each item to take @@ -198,7 +224,7 @@ class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): lock_type = threading.Lock -class InputChannelTask(OutputChannelTask): +class InputChannelTask(OutputChannelTask, ThreadTaskBase): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" -- cgit v1.2.1 From c34343d0b714d2c4657972020afea034a167a682 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:52:32 +0200 Subject: tasks can now terminate faster when no items were read, without neglecting their duty to close the channel if required. Code is a little less maintainable now, but faster, it appears --- lib/git/async/task.py | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ae2532d9..a8ba5ac6 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -92,21 +92,24 @@ class OutputChannelTask(Node): # print "from", self.reader().channel items = self._read(count) #print "%r: done reading %i items" % (self.id, len(items)) + try: try: - write = self._out_writer.write - if self.apply_single: - for item in items: - rval = self.fun(item) - write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - write(rval) - # END handle single apply + if items: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + for rval in rvals: + write(rval) + # END handle single apply + # END if there is anything to do finally: self._wlock.acquire() self._num_writers -= 1 @@ -158,12 +161,14 @@ class OutputChannelTask(Node): # Soft close, so others can continue writing their results if self.is_done(): self._wlock.acquire() - if self._num_writers == 0: - #if not self.is_closed(): # DEBUG - # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel - self.close() - # END handle writers - self._wlock.release() + try: + if self._num_writers == 0: + # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel + self.close() + # END handle writers + finally: + self._wlock.release() + # END assure lock release # END handle channel closure #{ Configuration -- cgit v1.2.1 From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 23:55:50 +0200 Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. Its good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but its nearly there --- lib/git/async/graph.py | 14 +++++++++++++- lib/git/async/pool.py | 2 +- lib/git/async/task.py | 10 ++++++++-- lib/git/async/thread.py | 43 ++++++++++++++++++++++--------------------- 4 files changed, 44 insertions(+), 25 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index e3999cdc..9ee0e891 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -25,14 +25,24 @@ class Graph(object): def __init__(self): self.nodes = list() + + def __del__(self): + """Deletes bidericational dependencies""" + for node in self.nodes: + node.in_nodes = None + node.out_nodes = None + # END cleanup nodes + + # otherwise the nodes would keep floating around + def add_node(self, node): """Add a new node to the graph :return: the newly added node""" self.nodes.append(node) return node - def del_node(self, node): + def remove_node(self, node): """Delete a node from the graph :return: self""" try: @@ -46,6 +56,8 @@ class Graph(object): del(outn.in_nodes[outn.in_nodes.index(node)]) for inn in node.in_nodes: del(inn.out_nodes[inn.out_nodes.index(node)]) + node.out_nodes = list() + node.in_nodes = list() return self def add_edge(self, u, v): diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 3fd99c7b..0aad90ae 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -402,7 +402,7 @@ class Pool(object): # keep its input nodes as we check whether they were orphaned in_tasks = task.in_nodes - self._tasks.del_node(task) + self._tasks.remove_node(task) self._taskorder_cache.clear() finally: self._taskgraph_lock.release() diff --git a/lib/git/async/task.py b/lib/git/async/task.py index a8ba5ac6..49e7e7cf 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -82,7 +82,8 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" - # first thing: increment the writer count + # first thing: increment the writer count - other tasks must be able + # to respond properly ( even if it turns out we don't need it later ) self._wlock.acquire() self._num_writers += 1 self._wlock.release() @@ -191,7 +192,11 @@ class InputIteratorTaskBase(OutputChannelTask): raise ValueError("Iterator %r needs a next() function" % iterator) self._iterator = iterator self._lock = self.lock_type() - self._read = self.__read + + # this is necessary to prevent a cyclic ref, preventing us from + # getting deleted ( and collected ) + weakself = weakref.ref(self) + self._read = lambda count: weakself().__read(count) self._empty = False def __read(self, count=0): @@ -201,6 +206,7 @@ class InputIteratorTaskBase(OutputChannelTask): if self._empty: return list() # END early abort + self._lock.acquire() try: if count == 0: diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index faeda04f..b8d2e418 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -116,7 +116,7 @@ class WorkerThread(TerminatableThread): t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ - __slots__ = ('inq', '_current_routine') + __slots__ = ('inq') # define how often we should check for a shutdown request in case our @@ -128,7 +128,6 @@ class WorkerThread(TerminatableThread): self.inq = inq if inq is None: self.inq = Queue.Queue() - self._current_routine = None # routine we execute right now @classmethod def stop(cls, *args): @@ -141,7 +140,6 @@ class WorkerThread(TerminatableThread): gettask = self.inq.get while True: - self._current_routine = None if self._should_terminate(): break # END check for stop request @@ -153,22 +151,27 @@ class WorkerThread(TerminatableThread): assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - self._current_routine = routine - try: - rval = None - if inspect.ismethod(routine): - if routine.im_self is None: - rval = routine(self, arg) - else: + try: + rval = None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, arg) + else: + rval = routine(arg) + elif inspect.isroutine(routine): rval = routine(arg) - elif inspect.isroutine(routine): - rval = routine(arg) - else: - # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) - break - # END make routine call + else: + # ignore unknown items + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + break + # END make routine call + finally: + # make sure we delete the routine to release the reference as soon + # as possible. Otherwise objects might not be destroyed + # while we are waiting + del(routine) + del(tasktuple) except StopProcessing: print self.name, "stops processing" # DEBUG break @@ -176,12 +179,10 @@ class WorkerThread(TerminatableThread): print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) continue # just continue # END routine exception handling + + # END handle routine release # END endless loop - def routine(self): - """:return: routine we are currently executing, or None if we have no task""" - return self._current_routine - def stop_and_join(self): """Send stop message to ourselves""" self.inq.put((self.stop, None)) -- cgit v1.2.1 From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 11:52:01 +0200 Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notifyied, staying in a beautysleep. This glitch is probably caused by some detail not treated correctly in the thread python module, which is something we cannot fix. It works most of the time as expected though - maybe some cleanup is not done correctly which causes this --- lib/git/async/pool.py | 1 - lib/git/async/thread.py | 2 -- lib/git/async/util.py | 8 +++++--- 3 files changed, 5 insertions(+), 6 deletions(-) (limited to 'lib/git/async') diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 0aad90ae..dbc201a9 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -324,7 +324,6 @@ class Pool(object): threadsafe to optimize item throughput. :note: currently NOT threadsafe !""" - print "set_size", size assert size > -1, "Size cannot be negative" # either start new threads, or kill existing ones. diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index b8d2e418..4d046a2f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -146,9 +146,7 @@ class WorkerThread(TerminatableThread): # we wait and block - to terminate, send the 'stop' method tasktuple = gettask() - # needing exactly one function, and one arg - assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple try: diff --git a/lib/git/async/util.py b/lib/git/async/util.py index b7750b0b..11ab75a6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -101,10 +101,12 @@ class HSCondition(deque): waiter.acquire() # get it the first time, no blocking self.append(waiter) - # in the momemnt we release our lock, someone else might actually resume - self._lock.release() - try: # restore state no matter what (e.g., KeyboardInterrupt) + + try: + # restore state no matter what (e.g., KeyboardInterrupt) # now we block, as we hold the lock already + # in the momemnt we release our lock, someone else might actually resume + self._lock.release() if timeout is None: waiter.acquire() else: -- cgit v1.2.1