Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 241
1 file changed, 186 insertions(+), 55 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 26a6a182..30291835 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,8 +1,16 @@
 """Implementation of a thread-pool working with channels"""
 from thread import WorkerThread
-from threading import Lock
+
+from threading import (
+    Lock,
+    _Condition,
+    _sleep,
+    _time,
+    )
+
 from task import InputChannelTask
 from Queue import Queue, Empty
+from collections import deque
 
 from graph import (
     Graph,
@@ -18,6 +26,96 @@
 import weakref
 import sys
 
+#{ Utilities
+
+class SyncQueue(deque):
+    """Adapter to allow using a deque like a queue, without locking"""
+    def get(self, block=True, timeout=None):
+        try:
+            return self.pop()
+        except IndexError:
+            raise Empty
+        # END raise empty
+
+    def empty(self):
+        return len(self) == 0
+
+    put = deque.append
+
+
+class HSCondition(_Condition):
+    """An attempt to make conditions less blocking, which gains performance
+    in return by sleeping less"""
+    delay = 0.00002        # reduces wait times, but increases overhead
+
+    def wait(self, timeout=None):
+        waiter = Lock()
+        waiter.acquire()
+        self.__dict__['_Condition__waiters'].append(waiter)
+        saved_state = self._release_save()
+        try:    # restore state no matter what (e.g., KeyboardInterrupt)
+            if timeout is None:
+                waiter.acquire()
+            else:
+                # Balancing act: We can't afford a pure busy loop, so we
+                # have to sleep; but if we sleep the whole timeout time,
+                # we'll be unresponsive. The scheme here sleeps very
+                # little at first, longer as time goes on, but never longer
+                # than 20 times per second (or the timeout time remaining).
+                endtime = _time() + timeout
+                delay = self.delay
+                acquire = waiter.acquire
+                while True:
+                    gotit = acquire(0)
+                    if gotit:
+                        break
+                    remaining = endtime - _time()
+                    if remaining <= 0:
+                        break
+                    delay = min(delay * 2, remaining, .05)
+                    _sleep(delay)
+                # END endless loop
+                if not gotit:
+                    try:
+                        self.__dict__['_Condition__waiters'].remove(waiter)
+                    except ValueError:
+                        pass
+                # END didn't ever get it
+        finally:
+            self._acquire_restore(saved_state)
+
+    def notify(self, n=1):
+        __waiters = self.__dict__['_Condition__waiters']
+        if not __waiters:
+            return
+        if n == 1:
+            __waiters[0].release()
+            try:
+                __waiters.pop(0)
+            except IndexError:
+                pass
+        else:
+            waiters = __waiters[:n]
+            for waiter in waiters:
+                waiter.release()
+                try:
+                    __waiters.remove(waiter)
+                except ValueError:
+                    pass
+        # END handle n = 1 case faster
+
+class PerfQueue(Queue):
+    """A queue using different condition objects to gain multithreading performance"""
+    def __init__(self, maxsize=0):
+        Queue.__init__(self, maxsize)
+
+        self.not_empty = HSCondition(self.mutex)
+        self.not_full = HSCondition(self.mutex)
+        self.all_tasks_done = HSCondition(self.mutex)
+
+
+#} END utilities
+
 class RPoolChannel(RChannel):
     """ A read-only pool channel may not be wrapped or derived from, but it provides slots to
     call before and after an item is to be read.
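The utility classes introduced above can be exercised on their own. A minimal Python 2 sketch (the git.async.pool import path is an assumption about packaging, not something the diff states):

    # Python 2 sketch; the git.async.pool import path is assumed
    from Queue import Empty
    from git.async.pool import SyncQueue, PerfQueue

    # SyncQueue adapts a deque to the Queue interface without any locking.
    # Note: put = deque.append and get() uses deque.pop(), both on the right
    # end, so retrieval order is LIFO - fine for a bag of work chunks.
    sq = SyncQueue()
    sq.put('a'); sq.put('b')
    assert sq.get() == 'b' and sq.get() == 'a'
    try:
        sq.get()
    except Empty:
        pass        # raises Queue.Empty, just like the real Queue

    # PerfQueue keeps FIFO Queue semantics but swaps the three condition
    # variables for HSCondition, whose wait() polls with a growing sleep.
    pq = PerfQueue()
    pq.put('a'); pq.put('b')
    assert pq.get() == 'a'

    # The sleep schedule HSCondition.wait(timeout) follows: the delay doubles
    # from 0.00002s and is capped at 0.05s, hence "never longer than 20 times
    # per second" (ignoring the remaining-timeout cap here).
    delay, schedule = 0.00002, []
    for _ in range(15):
        delay = min(delay * 2, .05)
        schedule.append(delay)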
@@ -49,7 +147,7 @@ class RPoolChannel(RChannel):
         returns a possibly changed item list. If it raises, the exception will be propagated.
         If a function is not provided, the call is effectively uninstalled."""
         self._post_cb = fun
-
+
     def read(self, count=0, block=True, timeout=None):
         """Read an item that was processed by one of our threads
         :note: Triggers task dependency handling needed to provide the necessary
@@ -58,8 +156,18 @@ class RPoolChannel(RChannel):
             self._pre_cb()
         # END pre callback
 
+        # if we have count items, don't do any queue preparation - if someone
+        # depletes the queue in the meanwhile, the channel will close and
+        # we will unblock naturally
+        have_enough = False
+        if count > 0:
+            # explicitly > count, as we want a certain safe range
+            have_enough = self._wc._queue.qsize() > count
+        # END risky game
+
         ########## prepare ##############################
-        self._pool._prepare_channel_read(self._task, count)
+        if not have_enough:
+            self._pool._prepare_channel_read(self._task, count)
 
         ######### read data ######
@@ -127,9 +235,9 @@ class Pool(object):
 
     def __init__(self, size=0):
         self._tasks = Graph()
-        self._consumed_tasks = Queue()        # make sure its threadsafe
+        self._consumed_tasks = None
         self._workers = list()
-        self._queue = self.TaskQueueCls()
+        self._queue = SyncQueue()            # start with a sync queue
         self._taskgraph_lock = self.LockCls()
         self._taskorder_cache = dict()
         self.set_size(size)
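The early-out added to read() skips preparation only when the write channel's queue holds strictly more items than requested. Distilled into a standalone predicate (the helper name is made up for illustration):

    # Sketch of the have_enough guard in RPoolChannel.read(); name hypothetical
    def must_prepare(qsize, count):
        if count <= 0:
            return True                 # unbounded read: always prepare
        return not (qsize > count)      # strictly greater keeps a safety margin

    assert must_prepare(qsize=5, count=5)           # exactly count is not enough
    assert not must_prepare(qsize=6, count=5)       # one spare item lets us skip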
@@ -201,58 +309,60 @@ class Pool(object):
             # if the task does not have the required output on its queue, schedule
             # it for processing. If we should process all, we don't care about the
             # amount as it should process until its all done.
-            # NOTE: revise this for multi-tasking - checking qsize doesnt work there !
-            if count < 1 or task._out_wc.size() < count:
-                # but we continue to use the actual count to produce the output
-                numchunks = 1
-                chunksize = actual_count
-                remainder = 0
-
-                # we need the count set for this - can't chunk up unlimited items
-                # In serial mode we could do this by checking for empty input channels,
-                # but in dispatch mode its impossible ( == not easily possible )
-                # Only try it if we have enough demand
-                if task.max_chunksize and actual_count > task.max_chunksize:
-                    numchunks = actual_count / task.max_chunksize
-                    chunksize = task.max_chunksize
-                    remainder = actual_count - (numchunks * chunksize)
-                # END handle chunking
-
-                # the following loops are kind of unrolled - code duplication
-                # should make things execute faster. Putting the if statements
-                # into the loop would be less code, but ... slower
-                # DEBUG
-                # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
-                if self._workers:
-                    # respect the chunk size, and split the task up if we want
-                    # to process too much. This can be defined per task
-                    queue = self._queue
-                    if numchunks > 1:
-                        for i in xrange(numchunks):
-                            queue.put((task.process, chunksize))
-                        # END for each chunk to put
-                    else:
+            #if count > 1 and task._out_wc.size() >= count:
+            #    continue
+            # END skip if we have enough
+
+            # but use the actual count to produce the output, we may produce
+            # more than requested
+            numchunks = 1
+            chunksize = actual_count
+            remainder = 0
+
+            # we need the count set for this - can't chunk up unlimited items
+            # In serial mode we could do this by checking for empty input channels,
+            # but in dispatch mode its impossible ( == not easily possible )
+            # Only try it if we have enough demand
+            if task.max_chunksize and actual_count > task.max_chunksize:
+                numchunks = actual_count / task.max_chunksize
+                chunksize = task.max_chunksize
+                remainder = actual_count - (numchunks * chunksize)
+            # END handle chunking
+
+            # the following loops are kind of unrolled - code duplication
+            # should make things execute faster. Putting the if statements
+            # into the loop would be less code, but ... slower
+            # DEBUG
+            # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+            if self._workers:
+                # respect the chunk size, and split the task up if we want
+                # to process too much. This can be defined per task
+                queue = self._queue
+                if numchunks > 1:
+                    for i in xrange(numchunks):
                         queue.put((task.process, chunksize))
-                    # END try efficient looping
-
-                    if remainder:
-                        queue.put((task.process, remainder))
-                    # END handle chunksize
+                    # END for each chunk to put
                 else:
-                    # no workers, so we have to do the work ourselves
-                    if numchunks > 1:
-                        for i in xrange(numchunks):
-                            task.process(chunksize)
-                        # END for each chunk to put
-                    else:
+                    queue.put((task.process, chunksize))
+                # END try efficient looping
+
+                if remainder:
+                    queue.put((task.process, remainder))
+                # END handle chunksize
+            else:
+                # no workers, so we have to do the work ourselves
+                if numchunks > 1:
+                    for i in xrange(numchunks):
                         task.process(chunksize)
-                    # END try efficient looping
-
-                    if remainder:
-                        task.process(remainder)
-                    # END handle chunksize
-                # END handle serial mode
-            # END handle queuing
+                    # END for each chunk to put
+                else:
+                    task.process(chunksize)
+                # END try efficient looping
+
+                if remainder:
+                    task.process(remainder)
+                # END handle chunksize
+            # END handle serial mode
         # END for each task to process
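The chunking arithmetic is easiest to follow with concrete numbers; note the division is Python 2 integer division:

    # e.g. a read of 250 items against a task with max_chunksize = 100
    actual_count = 250
    max_chunksize = 100

    numchunks = actual_count / max_chunksize             # 2 (floor division)
    chunksize = max_chunksize                            # 100
    remainder = actual_count - (numchunks * chunksize)   # 50

    # with workers: the queue receives (task.process, 100) twice, then (task.process, 50)
    # without workers: task.process() is called inline with the same sizes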
@@ -297,11 +407,22 @@ class Pool(object):
             otherwise the work will be distributed among the given amount of threads
 
         :note: currently NOT threadsafe !"""
+        assert size > -1, "Size cannot be negative"
+
         # either start new threads, or kill existing ones.
         # If we end up with no threads, we process the remaining chunks on the queue
         # ourselves
         cur_count = len(self._workers)
         if cur_count < size:
+            # make sure we have a real queue, and can store our consumed tasks properly
+            if not isinstance(self._queue, self.TaskQueueCls):
+                if self._queue is not None and not self._queue.empty():
+                    raise AssertionError("Expected empty queue when switching the queue type")
+                # END safety check
+                self._queue = self.TaskQueueCls()
+                self._consumed_tasks = Queue()
+            # END init queue
+
             for i in range(size - cur_count):
                 worker = self.WorkerCls(self._queue)
                 worker.start()
@@ -323,6 +444,16 @@ class Pool(object):
                 except Queue.Empty:
                     continue
             # END while there are tasks on the queue
+
+            # use a serial queue, its faster
+            if not isinstance(self._queue, SyncQueue):
+                self._queue = SyncQueue()
+            # END handle queue type
+
+            if self._consumed_tasks and not self._consumed_tasks.empty():
+                self._post_channel_read(self._consumed_tasks.pop())
+            # END assure consumed tasks are empty
+            self._consumed_tasks = SyncQueue()
         # END process queue
         return self
@@ -403,4 +534,4 @@ class ThreadPool(Pool):
     """A pool using threads as worker"""
     WorkerCls = WorkerThread
     LockCls = Lock
-    TaskQueueCls = Queue
+    TaskQueueCls = PerfQueue
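Taken together, set_size() is now what flips the pool between the lock-free serial mode and the threaded mode. A hedged usage sketch (again assuming the git.async.pool import path, and peeking at the private _queue attribute purely for illustration):

    from git.async.pool import ThreadPool, SyncQueue, PerfQueue

    pool = ThreadPool(size=0)                   # starts serial, on a SyncQueue
    assert isinstance(pool._queue, SyncQueue)

    pool.set_size(4)                            # swaps in a PerfQueue, starts 4 workers
    assert isinstance(pool._queue, PerfQueue)

    pool.set_size(0)                            # workers stopped; leftover chunks are
    assert isinstance(pool._queue, SyncQueue)   # processed inline, SyncQueue returns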