author    Sebastian Thiel <byronimo@gmail.com>  2010-06-07 12:10:56 +0200
committer Sebastian Thiel <byronimo@gmail.com>  2010-06-07 12:19:44 +0200
commit    619662a9138fd78df02c52cae6dc89db1d70a0e5 (patch)
tree      de9ea3a6982285c2194d6afed535ba87b077b2f3 /lib/git
parent    a8a448b7864e21db46184eab0f0a21d7725d074f (diff)
changed scheduling and chunksize calculation with respect to task.min_count, to fix a theoretical deadlock in serial mode and unnecessary blocking in async mode
Diffstat (limited to 'lib/git')
-rw-r--r--  lib/git/async/pool.py | 213
1 file changed, 137 insertions(+), 76 deletions(-)
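The core of this change is how a requested read count is turned into work chunks while honoring each task's minimum batch size. A minimal standalone sketch of that calculation, assuming the semantics of task.min_count and task.max_chunksize shown in the diff below (compute_chunks itself is a hypothetical helper; the pool inlines this logic):

    def compute_chunks(count, min_count, max_chunksize):
        # allow a min-count override: never schedule fewer items than
        # the task's minimum batch size
        if min_count is not None and 0 < count < min_count:
            count = min_count
        numchunks = 1
        chunksize = count
        remainder = 0
        # only chunk up if there is enough demand - an unlimited count (< 1)
        # cannot be chunked
        if max_chunksize and count > max_chunksize:
            numchunks = count / max_chunksize    # integer division in Python 2
            chunksize = max_chunksize
            remainder = count - numchunks * chunksize
        return numchunks, chunksize, remainder

    # compute_chunks(25, None, 10) -> (2, 10, 5): two full chunks of ten
    # items, plus a remainder chunk of five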
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 5518e37e..009096f2 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,6 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
+from threading import Lock
from task import InputChannelTask
from Queue import Queue, Empty
@@ -83,7 +84,7 @@ class RPoolChannel(RChannel):
#} END internal
-class ThreadPool(object):
+class Pool(object):
"""A thread pool maintains a set of one or more worker threads, but supports
a fully serial mode in which case the amount of threads is zero.
@@ -106,88 +107,35 @@ class ThreadPool(object):
'_consumed_tasks', # a queue with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
+ '_taskgraph_lock', # lock for accessing the task graph
)
+ # CONFIGURATION
+ # The type of worker to create - it's expected to provide the Thread interface,
+ # taking the task queue as its only init argument,
+ # as well as a method called stop_and_join() to terminate it
+ WorkerCls = None
+
+ # The type of lock to use to protect critical sections, providing the
+ # threading.Lock interface
+ LockCls = None
+
+ # the type of the task queue to use - it must provide the Queue interface
+ TaskQueueCls = None
+
+
def __init__(self, size=0):
self._tasks = Graph()
self._consumed_tasks = Queue() # make sure it's threadsafe
self._workers = list()
- self._queue = Queue()
+ self._queue = self.TaskQueueCls()
+ self._taskgraph_lock = self.LockCls()
self.set_size(size)
def __del__(self):
self.set_size(0)
#{ Internal
- def _queue_feeder_visitor(self, task, count):
- """Walk the graph and find tasks that are done for later cleanup, and
- queue all others for processing by our worker threads ( if available )."""
- if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- return True
- # END stop processing
-
- # if the task does not have the required output on its queue, schedule
- # it for processing. If we should process all, we don't care about the
- # amount as it should process until its all done.
- if count < 1 or task._out_wc.size() < count:
- # allow min-count override. This makes sure we take at least min-count
- # items off the input queue ( later )
- if task.min_count is not None and 0 < count < task.min_count:
- count = task.min_count
- # END handle min-count
-
- numchunks = 1
- chunksize = count
- remainder = 0
-
- # we need the count set for this - can't chunk up unlimited items
- # In serial mode we could do this by checking for empty input channels,
- # but in dispatch mode its impossible ( == not easily possible )
- # Only try it if we have enough demand
- if task.max_chunksize and count > task.max_chunksize:
- numchunks = count / task.max_chunksize
- chunksize = task.max_chunksize
- remainder = count - (numchunks * chunksize)
- # END handle chunking
-
- # the following loops are kind of unrolled - code duplication
- # should make things execute faster. Putting the if statements
- # into the loop would be less code, but ... slower
- print count, numchunks, chunksize, remainder, task._out_wc.size()
- if self._workers:
- # respect the chunk size, and split the task up if we want
- # to process too much. This can be defined per task
- queue = self._queue
- if numchunks > 1:
- for i in xrange(numchunks):
- queue.put((task.process, chunksize))
- # END for each chunk to put
- else:
- queue.put((task.process, chunksize))
- # END try efficient looping
-
- if remainder:
- queue.put((task.process, remainder))
- # END handle chunksize
- else:
- # no workers, so we have to do the work ourselves
- if numchunks > 1:
- for i in xrange(numchunks):
- task.process(chunksize)
- # END for each chunk to put
- else:
- task.process(chunksize)
- # END try efficient looping
-
- if remainder:
- task.process(remainder)
- # END handle chunksize
- # END handle serial mode
- # END handle queuing
-
- # always walk the whole graph, we want to find consumed tasks
- return True
def _prepare_channel_read(self, task, count):
"""Process the tasks which depend on the given one to be sure the input
@@ -201,7 +149,98 @@ class ThreadPool(object):
Tasks which are not done will be put onto the queue for processing, which
is fine as we walked them depth-first."""
- self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+ dfirst_tasks = list()
+ # for the walk, we must make sure the ordering does not change
+ # Note: the result of this could be cached
+ self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+
+ # check the min-count on all involved tasks, and make sure we don't
+ # have any task which produces less than the maximum min-count of all tasks.
+ # The actual_count is used when chunking tasks up for the queue, whereas
+ # the count is used to determine whether we still have enough output
+ # on the queue, checking qsize ( ->revise )
+ # ABSTRACT: If T depends on T-1, and the client wants 1 item, T produces
+ # at least 10, T-1 goes with 1, then T will block after 1 item, which
+ # is read by the client. On the next read of 1 item, we would find T's
+ # queue empty and put in another 10, which could put another thread into
+ # blocking state. T-1 produces one more item, which is consumed right away
+ # by the two threads running T. Although this works in the end, it leaves
+ # many threads blocking and waiting for input, which is not desired.
+ # Setting the min-count to the max of the min-counts of all tasks ensures
+ # we have enough items for all.
+ # Addition: in serial mode, we would enter a deadlock if any task
+ # ever waited for items!
+ actual_count = count
+ min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
+ min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
+ if 0 < count < min_count:
+ actual_count = min_count
+ # END set actual count
+
+ # the list includes our tasks - the first one to evaluate first, the
+ # requested one last
+ for task in dfirst_tasks:
+ if task.error() or task.is_done():
+ self._consumed_tasks.put(task)
+ continue
+ # END skip processing
+
+ # if the task does not have the required output on its queue, schedule
+ # it for processing. If we should process all, we don't care about the
+ # amount as it should process until it's all done.
+ # NOTE: revise this for multi-tasking - checking qsize doesn't work there!
+ if count < 1 or task._out_wc.size() < count:
+ # but we continue to use the actual count to produce the output
+ numchunks = 1
+ chunksize = actual_count
+ remainder = 0
+
+ # we need the count set for this - can't chunk up unlimited items
+ # In serial mode we could do this by checking for empty input channels,
+ # but in dispatch mode it's impossible ( == not easily possible )
+ # Only try it if we have enough demand
+ if task.max_chunksize and actual_count > task.max_chunksize:
+ numchunks = actual_count / task.max_chunksize
+ chunksize = task.max_chunksize
+ remainder = actual_count - (numchunks * chunksize)
+ # END handle chunking
+
+ # the following loops are kind of unrolled - code duplication
+ # should make things execute faster. Putting the if statements
+ # into the loop would be less code, but ... slower
+ print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+ if self._workers:
+ # respect the chunk size, and split the task up if we want
+ # to process too much. This can be defined per task
+ queue = self._queue
+ if numchunks > 1:
+ for i in xrange(numchunks):
+ queue.put((task.process, chunksize))
+ # END for each chunk to put
+ else:
+ queue.put((task.process, chunksize))
+ # END try efficient looping
+
+ if remainder:
+ queue.put((task.process, remainder))
+ # END handle chunksize
+ else:
+ # no workers, so we have to do the work ourselves
+ if numchunks > 1:
+ for i in xrange(numchunks):
+ task.process(chunksize)
+ # END for each chunk to put
+ else:
+ task.process(chunksize)
+ # END try efficient looping
+
+ if remainder:
+ task.process(remainder)
+ # END handle chunksize
+ # END handle serial mode
+ # END handle queuing
+ # END for each task to process
+
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
@@ -250,7 +289,7 @@ class ThreadPool(object):
cur_count = len(self._workers)
if cur_count < size:
for i in range(size - cur_count):
- worker = WorkerThread(self._queue)
+ worker = self.WorkerCls(self._queue)
worker.start()
self._workers.append(worker)
# END for each new worker to create
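For reference, the WorkerCls contract stated in the configuration comments (Thread interface, the task queue as only init argument, and a stop_and_join() method) could be satisfied by something as small as the following sketch - a hypothetical stand-in, not the project's actual WorkerThread:

    import threading
    from Queue import Empty

    class MinimalWorker(threading.Thread):
        """Illustrative worker pulling (function, count) pairs off the queue"""
        def __init__(self, task_queue):
            threading.Thread.__init__(self)
            self._queue = task_queue
            self._stop = False

        def run(self):
            while not self._stop:
                try:
                    func, count = self._queue.get(timeout=0.1)
                except Empty:
                    continue
                func(count)    # e.g. task.process(chunksize)

        def stop_and_join(self):
            self._stop = True
            self.join()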
@@ -291,7 +330,12 @@ class ThreadPool(object):
# keep its input nodes as we check whether they were orphaned
in_tasks = task.in_nodes
task.set_done()
- self._tasks.del_node(task)
+ self._taskgraph_lock.acquire()
+ try:
+ self._tasks.del_node(task)
+ finally:
+ self._taskgraph_lock.release()
+ # END locked deletion
for t in in_tasks:
self._del_task_if_orphaned(t)
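Every task-graph mutation now uses this acquire/try/finally pattern. Since LockCls provides the threading.Lock interface, the same guard could be written with a context manager - a sketch assuming Python 2.5+, not part of this diff:

    with self._taskgraph_lock:
        self._tasks.del_node(task)    # equivalent to the try/finally above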
@@ -314,16 +358,33 @@ class ThreadPool(object):
task._pool_ref = weakref.ref(self)
# END init input channel task
- self._tasks.add_node(task)
+ self._taskgraph_lock.acquire()
+ try:
+ self._tasks.add_node(task)
+ finally:
+ self._taskgraph_lock.release()
+ # END sync task addition
# If the input channel is one of our read channels, we add the relation
if has_input_channel:
ic = task.in_rc
if isinstance(ic, RPoolChannel) and ic._pool is self:
- self._tasks.add_edge(ic._task, task)
+ self._taskgraph_lock.acquire()
+ try:
+ self._tasks.add_edge(ic._task, task)
+ finally:
+ self._taskgraph_lock.release()
+ # END handle edge-adding
# END add task relation
# END handle input channels for connections
return rc
#} END interface
+
+
+class ThreadPool(Pool):
+ """A pool using threads as worker"""
+ WorkerCls = WorkerThread
+ LockCls = Lock
+ TaskQueueCls = Queue
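With the configuration hoisted into class attributes, Pool acts as a template and ThreadPool merely fills in the concrete types. A hedged usage sketch, assuming the lib/git tree is importable as the git package (task setup omitted; the process-based variant is purely hypothetical):

    from git.async.pool import ThreadPool

    pool = ThreadPool(size=4)    # spawns four WorkerCls instances on the queue
    # ... add tasks and read results through the pool's channels ...
    pool.set_size(0)             # drop back to fully serial mode

    # hypothetical variant, not part of this commit:
    # class ProcessPool(Pool):
    #     WorkerCls = SomeProcessWorker          # would need process-safe types
    #     LockCls = multiprocessing.Lock
    #     TaskQueueCls = multiprocessing.Queue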