summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 15:29:47 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 15:29:47 +0200
commit57a4e09294230a36cc874a6272c71757c48139f2 (patch)
treee77cc49b66cacae91400ba1eaa7856ce367fc1a6 /lib/git/async
parent0974f8737a3c56a7c076f9d0b757c6cb106324fb (diff)
downloadgitpython-57a4e09294230a36cc874a6272c71757c48139f2.tar.gz
Channel: removed pseudoconstructor, which clearly improves the design and makes it easier to constomize
pool: in serial mode, created channels will be serial-only, which brings 15% of performance
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/channel.py36
-rw-r--r--lib/git/async/pool.py36
-rw-r--r--lib/git/async/util.py9
3 files changed, 54 insertions, 27 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 3a277e7e..bb118f30 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -6,6 +6,7 @@ from Queue import (
from util import (
AsyncQueue,
+ SyncQueue,
ReadOnly
)
@@ -24,27 +25,19 @@ class Channel(object):
Create a new channel """
__slots__ = tuple()
-
- def __new__(cls, *args):
- if cls is Channel:
- if len(args) > 0:
- raise ValueError("Cannot take any arguments when creating a new channel")
- wc = WChannel()
- rc = RChannel(wc)
- return wc, rc
- # END constructor mode
- return object.__new__(cls)
class WChannel(Channel):
- """The write end of a channel"""
+ """The write end of a channel - it is thread-safe"""
__slots__ = ('_queue')
+ # The queue to use to store the actual data
+ QueueCls = AsyncQueue
+
def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
- self._queue = AsyncQueue()
-
+ self._queue = self.QueueCls()
#{ Interface
def write(self, item, block=True, timeout=None):
@@ -75,6 +68,12 @@ class WChannel(Channel):
#} END interface
+class SerialWChannel(WChannel):
+ """A slightly faster version of a WChannel, which sacrificed thead-safety for
+ performance"""
+ QueueCls = SyncQueue
+
+
class RChannel(Channel):
"""The read-end of a corresponding write channel"""
__slots__ = '_wc'
@@ -174,3 +173,14 @@ class RChannel(Channel):
#} END interface
#} END classes
+
+#{ Constructors
+def mkchannel(wctype = WChannel, rctype = RChannel):
+ """Create a channel, which consists of one write end and one read end
+ :return: tuple(write_channel, read_channel)
+ :param wctype: The type of the write channel to instantiate
+ :param rctype: The type of the read channel to instantiate"""
+ wc = wctype()
+ rc = rctype(wc)
+ return wc, rc
+#} END constructors
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 66a2a105..549c801e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -6,7 +6,6 @@ from thread import (
from threading import Lock
from util import (
- SyncQueue,
AsyncQueue,
DummyLock
)
@@ -19,8 +18,9 @@ from Queue import (
from graph import Graph
from channel import (
- Channel,
+ mkchannel,
WChannel,
+ SerialWChannel,
RChannel
)
@@ -329,7 +329,8 @@ class Pool(object):
#{ Interface
def size(self):
- """:return: amount of workers in the pool"""
+ """:return: amount of workers in the pool
+ :note: method is not threadsafe !"""
return self._num_workers
def set_size(self, size=0):
@@ -339,7 +340,9 @@ class Pool(object):
:return: self
:param size: if 0, the pool will do all work itself in the calling thread,
- otherwise the work will be distributed among the given amount of threads
+ otherwise the work will be distributed among the given amount of threads.
+ If the size is 0, newly added tasks will use channels which are NOT
+ threadsafe to optimize item throughput.
:note: currently NOT threadsafe !"""
assert size > -1, "Size cannot be negative"
@@ -437,17 +440,29 @@ class Pool(object):
the task will be considered orphaned and will be deleted on the next
occasion."""
# create a write channel for it
- wc, rc = Channel()
- rc = RPoolChannel(wc, task, self)
- task.set_wc(wc)
+ wctype = WChannel
self._taskgraph_lock.acquire()
try:
self._taskorder_cache.clear()
self._tasks.add_node(task)
+
+ # fix locks - in serial mode, the task does not need real locks
+ # Additionally, use a non-threadsafe queue
+ # This brings about 15% more performance, but sacrifices thread-safety
+ # when reading from multiple threads.
+ if self.size() == 0:
+ task._slock = DummyLock()
+ wctype = SerialWChannel
+ # END improve locks
+
+ # setup the tasks channel
+ wc = wctype()
+ rc = RPoolChannel(wc, task, self)
+ task.set_wc(wc)
finally:
self._taskgraph_lock.release()
- # END sync task addition
+ # END sync task addition
# If the input channel is one of our read channels, we add the relation
if isinstance(task, InputChannelTask):
@@ -462,11 +477,6 @@ class Pool(object):
# END add task relation
# END handle input channels for connections
- # fix locks - in serial mode, the task does not need real locks
- if self.size() == 0:
- task._slock = DummyLock()
- # END improve locks
-
return rc
#} END interface
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 2f46d55f..00d0dbab 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -66,7 +66,14 @@ class SyncQueue(deque):
def empty(self):
return len(self) == 0
- put = deque.append
+ def set_writable(self, state):
+ pass
+
+ def writable(self):
+ return True
+
+ def put(self, item, block=True, timeout=None):
+ self.append(item)
class HSCondition(deque):