summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 16:59:17 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 16:59:17 +0200
commit365fb14ced88a5571d3287ff1698582ceacd80d6 (patch)
tree49344760ab12a9b031e5dbcde92aa0f7347dd875
parentea81f14dafbfb24d70373c74b5f8dabf3f2225d9 (diff)
downloadgitpython-365fb14ced88a5571d3287ff1698582ceacd80d6.tar.gz
task: redesigned write channel access to allow the task creator to set own write channels, possibly some with callbacks installed etc.. Pool.add_task will respect the users choice now, but provide defaults which are optimized for performance
-rw-r--r--lib/git/async/pool.py13
-rw-r--r--lib/git/async/task.py37
2 files changed, 30 insertions, 20 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7d4e96d1..f7c1cfe0 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -388,18 +388,21 @@ class Pool(object):
self._taskorder_cache.clear()
self._tasks.add_node(task)
- # fix locks - in serial mode, the task does not need real locks
- # Additionally, use a non-threadsafe queue
+ # Use a non-threadsafe queue
# This brings about 15% more performance, but sacrifices thread-safety
# when reading from multiple threads.
if self.size() == 0:
wctype = SerialWChannel
# END improve locks
- # setup the tasks channel
- wc = wctype()
+ # setup the tasks channel - respect the task creators choice though
+ # if it is set.
+ wc = task.wchannel()
+ if wc is None:
+ wc = wctype()
+ # END create write channel ifunset
rc = RPoolChannel(wc, task, self)
- task.set_wc(wc)
+ task.set_wchannel(wc)
finally:
self._taskgraph_lock.release()
# END sync task addition
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index be02cfe8..f98336b2 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,4 +1,5 @@
from graph import Node
+from channel import WChannel
from util import ReadOnly
import threading
@@ -11,8 +12,8 @@ class OutputChannelTask(Node):
"""Abstracts a named task as part of a set of interdependent tasks, which contains
additional information on how the task should be queued and processed.
- Results of the item processing are sent to an output channel, which is to be
- set by the creator
+ Results of the item processing are sent to a write channel, which is to be
+ set by the creator using the ``set_wchannel`` method.
* **min_count** assures that not less than min_count items will be processed per call.
* **max_chunksize** assures that multi-threading is happening in smaller chunks. If
@@ -29,10 +30,11 @@ class OutputChannelTask(Node):
'apply_single' # apply single items even if multiple where read
)
- def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0):
+ def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
+ wchannel=None):
Node.__init__(self, id)
self._read = None # to be set by subclasss
- self._out_wc = None # to be set later
+ self._out_wc = wchannel # to be set later
self._exc = None
self._done = False
self.fun = fun
@@ -48,13 +50,21 @@ class OutputChannelTask(Node):
"""Set ourselves to being done, has we have completed the processing"""
self._done = True
- def set_wc(self, wc):
- """Set the write channel to the given one
- :note: resets it done state in order to allow proper queue handling"""
- self._done = False # TODO : fix this, this is a side-effect
- self._scheduled_items = 0
+ def set_wchannel(self, wc):
+ """Set the write channel to the given one"""
self._out_wc = wc
+ def wchannel(self):
+ """:return: a proxy to our write channel or None if non is set
+ :note: you must not hold a reference to our write channel when the
+ task is being processed. This would cause the write channel never
+ to be closed as the task will think there is still another instance
+ being processed which can close the channel once it is done.
+ In the worst case, this will block your reads."""
+ if self._out_wc is None:
+ return None
+ return self._out_wc
+
def close(self):
"""A closed task will close its channel to assure the readers will wake up
:note: its safe to call this method multiple times"""
@@ -128,8 +138,10 @@ class OutputChannelTask(Node):
# END handle done state
# If we appear to be the only one left with our output channel, and are
- # closed ( this could have been set in another thread as well ), make
+ # done ( this could have been set in another thread as well ), make
# sure to close the output channel.
+ # Waiting with this to be the last one helps to keep the
+ # write-channel writable longer
# The count is: 1 = wc itself, 2 = first reader channel, + x for every
# thread having its copy on the stack
# + 1 for the instance we provide to refcount
@@ -196,10 +208,5 @@ class InputChannelTask(OutputChannelTask):
OutputChannelTask.__init__(self, *args, **kwargs)
self._read = in_rc.read
- def process(self, count=1):
- # for now, just blindly read our input, could trigger a pool, even
- # ours, but why not ? It should be able to handle this
- # TODO: remove this method
- super(InputChannelTask, self).process(count)
#{ Configuration