diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/pool.py | 13 | ||||
-rw-r--r-- | lib/git/async/task.py | 37 |
2 files changed, 30 insertions, 20 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7d4e96d1..f7c1cfe0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -388,18 +388,21 @@ class Pool(object): self._taskorder_cache.clear() self._tasks.add_node(task) - # fix locks - in serial mode, the task does not need real locks - # Additionally, use a non-threadsafe queue + # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety # when reading from multiple threads. if self.size() == 0: wctype = SerialWChannel # END improve locks - # setup the tasks channel - wc = wctype() + # setup the tasks channel - respect the task creators choice though + # if it is set. + wc = task.wchannel() + if wc is None: + wc = wctype() + # END create write channel ifunset rc = RPoolChannel(wc, task, self) - task.set_wc(wc) + task.set_wchannel(wc) finally: self._taskgraph_lock.release() # END sync task addition diff --git a/lib/git/async/task.py b/lib/git/async/task.py index be02cfe8..f98336b2 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,4 +1,5 @@ from graph import Node +from channel import WChannel from util import ReadOnly import threading @@ -11,8 +12,8 @@ class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. - Results of the item processing are sent to an output channel, which is to be - set by the creator + Results of the item processing are sent to a write channel, which is to be + set by the creator using the ``set_wchannel`` method. * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If @@ -29,10 +30,11 @@ class OutputChannelTask(Node): 'apply_single' # apply single items even if multiple where read ) - def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0): + def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, + wchannel=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = None # to be set later + self._out_wc = wchannel # to be set later self._exc = None self._done = False self.fun = fun @@ -48,13 +50,21 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wc(self, wc): - """Set the write channel to the given one - :note: resets it done state in order to allow proper queue handling""" - self._done = False # TODO : fix this, this is a side-effect - self._scheduled_items = 0 + def set_wchannel(self, wc): + """Set the write channel to the given one""" self._out_wc = wc + def wchannel(self): + """:return: a proxy to our write channel or None if non is set + :note: you must not hold a reference to our write channel when the + task is being processed. This would cause the write channel never + to be closed as the task will think there is still another instance + being processed which can close the channel once it is done. + In the worst case, this will block your reads.""" + if self._out_wc is None: + return None + return self._out_wc + def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" @@ -128,8 +138,10 @@ class OutputChannelTask(Node): # END handle done state # If we appear to be the only one left with our output channel, and are - # closed ( this could have been set in another thread as well ), make + # done ( this could have been set in another thread as well ), make # sure to close the output channel. + # Waiting with this to be the last one helps to keep the + # write-channel writable longer # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount @@ -196,10 +208,5 @@ class InputChannelTask(OutputChannelTask): OutputChannelTask.__init__(self, *args, **kwargs) self._read = in_rc.read - def process(self, count=1): - # for now, just blindly read our input, could trigger a pool, even - # ours, but why not ? It should be able to handle this - # TODO: remove this method - super(InputChannelTask, self).process(count) #{ Configuration |