path: root/lib/git/async/pool.py
author		Sebastian Thiel <byronimo@gmail.com>	2010-06-07 17:16:48 +0200
committer	Sebastian Thiel <byronimo@gmail.com>	2010-06-07 17:16:48 +0200
commit		edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (patch)
tree		5b1972319baf4c2b3d25c57ec3c7e5f6b2c3d903 /lib/git/async/pool.py
parent		8c3c271b0d6b5f56b86e3f177caf3e916b509b52 (diff)
download	gitpython-edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0.tar.gz
added high-speed locking facilities, allowing our Queue to be faster, at least in tests, and with multiple threads. There is still a sync bug regarding closed channels to be fixed, as the Task.set_done handling is incorrect
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--	lib/git/async/pool.py	| 241
1 file changed, 186 insertions(+), 55 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 26a6a182..30291835 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,8 +1,16 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
-from threading import Lock
+
+from threading import (
+	Lock,
+	_Condition,
+	_sleep,
+	_time,
+	)
+
from task import InputChannelTask
from Queue import Queue, Empty
+from collections import deque
from graph import (
	Graph,
@@ -18,6 +26,96 @@ import weakref
import sys
+#{ Utilities
+
+class SyncQueue(deque):
+	"""Adapter to allow using a deque like a queue, without locking"""
+	def get(self, block=True, timeout=None):
+		try:
+			return self.pop()
+		except IndexError:
+			raise Empty
+		# END raise empty
+
+	def empty(self):
+		return len(self) == 0
+
+	put = deque.append
+
+
+class HSCondition(_Condition):
+	"""An attempt to make conditions less blocking, which gains performance
+	in return by sleeping less"""
+	delay = 0.00002		# reduces wait times, but increases overhead
+
+	def wait(self, timeout=None):
+		waiter = Lock()
+		waiter.acquire()
+		self.__dict__['_Condition__waiters'].append(waiter)
+		saved_state = self._release_save()
+		try:	# restore state no matter what (e.g., KeyboardInterrupt)
+			if timeout is None:
+				waiter.acquire()
+			else:
+				# Balancing act: We can't afford a pure busy loop, so we
+				# have to sleep; but if we sleep the whole timeout time,
+				# we'll be unresponsive. The scheme here sleeps very
+				# little at first, longer as time goes on, but never longer
+				# than 20 times per second (or the timeout time remaining).
+				endtime = _time() + timeout
+				delay = self.delay
+				acquire = waiter.acquire
+				while True:
+					gotit = acquire(0)
+					if gotit:
+						break
+					remaining = endtime - _time()
+					if remaining <= 0:
+						break
+					delay = min(delay * 2, remaining, .05)
+					_sleep(delay)
+				# END endless loop
+				if not gotit:
+					try:
+						self.__dict__['_Condition__waiters'].remove(waiter)
+					except ValueError:
+						pass
+				# END didn't ever get it
+		finally:
+			self._acquire_restore(saved_state)
+
+	def notify(self, n=1):
+		__waiters = self.__dict__['_Condition__waiters']
+		if not __waiters:
+			return
+		if n == 1:
+			__waiters[0].release()
+			try:
+				__waiters.pop(0)
+			except IndexError:
+				pass
+		else:
+			waiters = __waiters[:n]
+			for waiter in waiters:
+				waiter.release()
+				try:
+					__waiters.remove(waiter)
+				except ValueError:
+					pass
+		# END handle n = 1 case faster
+
+class PerfQueue(Queue):
+	"""A queue using different condition objects to gain multithreading performance"""
+	def __init__(self, maxsize=0):
+		Queue.__init__(self, maxsize)
+
+		self.not_empty = HSCondition(self.mutex)
+		self.not_full = HSCondition(self.mutex)
+		self.all_tasks_done = HSCondition(self.mutex)
+
+
+#} END utilities
+
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
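
The utilities above are self-contained enough to exercise on their own. A minimal sketch, assuming the module is importable as git.async.pool (it lives at lib/git/async/pool.py in this tree); thread and item counts are arbitrary:

import threading
from git.async.pool import SyncQueue, PerfQueue

# SyncQueue: the queue interface without locking - only safe for serial use
sq = SyncQueue()
sq.put('item')
assert sq.get() == 'item' and sq.empty()

# PerfQueue: a drop-in Queue whose condition variables are HSCondition instances
pq = PerfQueue()

def produce(n):
	for i in xrange(n):
		pq.put(i)

threads = [threading.Thread(target=produce, args=(500,)) for _ in range(4)]
for t in threads:
	t.start()
for t in threads:
	t.join()

consumed = 0
while not pq.empty():
	pq.get()
	consumed += 1
assert consumed == 2000
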
@@ -49,7 +147,7 @@ class RPoolChannel(RChannel):
		returns a possibly changed item list. If it raises, the exception will be propagated.
		If a function is not provided, the call is effectively uninstalled."""
		self._post_cb = fun
-
+
	def read(self, count=0, block=True, timeout=None):
		"""Read an item that was processed by one of our threads
		:note: Triggers task dependency handling needed to provide the necessary
@@ -58,8 +156,18 @@ class RPoolChannel(RChannel):
			self._pre_cb()
		# END pre callback
+		# if we have count items, don't do any queue preparation - if someone
+		# depletes the queue in the meanwhile, the channel will close and
+		# we will unblock naturally
+		have_enough = False
+		if count > 0:
+			# explicitly > count, as we want a certain safe range
+			have_enough = self._wc._queue.qsize() > count
+		# END risky game
+
		########## prepare ##############################
-		self._pool._prepare_channel_read(self._task, count)
+		if not have_enough:
+			self._pool._prepare_channel_read(self._task, count)
		######### read data ######
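
Condensed into a toy model, the new fast path in read() skips the comparatively expensive dependency preparation whenever the queue already holds strictly more items than requested (all names below are hypothetical stand-ins for the pool internals; only the comparison mirrors the diff):

from Queue import Queue

def read(queue, count, prepare):
	# strictly greater than count keeps the safety margin from the diff
	have_enough = count > 0 and queue.qsize() > count
	if not have_enough:
		prepare(count)
	return [queue.get() for _ in xrange(count)]

q = Queue()
for i in xrange(5):
	q.put(i)

prepared = []
read(q, 3, prepared.append)		# 5 items > 3 requested: no preparation
assert prepared == []
read(q, 2, prepared.append)		# 2 items left, not > 2: prepare runs
assert prepared == [2]
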
@@ -127,9 +235,9 @@ class Pool(object):
	def __init__(self, size=0):
		self._tasks = Graph()
-		self._consumed_tasks = Queue()	# make sure its threadsafe
+		self._consumed_tasks = None
		self._workers = list()
-		self._queue = self.TaskQueueCls()
+		self._queue = SyncQueue()		# start with a sync queue
		self._taskgraph_lock = self.LockCls()
		self._taskorder_cache = dict()
		self.set_size(size)
@@ -201,58 +309,60 @@ class Pool(object):
			# if the task does not have the required output on its queue, schedule
			# it for processing. If we should process all, we don't care about the
			# amount as it should process until its all done.
-			# NOTE: revise this for multi-tasking - checking qsize doesnt work there !
-			if count < 1 or task._out_wc.size() < count:
-				# but we continue to use the actual count to produce the output
-				numchunks = 1
-				chunksize = actual_count
-				remainder = 0
-
-				# we need the count set for this - can't chunk up unlimited items
-				# In serial mode we could do this by checking for empty input channels,
-				# but in dispatch mode its impossible ( == not easily possible )
-				# Only try it if we have enough demand
-				if task.max_chunksize and actual_count > task.max_chunksize:
-					numchunks = actual_count / task.max_chunksize
-					chunksize = task.max_chunksize
-					remainder = actual_count - (numchunks * chunksize)
-				# END handle chunking
-
-				# the following loops are kind of unrolled - code duplication
-				# should make things execute faster. Putting the if statements
-				# into the loop would be less code, but ... slower
-				# DEBUG
-				# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
-				if self._workers:
-					# respect the chunk size, and split the task up if we want
-					# to process too much. This can be defined per task
-					queue = self._queue
-					if numchunks > 1:
-						for i in xrange(numchunks):
-							queue.put((task.process, chunksize))
-						# END for each chunk to put
-					else:
+			#if count > 1 and task._out_wc.size() >= count:
+			#	continue
+			# END skip if we have enough
+
+			# but use the actual count to produce the output; we may produce
+			# more than requested
+			numchunks = 1
+			chunksize = actual_count
+			remainder = 0
+
+			# we need the count set for this - can't chunk up unlimited items
+			# In serial mode we could do this by checking for empty input channels,
+			# but in dispatch mode it's impossible ( == not easily possible )
+			# Only try it if we have enough demand
+			if task.max_chunksize and actual_count > task.max_chunksize:
+				numchunks = actual_count / task.max_chunksize
+				chunksize = task.max_chunksize
+				remainder = actual_count - (numchunks * chunksize)
+			# END handle chunking
+
+			# the following loops are kind of unrolled - code duplication
+			# should make things execute faster. Putting the if statements
+			# into the loop would be less code, but ... slower
+			# DEBUG
+			# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+			if self._workers:
+				# respect the chunk size, and split the task up if we want
+				# to process too much. This can be defined per task
+				queue = self._queue
+				if numchunks > 1:
+					for i in xrange(numchunks):
						queue.put((task.process, chunksize))
-					# END try efficient looping
-
-					if remainder:
-						queue.put((task.process, remainder))
-					# END handle chunksize
+					# END for each chunk to put
				else:
-					# no workers, so we have to do the work ourselves
-					if numchunks > 1:
-						for i in xrange(numchunks):
-							task.process(chunksize)
-						# END for each chunk to put
-					else:
+					queue.put((task.process, chunksize))
+				# END try efficient looping
+
+				if remainder:
+					queue.put((task.process, remainder))
+				# END handle chunksize
+			else:
+				# no workers, so we have to do the work ourselves
+				if numchunks > 1:
+					for i in xrange(numchunks):
						task.process(chunksize)
-					# END try efficient looping
-
-					if remainder:
-						task.process(remainder)
-					# END handle chunksize
-				# END handle serial mode
-			# END handle queuing
+				# END for each chunk to put
+				else:
+					task.process(chunksize)
+				# END try efficient looping
+
+				if remainder:
+					task.process(remainder)
+				# END handle chunksize
+			# END handle serial mode
		# END for each task to process
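
The chunking arithmetic above is easy to verify in isolation. A standalone sketch (chunk_layout is a made-up name; the body mirrors the computation in the diff, including Python 2 integer division):

def chunk_layout(actual_count, max_chunksize):
	# split actual_count items into numchunks chunks of chunksize each,
	# plus a trailing chunk holding the remainder
	numchunks = 1
	chunksize = actual_count
	remainder = 0
	if max_chunksize and actual_count > max_chunksize:
		numchunks = actual_count / max_chunksize
		chunksize = max_chunksize
		remainder = actual_count - (numchunks * chunksize)
	return numchunks, chunksize, remainder

assert chunk_layout(25, 10) == (2, 10, 5)	# two full chunks, 5 left over
assert chunk_layout(5, 10) == (1, 5, 0)		# demand below chunk size: one call
assert chunk_layout(100, 0) == (1, 100, 0)	# no chunk size set: one call for all
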
@@ -297,11 +407,22 @@ class Pool(object):
		otherwise the work will be distributed among the given amount of threads
		:note: currently NOT threadsafe !"""
+		assert size > -1, "Size cannot be negative"
+
		# either start new threads, or kill existing ones.
		# If we end up with no threads, we process the remaining chunks on the queue
		# ourselves
		cur_count = len(self._workers)
		if cur_count < size:
+			# make sure we have a real queue, and can store our consumed tasks properly
+			if not isinstance(self._queue, self.TaskQueueCls):
+				if self._queue is not None and not self._queue.empty():
+					raise AssertionError("Expected empty queue when switching the queue type")
+				# END safety check
+				self._queue = self.TaskQueueCls()
+				self._consumed_tasks = Queue()
+			# END init queue
+
			for i in range(size - cur_count):
				worker = self.WorkerCls(self._queue)
				worker.start()
@@ -323,6 +444,16 @@ class Pool(object):
					except Queue.Empty:
						continue
				# END while there are tasks on the queue
+
+				# use a serial queue, it's faster
+				if not isinstance(self._queue, SyncQueue):
+					self._queue = SyncQueue()
+				# END handle queue type
+
+				while self._consumed_tasks and not self._consumed_tasks.empty():
+					self._post_channel_read(self._consumed_tasks.get())
+				# END assure consumed tasks are empty
+				self._consumed_tasks = SyncQueue()
			# END process queue
		return self
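
Together with the previous hunk, set_size() now swaps queue implementations lazily: a threadsafe PerfQueue only exists while workers do. A usage sketch (it pokes at the private _queue attribute purely for illustration):

from git.async.pool import ThreadPool, SyncQueue, PerfQueue

pool = ThreadPool(0)		# no workers: lock-free SyncQueue
assert isinstance(pool._queue, SyncQueue)

pool.set_size(4)		# workers need the threadsafe PerfQueue
assert isinstance(pool._queue, PerfQueue)

pool.set_size(0)		# drains pending chunks, back to SyncQueue
assert isinstance(pool._queue, SyncQueue)
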
@@ -403,4 +534,4 @@ class ThreadPool(Pool):
"""A pool using threads as worker"""
WorkerCls = WorkerThread
LockCls = Lock
- TaskQueueCls = Queue
+ TaskQueueCls = PerfQueue
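
Since the task queue type is just a class attribute, reverting to the standard-library Queue for comparison remains a one-line subclass. A hypothetical benchmarking aid, not part of this commit:

from Queue import Queue
from git.async.pool import ThreadPool

class StockQueueThreadPool(ThreadPool):
	# swap the stock Queue back in to measure what PerfQueue buys us
	TaskQueueCls = Queue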