diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 36 | ||||
-rw-r--r-- | lib/git/async/pool.py | 36 | ||||
-rw-r--r-- | lib/git/async/util.py | 9 |
3 files changed, 54 insertions, 27 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 3a277e7e..bb118f30 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -6,6 +6,7 @@ from Queue import ( from util import ( AsyncQueue, + SyncQueue, ReadOnly ) @@ -24,27 +25,19 @@ class Channel(object): Create a new channel """ __slots__ = tuple() - - def __new__(cls, *args): - if cls is Channel: - if len(args) > 0: - raise ValueError("Cannot take any arguments when creating a new channel") - wc = WChannel() - rc = RChannel(wc) - return wc, rc - # END constructor mode - return object.__new__(cls) class WChannel(Channel): - """The write end of a channel""" + """The write end of a channel - it is thread-safe""" __slots__ = ('_queue') + # The queue to use to store the actual data + QueueCls = AsyncQueue + def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" - self._queue = AsyncQueue() - + self._queue = self.QueueCls() #{ Interface def write(self, item, block=True, timeout=None): @@ -75,6 +68,12 @@ class WChannel(Channel): #} END interface +class SerialWChannel(WChannel): + """A slightly faster version of a WChannel, which sacrificed thead-safety for + performance""" + QueueCls = SyncQueue + + class RChannel(Channel): """The read-end of a corresponding write channel""" __slots__ = '_wc' @@ -174,3 +173,14 @@ class RChannel(Channel): #} END interface #} END classes + +#{ Constructors +def mkchannel(wctype = WChannel, rctype = RChannel): + """Create a channel, which consists of one write end and one read end + :return: tuple(write_channel, read_channel) + :param wctype: The type of the write channel to instantiate + :param rctype: The type of the read channel to instantiate""" + wc = wctype() + rc = rctype(wc) + return wc, rc +#} END constructors diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 66a2a105..549c801e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -6,7 +6,6 @@ from thread import ( from threading import Lock from util import ( - SyncQueue, AsyncQueue, DummyLock ) @@ -19,8 +18,9 @@ from Queue import ( from graph import Graph from channel import ( - Channel, + mkchannel, WChannel, + SerialWChannel, RChannel ) @@ -329,7 +329,8 @@ class Pool(object): #{ Interface def size(self): - """:return: amount of workers in the pool""" + """:return: amount of workers in the pool + :note: method is not threadsafe !""" return self._num_workers def set_size(self, size=0): @@ -339,7 +340,9 @@ class Pool(object): :return: self :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads + otherwise the work will be distributed among the given amount of threads. + If the size is 0, newly added tasks will use channels which are NOT + threadsafe to optimize item throughput. :note: currently NOT threadsafe !""" assert size > -1, "Size cannot be negative" @@ -437,17 +440,29 @@ class Pool(object): the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it - wc, rc = Channel() - rc = RPoolChannel(wc, task, self) - task.set_wc(wc) + wctype = WChannel self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() self._tasks.add_node(task) + + # fix locks - in serial mode, the task does not need real locks + # Additionally, use a non-threadsafe queue + # This brings about 15% more performance, but sacrifices thread-safety + # when reading from multiple threads. + if self.size() == 0: + task._slock = DummyLock() + wctype = SerialWChannel + # END improve locks + + # setup the tasks channel + wc = wctype() + rc = RPoolChannel(wc, task, self) + task.set_wc(wc) finally: self._taskgraph_lock.release() - # END sync task addition + # END sync task addition # If the input channel is one of our read channels, we add the relation if isinstance(task, InputChannelTask): @@ -462,11 +477,6 @@ class Pool(object): # END add task relation # END handle input channels for connections - # fix locks - in serial mode, the task does not need real locks - if self.size() == 0: - task._slock = DummyLock() - # END improve locks - return rc #} END interface diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 2f46d55f..00d0dbab 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -66,7 +66,14 @@ class SyncQueue(deque): def empty(self): return len(self) == 0 - put = deque.append + def set_writable(self, state): + pass + + def writable(self): + return True + + def put(self, item, block=True, timeout=None): + self.append(item) class HSCondition(deque): |