summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 15:29:47 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 15:29:47 +0200
commit57a4e09294230a36cc874a6272c71757c48139f2 (patch)
treee77cc49b66cacae91400ba1eaa7856ce367fc1a6 /lib/git/async/pool.py
parent0974f8737a3c56a7c076f9d0b757c6cb106324fb (diff)
downloadgitpython-57a4e09294230a36cc874a6272c71757c48139f2.tar.gz
Channel: removed pseudoconstructor, which clearly improves the design and makes it easier to constomize
pool: in serial mode, created channels will be serial-only, which brings 15% of performance
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py36
1 files changed, 23 insertions, 13 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 66a2a105..549c801e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -6,7 +6,6 @@ from thread import (
from threading import Lock
from util import (
- SyncQueue,
AsyncQueue,
DummyLock
)
@@ -19,8 +18,9 @@ from Queue import (
from graph import Graph
from channel import (
- Channel,
+ mkchannel,
WChannel,
+ SerialWChannel,
RChannel
)
@@ -329,7 +329,8 @@ class Pool(object):
#{ Interface
def size(self):
- """:return: amount of workers in the pool"""
+ """:return: amount of workers in the pool
+ :note: method is not threadsafe !"""
return self._num_workers
def set_size(self, size=0):
@@ -339,7 +340,9 @@ class Pool(object):
:return: self
:param size: if 0, the pool will do all work itself in the calling thread,
- otherwise the work will be distributed among the given amount of threads
+ otherwise the work will be distributed among the given amount of threads.
+ If the size is 0, newly added tasks will use channels which are NOT
+ threadsafe to optimize item throughput.
:note: currently NOT threadsafe !"""
assert size > -1, "Size cannot be negative"
@@ -437,17 +440,29 @@ class Pool(object):
the task will be considered orphaned and will be deleted on the next
occasion."""
# create a write channel for it
- wc, rc = Channel()
- rc = RPoolChannel(wc, task, self)
- task.set_wc(wc)
+ wctype = WChannel
self._taskgraph_lock.acquire()
try:
self._taskorder_cache.clear()
self._tasks.add_node(task)
+
+ # fix locks - in serial mode, the task does not need real locks
+ # Additionally, use a non-threadsafe queue
+ # This brings about 15% more performance, but sacrifices thread-safety
+ # when reading from multiple threads.
+ if self.size() == 0:
+ task._slock = DummyLock()
+ wctype = SerialWChannel
+ # END improve locks
+
+ # setup the tasks channel
+ wc = wctype()
+ rc = RPoolChannel(wc, task, self)
+ task.set_wc(wc)
finally:
self._taskgraph_lock.release()
- # END sync task addition
+ # END sync task addition
# If the input channel is one of our read channels, we add the relation
if isinstance(task, InputChannelTask):
@@ -462,11 +477,6 @@ class Pool(object):
# END add task relation
# END handle input channels for connections
- # fix locks - in serial mode, the task does not need real locks
- if self.size() == 0:
- task._slock = DummyLock()
- # END improve locks
-
return rc
#} END interface