Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py | 241
1 file changed, 186 insertions(+), 55 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 26a6a182..30291835 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,8 +1,16 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
-from threading import Lock
+
+from threading import (
+ Lock,
+ _Condition,
+ _sleep,
+ _time,
+ )
+
from task import InputChannelTask
from Queue import Queue, Empty
+from collections import deque
from graph import (
Graph,
@@ -18,6 +26,96 @@ import weakref
import sys
+#{ Utilities
+
+class SyncQueue(deque):
+ """Adapter to allow using a deque like a queue, without locking"""
+ def get(self, block=True, timeout=None):
+ try:
+ # popleft keeps FIFO order - pop() would make this adapter a stack
+ return self.popleft()
+ except IndexError:
+ raise Empty
+ # END raise empty
+
+ def empty(self):
+ return len(self) == 0
+
+ put = deque.append
+
+
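A minimal sketch of how this adapter is meant to be used in serial mode; the items are illustrative, not taken from this diff:

```python
from Queue import Empty

q = SyncQueue()
q.put('a')              # put is a bare deque.append - no lock is taken
q.put('b')

try:
    while True:
        print q.get()   # raises Empty instead of blocking, unlike Queue.Queue
except Empty:
    pass                # drained - prints 'a' then 'b' in FIFO order
```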
+class HSCondition(_Condition):
+ """An attempt to make conditions less blocking, which gains performance
+ in return by sleeping less"""
+ delay = 0.00002 # reduces wait times, but increases overhead
+
+ def wait(self, timeout=None):
+ waiter = Lock()
+ waiter.acquire()
+ self.__dict__['_Condition__waiters'].append(waiter)
+ saved_state = self._release_save()
+ try: # restore state no matter what (e.g., KeyboardInterrupt)
+ if timeout is None:
+ waiter.acquire()
+ else:
+ # Balancing act: We can't afford a pure busy loop, so we
+ # have to sleep; but if we sleep the whole timeout time,
+ # we'll be unresponsive. The scheme here sleeps very
+ # little at first, longer as time goes on, but never longer
+ # than 1/20th of a second (or the remaining timeout).
+ endtime = _time() + timeout
+ delay = self.delay
+ acquire = waiter.acquire
+ while True:
+ gotit = acquire(0)
+ if gotit:
+ break
+ remaining = endtime - _time()
+ if remaining <= 0:
+ break
+ delay = min(delay * 2, remaining, .05)
+ _sleep(delay)
+ # END endless loop
+ if not gotit:
+ try:
+ self.__dict__['_Condition__waiters'].remove(waiter)
+ except ValueError:
+ pass
+ # END didn't ever get it
+ finally:
+ self._acquire_restore(saved_state)
+
+ def notify(self, n=1):
+ __waiters = self.__dict__['_Condition__waiters']
+ if not __waiters:
+ return
+ if n == 1:
+ __waiters[0].release()
+ try:
+ __waiters.pop(0)
+ except IndexError:
+ pass
+ else:
+ waiters = __waiters[:n]
+ for waiter in waiters:
+ waiter.release()
+ try:
+ __waiters.remove(waiter)
+ except ValueError:
+ pass
+ # END handle n = 1 case faster
+
+class PerfQueue(Queue):
+ """A queue using different condition objects to gain multithreading performance"""
+ def __init__(self, maxsize=0):
+ Queue.__init__(self, maxsize)
+
+ self.not_empty = HSCondition(self.mutex)
+ self.not_full = HSCondition(self.mutex)
+ self.all_tasks_done = HSCondition(self.mutex)
+
+
+#} END utilities
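PerfQueue keeps Queue's interface, so a worker loop can use it unchanged; only the internal conditions differ. A small usage sketch, with an illustrative worker function standing in for the real WorkerThread:

```python
import threading

q = PerfQueue()

def worker():
    while True:
        fun, count = q.get()   # blocks on HSCondition rather than threading.Condition
        if fun is None:        # sentinel: shut down
            return
        fun(count)

t = threading.Thread(target=worker)
t.start()
q.put((lambda n: None, 10))    # stand-in for a (task.process, chunksize) tuple
q.put((None, 0))               # stop the worker
t.join()
```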
+
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
@@ -49,7 +147,7 @@ class RPoolChannel(RChannel):
returns a possibly changed item list. If it raises, the exception will be propagated.
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
-
+
def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
@@ -58,8 +156,18 @@ class RPoolChannel(RChannel):
self._pre_cb()
# END pre callback
+ # if we have count items, don't do any queue preparation - if someone
+ # depletes the queue in the meantime, the channel will close and
+ # we will unblock naturally
+ have_enough = False
+ if count > 0:
+ # explicitly > count, not >=, as we want a safety margin
+ have_enough = self._wc._queue.qsize() > count
+ # END risky game
+
########## prepare ##############################
- self._pool._prepare_channel_read(self._task, count)
+ if not have_enough:
+ self._pool._prepare_channel_read(self._task, count)
######### read data ######
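Spelled out as a hypothetical helper (not part of the class), the short-circuit above behaves like this:

```python
def have_enough(queued, count):
    # mirrors the check in read(): only applies for count > 0, and uses
    # a strict '>' so at least one spare item remains as a safety margin
    return count > 0 and queued > count

assert have_enough(queued=25, count=10)      # plenty buffered: skip scheduling
assert not have_enough(queued=10, count=10)  # exactly enough is not enough
assert not have_enough(queued=50, count=0)   # count == 0 always prepares
```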
@@ -127,9 +235,9 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = Queue() # make sure its threadsafe
+ self._consumed_tasks = None
self._workers = list()
- self._queue = self.TaskQueueCls()
+ self._queue = SyncQueue() # start with a serial (unlocked) queue
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
self.set_size(size)
@@ -201,58 +309,60 @@ class Pool(object):
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
amount as it should process until it's all done.
- # NOTE: revise this for multi-tasking - checking qsize doesnt work there !
- if count < 1 or task._out_wc.size() < count:
- # but we continue to use the actual count to produce the output
- numchunks = 1
- chunksize = actual_count
- remainder = 0
-
- # we need the count set for this - can't chunk up unlimited items
- # In serial mode we could do this by checking for empty input channels,
- # but in dispatch mode its impossible ( == not easily possible )
- # Only try it if we have enough demand
- if task.max_chunksize and actual_count > task.max_chunksize:
- numchunks = actual_count / task.max_chunksize
- chunksize = task.max_chunksize
- remainder = actual_count - (numchunks * chunksize)
- # END handle chunking
-
- # the following loops are kind of unrolled - code duplication
- # should make things execute faster. Putting the if statements
- # into the loop would be less code, but ... slower
- # DEBUG
- # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
- if self._workers:
- # respect the chunk size, and split the task up if we want
- # to process too much. This can be defined per task
- queue = self._queue
- if numchunks > 1:
- for i in xrange(numchunks):
- queue.put((task.process, chunksize))
- # END for each chunk to put
- else:
+ #if count > 1 and task._out_wc.size() >= count:
+ # continue
+ # END skip if we have enough
+
+ # but use the actual count to produce the output - we may produce
+ # more than requested
+ numchunks = 1
+ chunksize = actual_count
+ remainder = 0
+
+ # we need the count set for this - can't chunk up unlimited items
+ # In serial mode we could do this by checking for empty input channels,
+ # but in dispatch mode it's impossible (or at least not easily possible)
+ # Only try it if we have enough demand
+ if task.max_chunksize and actual_count > task.max_chunksize:
+ numchunks = actual_count / task.max_chunksize
+ chunksize = task.max_chunksize
+ remainder = actual_count - (numchunks * chunksize)
+ # END handle chunking
+
+ # the following loops are kind of unrolled - code duplication
+ # should make things execute faster. Putting the if statements
+ # into the loop would be less code, but ... slower
+ # DEBUG
+ # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+ if self._workers:
+ # respect the chunk size, and split the task up if we want
+ # to process too much. This can be defined per task
+ queue = self._queue
+ if numchunks > 1:
+ for i in xrange(numchunks):
queue.put((task.process, chunksize))
- # END try efficient looping
-
- if remainder:
- queue.put((task.process, remainder))
- # END handle chunksize
+ # END for each chunk to put
else:
- # no workers, so we have to do the work ourselves
- if numchunks > 1:
- for i in xrange(numchunks):
- task.process(chunksize)
- # END for each chunk to put
- else:
+ queue.put((task.process, chunksize))
+ # END try efficient looping
+
+ if remainder:
+ queue.put((task.process, remainder))
+ # END handle chunksize
+ else:
+ # no workers, so we have to do the work ourselves
+ if numchunks > 1:
+ for i in xrange(numchunks):
task.process(chunksize)
- # END try efficient looping
-
- if remainder:
- task.process(remainder)
- # END handle chunksize
- # END handle serial mode
- # END handle queuing
+ # END for each chunk to put
+ else:
+ task.process(chunksize)
+ # END try efficient looping
+
+ if remainder:
+ task.process(remainder)
+ # END handle chunksize
+ # END handle serial mode
# END for each task to process
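A worked example of the chunking arithmetic above, with illustrative numbers:

```python
actual_count = 25
max_chunksize = 10

numchunks = actual_count / max_chunksize           # 2 (Python 2 integer division)
chunksize = max_chunksize                          # 10
remainder = actual_count - numchunks * chunksize   # 5

# with workers present, the queue receives (task.process, 10) twice and
# (task.process, 5) once, letting chunks of different tasks interleave
```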
@@ -297,11 +407,22 @@ class Pool(object):
otherwise the work will be distributed among the given amount of threads
:note: currently NOT threadsafe !"""
+ assert size >= 0, "Size cannot be negative"
+
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
cur_count = len(self._workers)
if cur_count < size:
+ # make sure we have a real queue, and can store our consumed tasks properly
+ if not isinstance(self._queue, self.TaskQueueCls):
+ if self._queue is not None and not self._queue.empty():
+ raise AssertionError("Expected empty queue when switching the queue type")
+ # END safety check
+ self._queue = self.TaskQueueCls()
+ self._consumed_tasks = Queue()
+ # END init queue
+
for i in range(size - cur_count):
worker = self.WorkerCls(self._queue)
worker.start()
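The queue-switching lifecycle this enables, sketched against the ThreadPool defined at the end of this diff; the comments describe the intended behaviour of this patch:

```python
pool = ThreadPool(0)   # serial mode: unlocked SyncQueue, no worker threads
pool.set_size(4)       # swaps in a PerfQueue plus a threadsafe consumed-task
                       # Queue, then starts four WorkerThreads
pool.set_size(0)       # workers stop; remaining chunks are processed by the
                       # caller and the pool falls back to SyncQueue
```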
@@ -323,6 +444,16 @@ class Pool(object):
except Empty: # Empty is imported from the Queue module; Queue.Empty does not exist
continue
# END while there are tasks on the queue
+
+ # use a serial queue - it's faster
+ if not isinstance(self._queue, SyncQueue):
+ self._queue = SyncQueue()
+ # END handle queue type
+
+ # Queue has no pop() - drain it via get() until it is empty
+ while self._consumed_tasks and not self._consumed_tasks.empty():
+ self._post_channel_read(self._consumed_tasks.get())
+ # END assure consumed tasks are empty
+ self._consumed_tasks = SyncQueue()
# END process queue
return self
@@ -403,4 +534,4 @@ class ThreadPool(Pool):
"""A pool using threads as worker"""
WorkerCls = WorkerThread
LockCls = Lock
- TaskQueueCls = Queue
+ TaskQueueCls = PerfQueue
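End to end, the intended flow looks roughly like this; add_task and the channel wiring sit outside this diff, so the calls below are assumptions about the surrounding module rather than code it shows:

```python
pool = ThreadPool(4)               # four WorkerThreads fed from a PerfQueue

# task: an InputChannelTask set up elsewhere in the module
channel = pool.add_task(task)      # assumed entry point returning an RPoolChannel
first = channel.read(count=20)     # schedules just enough chunks for 20 items,
                                   # or skips scheduling if >20 are already queued
rest = channel.read()              # count == 0: run the task to completion
```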