summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-07 17:16:48 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-07 17:16:48 +0200
commitedd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (patch)
tree5b1972319baf4c2b3d25c57ec3c7e5f6b2c3d903
parent8c3c271b0d6b5f56b86e3f177caf3e916b509b52 (diff)
downloadgitpython-edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0.tar.gz
added high-speed locking facilities, allowing our Queue to be faster, at least in tests, and with multiple threads. There is still an sync bug in regard to closed channels to be fixed, as the Task.set_done handling is incorrecft
-rw-r--r--lib/git/async/pool.py241
-rw-r--r--lib/git/async/thread.py3
-rw-r--r--test/git/async/test_pool.py20
3 files changed, 199 insertions, 65 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 26a6a182..30291835 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,8 +1,16 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
-from threading import Lock
+
+from threading import (
+ Lock,
+ _Condition,
+ _sleep,
+ _time,
+ )
+
from task import InputChannelTask
from Queue import Queue, Empty
+from collections import deque
from graph import (
Graph,
@@ -18,6 +26,96 @@ import weakref
import sys
+#{ Utilities
+
+class SyncQueue(deque):
+ """Adapter to allow using a deque like a queue, without locking"""
+ def get(self, block=True, timeout=None):
+ try:
+ return self.pop()
+ except IndexError:
+ raise Empty
+ # END raise empty
+
+ def empty(self):
+ return len(self) == 0
+
+ put = deque.append
+
+
+class HSCondition(_Condition):
+ """An attempt to make conditions less blocking, which gains performance
+ in return by sleeping less"""
+ delay = 0.00002 # reduces wait times, but increases overhead
+
+ def wait(self, timeout=None):
+ waiter = Lock()
+ waiter.acquire()
+ self.__dict__['_Condition__waiters'].append(waiter)
+ saved_state = self._release_save()
+ try: # restore state no matter what (e.g., KeyboardInterrupt)
+ if timeout is None:
+ waiter.acquire()
+ else:
+ # Balancing act: We can't afford a pure busy loop, so we
+ # have to sleep; but if we sleep the whole timeout time,
+ # we'll be unresponsive. The scheme here sleeps very
+ # little at first, longer as time goes on, but never longer
+ # than 20 times per second (or the timeout time remaining).
+ endtime = _time() + timeout
+ delay = self.delay
+ acquire = waiter.acquire
+ while True:
+ gotit = acquire(0)
+ if gotit:
+ break
+ remaining = endtime - _time()
+ if remaining <= 0:
+ break
+ delay = min(delay * 2, remaining, .05)
+ _sleep(delay)
+ # END endless loop
+ if not gotit:
+ try:
+ self.__dict__['_Condition__waiters'].remove(waiter)
+ except ValueError:
+ pass
+ # END didn't ever get it
+ finally:
+ self._acquire_restore(saved_state)
+
+ def notify(self, n=1):
+ __waiters = self.__dict__['_Condition__waiters']
+ if not __waiters:
+ return
+ if n == 1:
+ __waiters[0].release()
+ try:
+ __waiters.pop(0)
+ except IndexError:
+ pass
+ else:
+ waiters = __waiters[:n]
+ for waiter in waiters:
+ waiter.release()
+ try:
+ __waiters.remove(waiter)
+ except ValueError:
+ pass
+ # END handle n = 1 case faster
+
+class PerfQueue(Queue):
+ """A queue using different condition objects to gain multithreading performance"""
+ def __init__(self, maxsize=0):
+ Queue.__init__(self, maxsize)
+
+ self.not_empty = HSCondition(self.mutex)
+ self.not_full = HSCondition(self.mutex)
+ self.all_tasks_done = HSCondition(self.mutex)
+
+
+#} END utilities
+
class RPoolChannel(RChannel):
""" A read-only pool channel may not be wrapped or derived from, but it provides slots to call
before and after an item is to be read.
@@ -49,7 +147,7 @@ class RPoolChannel(RChannel):
returns a possibly changed item list. If it raises, the exception will be propagated.
If a function is not provided, the call is effectively uninstalled."""
self._post_cb = fun
-
+
def read(self, count=0, block=True, timeout=None):
"""Read an item that was processed by one of our threads
:note: Triggers task dependency handling needed to provide the necessary
@@ -58,8 +156,18 @@ class RPoolChannel(RChannel):
self._pre_cb()
# END pre callback
+ # if we have count items, don't do any queue preparation - if someone
+ # depletes the queue in the meanwhile, the channel will close and
+ # we will unblock naturally
+ have_enough = False
+ if count > 0:
+ # explicitly > count, as we want a certain safe range
+ have_enough = self._wc._queue.qsize() > count
+ # END risky game
+
########## prepare ##############################
- self._pool._prepare_channel_read(self._task, count)
+ if not have_enough:
+ self._pool._prepare_channel_read(self._task, count)
######### read data ######
@@ -127,9 +235,9 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = Queue() # make sure its threadsafe
+ self._consumed_tasks = None
self._workers = list()
- self._queue = self.TaskQueueCls()
+ self._queue = SyncQueue() # start with a sync queue
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
self.set_size(size)
@@ -201,58 +309,60 @@ class Pool(object):
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until its all done.
- # NOTE: revise this for multi-tasking - checking qsize doesnt work there !
- if count < 1 or task._out_wc.size() < count:
- # but we continue to use the actual count to produce the output
- numchunks = 1
- chunksize = actual_count
- remainder = 0
-
- # we need the count set for this - can't chunk up unlimited items
- # In serial mode we could do this by checking for empty input channels,
- # but in dispatch mode its impossible ( == not easily possible )
- # Only try it if we have enough demand
- if task.max_chunksize and actual_count > task.max_chunksize:
- numchunks = actual_count / task.max_chunksize
- chunksize = task.max_chunksize
- remainder = actual_count - (numchunks * chunksize)
- # END handle chunking
-
- # the following loops are kind of unrolled - code duplication
- # should make things execute faster. Putting the if statements
- # into the loop would be less code, but ... slower
- # DEBUG
- # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
- if self._workers:
- # respect the chunk size, and split the task up if we want
- # to process too much. This can be defined per task
- queue = self._queue
- if numchunks > 1:
- for i in xrange(numchunks):
- queue.put((task.process, chunksize))
- # END for each chunk to put
- else:
+ #if count > 1 and task._out_wc.size() >= count:
+ # continue
+ # END skip if we have enough
+
+ # but use the actual count to produce the output, we may produce
+ # more than requested
+ numchunks = 1
+ chunksize = actual_count
+ remainder = 0
+
+ # we need the count set for this - can't chunk up unlimited items
+ # In serial mode we could do this by checking for empty input channels,
+ # but in dispatch mode its impossible ( == not easily possible )
+ # Only try it if we have enough demand
+ if task.max_chunksize and actual_count > task.max_chunksize:
+ numchunks = actual_count / task.max_chunksize
+ chunksize = task.max_chunksize
+ remainder = actual_count - (numchunks * chunksize)
+ # END handle chunking
+
+ # the following loops are kind of unrolled - code duplication
+ # should make things execute faster. Putting the if statements
+ # into the loop would be less code, but ... slower
+ # DEBUG
+ # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+ if self._workers:
+ # respect the chunk size, and split the task up if we want
+ # to process too much. This can be defined per task
+ queue = self._queue
+ if numchunks > 1:
+ for i in xrange(numchunks):
queue.put((task.process, chunksize))
- # END try efficient looping
-
- if remainder:
- queue.put((task.process, remainder))
- # END handle chunksize
+ # END for each chunk to put
else:
- # no workers, so we have to do the work ourselves
- if numchunks > 1:
- for i in xrange(numchunks):
- task.process(chunksize)
- # END for each chunk to put
- else:
+ queue.put((task.process, chunksize))
+ # END try efficient looping
+
+ if remainder:
+ queue.put((task.process, remainder))
+ # END handle chunksize
+ else:
+ # no workers, so we have to do the work ourselves
+ if numchunks > 1:
+ for i in xrange(numchunks):
task.process(chunksize)
- # END try efficient looping
-
- if remainder:
- task.process(remainder)
- # END handle chunksize
- # END handle serial mode
- # END handle queuing
+ # END for each chunk to put
+ else:
+ task.process(chunksize)
+ # END try efficient looping
+
+ if remainder:
+ task.process(remainder)
+ # END handle chunksize
+ # END handle serial mode
# END for each task to process
@@ -297,11 +407,22 @@ class Pool(object):
otherwise the work will be distributed among the given amount of threads
:note: currently NOT threadsafe !"""
+ assert size > -1, "Size cannot be negative"
+
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
cur_count = len(self._workers)
if cur_count < size:
+ # make sure we have a real queue, and can store our consumed tasks properly
+ if not isinstance(self._queue, self.TaskQueueCls):
+ if self._queue is not None and not self._queue.empty():
+ raise AssertionError("Expected empty queue when switching the queue type")
+ # END safety check
+ self._queue = self.TaskQueueCls()
+ self._consumed_tasks = Queue()
+ # END init queue
+
for i in range(size - cur_count):
worker = self.WorkerCls(self._queue)
worker.start()
@@ -323,6 +444,16 @@ class Pool(object):
except Queue.Empty:
continue
# END while there are tasks on the queue
+
+ # use a serial queue, its faster
+ if not isinstance(self._queue, SyncQueue):
+ self._queue = SyncQueue()
+ # END handle queue type
+
+ if self._consumed_tasks and not self._consumed_tasks.empty():
+ self._post_channel_read(self._consumed_tasks.pop())
+ # END assure consumed tasks are empty
+ self._consumed_tasks = SyncQueue()
# END process queue
return self
@@ -403,4 +534,4 @@ class ThreadPool(Pool):
"""A pool using threads as worker"""
WorkerCls = WorkerThread
LockCls = Lock
- TaskQueueCls = Queue
+ TaskQueueCls = PerfQueue
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index f875f094..f7f0c978 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -15,9 +15,6 @@ def do_terminate_threads(whitelist=list()):
continue
if whitelist and t not in whitelist:
continue
- if isinstance(t, WorkerThread):
- t.inq.put(t.quit)
- # END worker special handling
t.stop_and_join()
# END for each thread
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index df3eaf11..791f89d4 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -2,6 +2,7 @@
from test.testlib import *
from git.async.pool import *
from git.async.task import *
+from git.async.thread import terminate_threads
from git.async.util import cpu_count
import threading
import time
@@ -46,7 +47,7 @@ class TestThreadPool(TestBase):
# add a simple task
# it iterates n items
- ni = 1000
+ ni = 500
assert ni % 2 == 0, "ni needs to be dividable by 2"
assert ni % 4 == 0, "ni needs to be dividable by 4"
@@ -106,8 +107,9 @@ class TestThreadPool(TestBase):
assert len(rc.read(1)) == 1 # processes nothing
# rest - it has ni/2 - 2 on the queue, and pulls ni-2
# It wants too much, so the task realizes its done. The task
- # doesn't care about the items in its output channel
- assert len(rc.read(ni-2)) == ni - 2
+ # doesn't care about the items in its output channel
+ items = rc.read(ni-2)
+ assert len(items) == ni - 2
assert p.num_tasks() == null_tasks
task._assert(2, ni) # two chunks, 20 calls ( all items )
@@ -125,7 +127,8 @@ class TestThreadPool(TestBase):
assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2
# have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
# ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
- assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2
+ items = rc.read(ni / 2 - 2)
+ assert len(items) == ni / 2 - 2
task._assert( 5, ni)
assert p.num_tasks() == null_tasks # depleted
@@ -158,9 +161,12 @@ class TestThreadPool(TestBase):
task.min_count = ni / 4
rc = p.add_task(task)
for i in range(ni):
- assert rc.read(1)[0] == i
+ if async:
+ assert len(rc.read(1)) == 1
+ else:
+ assert rc.read(1)[0] == i
# END for each item
- task._assert(ni / task.min_count, ni)
+ task._assert(ni / task.min_count + 1, ni)
del(rc)
assert p.num_tasks() == null_tasks
@@ -181,6 +187,7 @@ class TestThreadPool(TestBase):
# t1 -> x -> t3
pass
+ @terminate_threads
def test_base(self):
p = ThreadPool()
@@ -239,7 +246,6 @@ class TestThreadPool(TestBase):
p.set_size(2)
self._assert_single_task(p, True)
-
# DEPENDENT TASK ASYNC MODE
###########################
self._assert_async_dependent_tasks(p)