diff options
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 66a2a105..549c801e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -6,7 +6,6 @@ from thread import ( from threading import Lock from util import ( - SyncQueue, AsyncQueue, DummyLock ) @@ -19,8 +18,9 @@ from Queue import ( from graph import Graph from channel import ( - Channel, + mkchannel, WChannel, + SerialWChannel, RChannel ) @@ -329,7 +329,8 @@ class Pool(object): #{ Interface def size(self): - """:return: amount of workers in the pool""" + """:return: amount of workers in the pool + :note: method is not threadsafe !""" return self._num_workers def set_size(self, size=0): @@ -339,7 +340,9 @@ class Pool(object): :return: self :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads + otherwise the work will be distributed among the given amount of threads. + If the size is 0, newly added tasks will use channels which are NOT + threadsafe to optimize item throughput. :note: currently NOT threadsafe !""" assert size > -1, "Size cannot be negative" @@ -437,17 +440,29 @@ class Pool(object): the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it - wc, rc = Channel() - rc = RPoolChannel(wc, task, self) - task.set_wc(wc) + wctype = WChannel self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() self._tasks.add_node(task) + + # fix locks - in serial mode, the task does not need real locks + # Additionally, use a non-threadsafe queue + # This brings about 15% more performance, but sacrifices thread-safety + # when reading from multiple threads. + if self.size() == 0: + task._slock = DummyLock() + wctype = SerialWChannel + # END improve locks + + # setup the tasks channel + wc = wctype() + rc = RPoolChannel(wc, task, self) + task.set_wc(wc) finally: self._taskgraph_lock.release() - # END sync task addition + # END sync task addition # If the input channel is one of our read channels, we add the relation if isinstance(task, InputChannelTask): @@ -462,11 +477,6 @@ class Pool(object): # END add task relation # END handle input channels for connections - # fix locks - in serial mode, the task does not need real locks - if self.size() == 0: - task._slock = DummyLock() - # END improve locks - return rc #} END interface |