From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 13:24:44 +0200
Subject: The new channel design actually works, but it also shows that it is
 located at the wrong spot. The channel is nothing more than an adapter
 allowing multiple items to be read from a thread-safe queue; the queue
 itself, though, must be 'closable' for writing, or needs something like a
 writable flag.

---
 lib/git/async/task.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index cf486f48..ce701c86 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -140,10 +140,10 @@ class OutputChannelTask(Node):
         # If we appear to be the only one left with our output channel, and are
         # closed ( this could have been set in another thread as well ), make
         # sure to close the output channel.
-        # The count is: 1 = wc itself, 2 = first reader channel, and we have only
-        # one, 3 is ours + x for every thread having its copy on the stack
+        # The count is: 1 = wc itself, 2 = first reader channel, + x for every
+        # thread having its copy on the stack
         # + 1 for the instance we provide to refcount
-        if self.is_done() and sys.getrefcount(self._out_wc) < 5:
+        if self.is_done() and sys.getrefcount(self._out_wc) < 4:
             self.close()
         # END handle channel closure
     #{ Configuration
--
cgit v1.2.1


From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 16:47:48 +0200
Subject: It's getting better already - intermediate commit before further
 changing the task class

---
 lib/git/async/task.py | 2 ++
 1 file changed, 2 insertions(+)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index ce701c86..97521cae 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -88,6 +88,7 @@ class OutputChannelTask(Node):
     def process(self, count=0):
         """Process count items and send the result individually to the output channel"""
         items = self._read(count)
+        print "task read", len(items)
         try:
             # increase the ref-count - we use this to determine whether anyone else
             # is currently handling our output channel. As this method runs asynchronously,
@@ -117,6 +118,7 @@ class OutputChannelTask(Node):
             # END handle single apply
         except Exception, e:
             self._exc = e
+            print str(e)    # TODO: REMOVE DEBUG, or make it use logging
             self.set_done()
             # unschedule all, we don't know how many have been produced actually
             # but only if we don't apply single please
--
cgit v1.2.1
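A note on the refcount arithmetic the first patch adjusts: sys.getrefcount(self._out_wc) sees the task's own _out_wc attribute plus the temporary reference created for the call itself, and anything beyond that comes from reader channels or thread frames that still hold the channel; the commit simply drops one expected holder from the count, lowering the threshold from 5 to 4. The snippet below is a minimal, self-contained sketch of that closing rule only - DummyChannel and RefcountCloser are illustrative stand-ins, not classes from lib/git/async, and the exact numbers are specific to CPython's reference counting.

import sys

class DummyChannel(object):
    """Stand-in for a write channel; it only records whether it was closed."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

class RefcountCloser(object):
    """Toy task that closes its channel once nobody else references it."""
    def __init__(self, wc):
        self._out_wc = wc

    def maybe_close(self):
        # In CPython, getrefcount sees our _out_wc attribute (1) plus the
        # temporary reference created for this very call (1). Any count above
        # that means a reader or another thread frame still holds the channel,
        # so we must not close it yet.
        if sys.getrefcount(self._out_wc) < 3:
            self._out_wc.close()

task = RefcountCloser(DummyChannel())
reader = task._out_wc        # a reader still holds the channel -> refcount 3
task.maybe_close()
assert not task._out_wc.closed
del reader                   # the last outside holder goes away -> refcount 2
task.maybe_close()
assert task._out_wc.closed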
From 13dd59ba5b3228820841682b59bad6c22476ff66 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 17:25:43 +0200
Subject: task: now deletes itself once it is done - for the test this doesn't
 change a thing as the task deletes itself too late - it is time for a
 paradigm change: the task should be deleted with its RPoolChannel or
 explicitly by the user. The test needs to adapt, and shouldn't assume
 anything unless the RPoolChannel is gone.

---
 lib/git/async/task.py | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 97521cae..dc207c33 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -23,6 +23,7 @@ class OutputChannelTask(Node):
                 '_done',             # True if we are done
                 '_scheduled_items',  # amount of scheduled items that will be processed in total
                 '_slock',            # lock for scheduled items
+                '_pool_ref',         # to be set by Pool
                 'fun',               # function to call with items read
                 'min_count',         # minimum amount of items to produce, None means no override
                 'max_chunksize',     # maximium amount of items to process per process call
@@ -84,6 +85,10 @@ class OutputChannelTask(Node):
         finally:
             self._slock.release()
         # END threadsafe return
+
+    def set_pool(self, pool):
+        """Set our pool to the given one, it will be weakref'd"""
+        self._pool_ref = weakref.ref(pool)

     def process(self, count=0):
         """Process count items and send the result individually to the output channel"""
@@ -147,6 +152,16 @@ class OutputChannelTask(Node):
             # + 1 for the instance we provide to refcount
             if self.is_done() and sys.getrefcount(self._out_wc) < 4:
                 self.close()
+            # additionally, remove ourselves from the pool, this is thread-safe
+            # Previously the pool collected done tasks and removed them,
+            # but this could happen after a read finished, potentially
+            # leaving them on the queue until the read-handle was dropped.
+            # This should assure its more in-time.
+            # I don't like this back-ref.
+            pool = self._pool_ref()
+            if pool:
+                pool.del_task(self)
+            # END remove ourselves from the pool
         # END handle channel closure
     #{ Configuration
@@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask):
     For instantiation, it takes all arguments of its base, the first one needs
     to be the input channel to read from though."""
     __slots__ = (
-                'in_rc',      # channel to read items from
-                '_pool_ref'   # to be set by Pool
+                'in_rc'       # channel to read items from
                 )

     def __init__(self, in_rc, *args, **kwargs):
@@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask):
         # and call it
         return OutputChannelTask.process(self, count)
-
-    def set_pool(self, pool):
-        """Set our pool to the given one, it will be weakref'd"""
-        self._pool_ref = weakref.ref(pool)
     #{ Configuration
--
cgit v1.2.1
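The patch above wires a weak back-reference from the task to its pool so a finished task can unregister itself via pool.del_task(self) without creating a reference cycle. The sketch below shows only that pattern in isolation: set_pool and del_task mirror the names visible in the diff, while Pool, Task, add_task and the _tasks set are simplified stand-ins invented here, not the library's real classes.

import weakref

class Pool(object):
    """Very reduced stand-in for the async pool: it only tracks its tasks."""
    def __init__(self):
        self._tasks = set()

    def add_task(self, task):
        self._tasks.add(task)
        task.set_pool(self)

    def del_task(self, task):
        self._tasks.discard(task)

class Task(object):
    """Task holding only a weak reference to its pool."""
    def __init__(self):
        self._pool_ref = None
        self._done = False

    def set_pool(self, pool):
        # weakref avoids the cycle pool -> task -> pool
        self._pool_ref = weakref.ref(pool)

    def set_done(self):
        self._done = True
        pool = self._pool_ref() if self._pool_ref is not None else None
        if pool is not None:
            # deregister ourselves as soon as we are done, instead of
            # waiting for the pool to sweep finished tasks later
            pool.del_task(self)

pool = Pool()
task = Task()
pool.add_task(task)
task.set_done()
assert task not in pool._tasks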
From e5c0002d069382db1768349bf0c5ff40aafbf140 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 18:20:12 +0200
Subject: Revised task deletion works well; adjusted the test to create new
 tasks all the time instead of reusing its own one - it was somewhat hard to
 manage its state over time and could cause bugs. It works okay, but it
 occasionally hangs; it appears to be an empty queue. Have to gradually put
 certain things back in, although in the current mode of operation it should
 never have empty queues from the pool to the user.

---
 lib/git/async/task.py | 64 +++++++++------------------------------------------
 1 file changed, 11 insertions(+), 53 deletions(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index dc207c33..5edd40bb 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,10 +1,11 @@
 from graph import Node

 import threading
-import weakref
 import sys
 import new

+getrefcount = sys.getrefcount
+
 class OutputChannelTask(Node):
     """Abstracts a named task as part of a set of interdependent tasks, which contains
     additional information on how the task should be queued and processed.
@@ -23,7 +24,6 @@ class OutputChannelTask(Node):
                 '_done',             # True if we are done
                 '_scheduled_items',  # amount of scheduled items that will be processed in total
                 '_slock',            # lock for scheduled items
-                '_pool_ref',         # to be set by Pool
                 'fun',               # function to call with items read
                 'min_count',         # minimum amount of items to produce, None means no override
                 'max_chunksize',     # maximium amount of items to process per process call
@@ -54,7 +54,7 @@ class OutputChannelTask(Node):
     def set_wc(self, wc):
         """Set the write channel to the given one
         :note: resets it done state in order to allow proper queue handling"""
-        self._done = False
+        self._done = False        # TODO : fix this, this is a side-effect
         self._scheduled_items = 0
         self._out_wc = wc
@@ -86,10 +86,6 @@ class OutputChannelTask(Node):
             self._slock.release()
         # END threadsafe return

-    def set_pool(self, pool):
-        """Set our pool to the given one, it will be weakref'd"""
-        self._pool_ref = weakref.ref(pool)
-
     def process(self, count=0):
         """Process count items and send the result individually to the output channel"""
         items = self._read(count)
@@ -123,7 +119,7 @@ class OutputChannelTask(Node):
             # END handle single apply
         except Exception, e:
             self._exc = e
-            print str(e)    # TODO: REMOVE DEBUG, or make it use logging
+            print "task error:", str(e)    # TODO: REMOVE DEBUG, or make it use logging
             self.set_done()
             # unschedule all, we don't know how many have been produced actually
             # but only if we don't apply single please
@@ -150,18 +146,8 @@ class OutputChannelTask(Node):
             # The count is: 1 = wc itself, 2 = first reader channel, + x for every
             # thread having its copy on the stack
             # + 1 for the instance we provide to refcount
-            if self.is_done() and sys.getrefcount(self._out_wc) < 4:
+            if self.is_done() and getrefcount(self._out_wc) < 4:
                 self.close()
-            # additionally, remove ourselves from the pool, this is thread-safe
-            # Previously the pool collected done tasks and removed them,
-            # but this could happen after a read finished, potentially
-            # leaving them on the queue until the read-handle was dropped.
-            # This should assure its more in-time.
-            # I don't like this back-ref.
-            pool = self._pool_ref()
-            if pool:
-                pool.del_task(self)
-            # END remove ourselves from the pool
         # END handle channel closure
     #{ Configuration
@@ -218,43 +204,15 @@ class InputChannelTask(OutputChannelTask):
     """Uses an input channel as source for reading items
     For instantiation, it takes all arguments of its base, the first one needs
     to be the input channel to read from though."""
-    __slots__ = (
-                'in_rc'       # channel to read items from
-                )

     def __init__(self, in_rc, *args, **kwargs):
         OutputChannelTask.__init__(self, *args, **kwargs)
-        self._in_rc = in_rc
-
+        self._read = in_rc.read
+
     def process(self, count=1):
-        """Verify our setup, and do some additional checking, before the
-        base implementation can permanently perform all operations"""
-        self._read = self._in_rc.read
-        # make sure we don't trigger the pool if we read from a pool channel which
-        # belongs to our own pool. Channels from different pools are fine though,
-        # there we want to trigger its computation
-        # PROBLEM: if the user keeps an end, but decides to put the same end into
-        # a task of this pool, then all items might deplete without new ones being
-        # produced, causing a deadlock. Just triggering the pool would be better,
-        # but cost's more, unnecessarily if there is just one consumer, which is
-        # the user.
-        # * could encode usage in the channel type, and fail if the refcount on
-        # the read-pool channel is too high
-        # * maybe keep track of the elements that are requested or in-production
-        # for each task, which would allow to precisely determine whether
-        # the pool as to be triggered, and bail out early. Problem would
-        # be the
-        # * Perhaps one shouldn't seek the perfect solution , but instead
-        # document whats working and what not, or under which conditions.
-        # The whole system is simple, but gets more complicated the
-        # smarter it wants to be.
-        if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
-            self._read = self._in_rc._read
-
-        # permanently install our base for processing
-        self.process = new.instancemethod(OutputChannelTask.__dict__['process'], self, type(self))
-
-        # and call it
-        return OutputChannelTask.process(self, count)
+        # for now, just blindly read our input, could trigger a pool, even
+        # ours, but why not ? It should be able to handle this
+        # TODO: remove this method
+        super(InputChannelTask, self).process(count)
     #{ Configuration
--
cgit v1.2.1
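Worth spelling out what the InputChannelTask simplification above amounts to: instead of a process() override that inspects the channel, special-cases its own pool and then monkey-patches itself via new.instancemethod, the task now just binds the input channel's read method as its _read callable at construction time, so the unchanged base process() pulls items straight from the channel. A reduced sketch of that idea follows; _ListChannel and _MiniTask are made-up stand-ins, not the library's RPoolChannel or task classes.

class _ListChannel(object):
    """Made-up read channel backed by a plain list (stands in for RPoolChannel)."""
    def __init__(self, items):
        self._items = list(items)

    def read(self, count=0):
        # count == 0 means "everything", mirroring the read(count) convention
        # used throughout the diffs above
        if count < 1:
            out, self._items = self._items, []
        else:
            out, self._items = self._items[:count], self._items[count:]
        return out

class _MiniTask(object):
    """The task never needs to know where items come from: it only calls
    self._read, which here is simply the channel's bound read method."""
    def __init__(self, read_callable, fun):
        self._read = read_callable
        self.fun = fun

    def process(self, count=0):
        return [self.fun(item) for item in self._read(count)]

chan = _ListChannel([1, 2, 3, 4])
task = _MiniTask(chan.read, lambda x: x * 10)
print(task.process(2))    # [10, 20]
print(task.process(0))    # [30, 40], i.e. whatever was left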
From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001
From: Sebastian Thiel
Date: Tue, 8 Jun 2010 19:25:33 +0200
Subject: workerthread: adjusted to use a blocking queue; it will receive
 termination events only through its queue, which boosts performance into
 bright green levels

---
 lib/git/async/task.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'lib/git/async/task.py')

diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 5edd40bb..f9536a45 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -89,7 +89,7 @@ class OutputChannelTask(Node):
     def process(self, count=0):
         """Process count items and send the result individually to the output channel"""
         items = self._read(count)
-        print "task read", len(items)
+        # print "task read", len(items)
         try:
             # increase the ref-count - we use this to determine whether anyone else
             # is currently handling our output channel. As this method runs asynchronously,
--
cgit v1.2.1
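The last commit message refers to the worker threads (changed in a different file) now blocking on their queue and receiving termination events through that same queue instead of polling a stop flag. As a rough illustration of that general pattern only - a sentinel object pushed through the work queue, not GitPython's actual WorkerThread implementation - consider:

import threading
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2, matching the era of these patches

TERMINATE = object()          # sentinel travelling through the queue itself

def worker(inq, results):
    """Blocks on the queue; shutdown arrives as an ordinary queue item,
    so there is no timeout polling and no separate stop flag."""
    while True:
        item = inq.get()      # blocking get - the thread sleeps until data arrives
        if item is TERMINATE:
            break
        results.append(item * 2)

inq, results = queue.Queue(), []
t = threading.Thread(target=worker, args=(inq, results))
t.start()
for i in range(5):
    inq.put(i)
inq.put(TERMINATE)            # the termination event rides on the same queue
t.join()
print(results)                # [0, 2, 4, 6, 8]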