commit 3e2ba9c2028f21d11988558f3557905d21e93808
tree   ac2426a03dbc538fb970cd4fd22a404edb68ce53
parent be06e87433685b5ea9cfcc131ab89c56cf8292f2
parent 898d47d1711accdfded8ee470520fdb96fb12d46
author    Sebastian Thiel <byronimo@gmail.com>  2010-06-07 23:49:20 +0200
committer Sebastian Thiel <byronimo@gmail.com>  2010-06-07 23:49:20 +0200

    Merge branch 'stasks' into async
-rw-r--r--  lib/git/async/channel.py     |  4
-rw-r--r--  lib/git/async/pool.py        | 21
-rw-r--r--  lib/git/async/task.py        | 47
-rw-r--r--  lib/git/async/util.py        | 14
-rw-r--r--  test/git/async/test_pool.py  |  8
5 files changed, 85 insertions(+), 9 deletions(-)
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2add9478..2d5ab79c 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -1,10 +1,10 @@
"""Contains a queue based channel implementation"""
from Queue import (
- Queue,
Empty,
Full
)
+from util import AsyncQueue
from time import time
import sys
@@ -43,7 +43,7 @@ class WChannel(Channel):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
self._closed = False
- self._queue = Queue(max_items)
+ self._queue = AsyncQueue(max_items)
#{ Interface
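
The swap above works only because WChannel touches nothing beyond the stdlib Queue interface (put, get, qsize). A minimal sanity check of that contract might look like the following sketch; it assumes AsyncQueue subclasses Queue and keeps its bounded-queue semantics, which the unchanged WChannel code implies but this page does not show:

    # Hypothetical check, not part of the commit: AsyncQueue must stay a
    # drop-in replacement for Queue, including raising Full on a bounded put.
    from Queue import Full
    from util import AsyncQueue     # lib/git/async/util.py

    q = AsyncQueue(2)               # bounded, like Queue(max_items)
    q.put(1)
    q.put(2)
    try:
        q.put(3, block=False)       # a full bounded queue must still raise
    except Full:
        pass
    assert q.qsize() == 2 and q.get() == 1
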
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 19fc9f6e..d6b5711d 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -5,6 +5,7 @@ from threading import Lock
 from util import (
     SyncQueue,
     AsyncQueue,
+    DummyLock
     )

 from task import InputChannelTask
@@ -80,12 +81,13 @@ class RPoolChannel(RChannel):
         # *
         have_enough = False
         if count > 0:
-            have_enough = self._wc._queue.qsize() >= count
-        # END risky game
+            have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+        # END check scheduled and queued items

         ########## prepare ##############################
         if not have_enough:
             self._pool._prepare_channel_read(self._task, count)
+        # END prepare pool scheduling

         ####### read data ########
@@ -260,26 +262,33 @@ class Pool(object):
             queue = self._queue
             if numchunks > 1:
                 for i in xrange(numchunks):
+                    # schedule them as early as we know about them
+                    task.add_scheduled_items(chunksize)
                     queue.put((task.process, chunksize))
                 # END for each chunk to put
             else:
+                task.add_scheduled_items(chunksize)
                 queue.put((task.process, chunksize))
             # END try efficient looping

             if remainder:
+                task.add_scheduled_items(remainder)
                 queue.put((task.process, remainder))
             # END handle chunksize
         else:
             # no workers, so we have to do the work ourselves
             if numchunks > 1:
                 for i in xrange(numchunks):
+                    task.add_scheduled_items(chunksize)
                     task.process(chunksize)
                 # END for each chunk to put
             else:
+                task.add_scheduled_items(chunksize)
                 task.process(chunksize)
             # END try efficient looping

             if remainder:
+                task.add_scheduled_items(remainder)
                 task.process(remainder)
             # END handle chunksize
         # END handle serial mode
@@ -348,6 +357,9 @@ class Pool(object):
                 self._workers.append(worker)
             # END for each new worker to create
         elif cur_count > size:
+            # we can safely reduce the size, even from serial mode, as we would
+            # only be able to do this if the serial (sync) mode finished processing.
+            # Removing workers is not a problem at all.
             del_count = cur_count - size
             for i in range(del_count):
                 self._workers[i].stop_and_join()
@@ -451,6 +463,11 @@ class Pool(object):
             # END add task relation
         # END handle input channels for connections

+        # fix locks - in serial mode, the task does not need real locks
+        if self.size() == 0:
+            task._slock = DummyLock()
+        # END improve locks
+
         return rc

     #} END interface
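
The pool-side pattern is the same in every branch of the scheduling code above: account for a chunk via add_scheduled_items before it is enqueued or processed, so a concurrent reader calling scheduled_item_count() never under-counts work already in flight. A compressed sketch of that ordering, with a stand-in Task class rather than the real OutputChannelTask:

    # Sketch of the schedule-then-enqueue ordering; Task is a stand-in.
    import threading

    class Task(object):
        def __init__(self):
            self._slock = threading.Lock()
            self._scheduled_items = 0

        def add_scheduled_items(self, count):
            self._slock.acquire()
            self._scheduled_items += count
            self._slock.release()

        def scheduled_item_count(self):
            self._slock.acquire()
            try:
                return self._scheduled_items
            finally:
                self._slock.release()

    task = Task()
    chunksize = 10
    queue = []
    task.add_scheduled_items(chunksize)   # count the chunk first ...
    queue.append((task, chunksize))       # ... then hand it to the workers
    assert task.scheduled_item_count() >= chunksize

If the order were reversed, a reader waking up between the put and the bookkeeping would see too small a count, decide have_enough is False, and schedule the same work again.
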
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 4e8aef54..cf486f48 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -21,6 +21,8 @@ class OutputChannelTask(Node):
                 '_out_wc',           # output write channel
                 '_exc',              # exception caught
                 '_done',             # True if we are done
+                '_scheduled_items',  # amount of scheduled items that will be processed in total
+                '_slock',            # lock for scheduled items
                 'fun',               # function to call with items read
                 'min_count',         # minimum amount of items to produce, None means no override
                 'max_chunksize',     # maximum amount of items to process per process call
@@ -33,6 +35,8 @@ class OutputChannelTask(Node):
         self._out_wc = None  # to be set later
         self._exc = None
         self._done = False
+        self._scheduled_items = 0
+        self._slock = threading.Lock()
         self.fun = fun
         self.min_count = None
         self.max_chunksize = 0  # not set
@@ -50,6 +54,7 @@ class OutputChannelTask(Node):
"""Set the write channel to the given one
:note: resets it done state in order to allow proper queue handling"""
self._done = False
+ self._scheduled_items = 0
self._out_wc = wc
def close(self):
@@ -65,6 +70,21 @@ class OutputChannelTask(Node):
""":return: Exception caught during last processing or None"""
return self._exc
+ def add_scheduled_items(self, count):
+ """Add the given amount of scheduled items to this task"""
+ self._slock.acquire()
+ self._scheduled_items += count
+ self._slock.release()
+
+ def scheduled_item_count(self):
+ """:return: amount of scheduled items for this task"""
+ self._slock.acquire()
+ try:
+ return self._scheduled_items
+ finally:
+ self._slock.release()
+ # END threadsafe return
+
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
@@ -78,14 +98,33 @@ class OutputChannelTask(Node):
             wc = self._out_wc
             if self.apply_single:
                 for item in items:
-                    wc.write(self.fun(item))
+                    rval = self.fun(item)
+                    # decrement afterwards, as it's unscheduled once it's produced
+                    self._slock.acquire()
+                    self._scheduled_items -= 1
+                    self._slock.release()
+                    wc.write(rval)
                 # END for each item
             else:
-                wc.write(self.fun(items))
+                # shouldn't apply_single be the default anyway?
+                # The task designers should chunk them up in advance
+                rvals = self.fun(items)
+                self._slock.acquire()
+                self._scheduled_items -= len(items)
+                self._slock.release()
+                for rval in rvals:
+                    wc.write(rval)
             # END handle single apply
         except Exception, e:
             self._exc = e
             self.set_done()
+            # unschedule all - we don't know how many have actually been produced;
+            # only do this when we don't apply single
+            if not self.apply_single:
+                self._slock.acquire()
+                self._scheduled_items -= len(items)
+                self._slock.release()
+            # END unschedule all
         # END exception handling
         del(wc)
@@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask):
         # for each task, which would allow us to precisely determine whether
         # the pool has to be triggered, and bail out early. Problem would
         # be the
+        # * Perhaps one shouldn't seek the perfect solution, but instead
+        #   document what's working and what not, or under which conditions.
+        #   The whole system is simple, but gets more complicated the
+        #   smarter it wants to be.
         if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
             self._read = self._in_rc._read
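
The acquire/release pairs added to task.py are correct but verbose, and the bare pair in process() would leak the lock if the body between the calls raised (unlikely here; the try/finally in scheduled_item_count shows the author was aware of the concern). An equivalent counter written with the with statement (natively available from Python 2.6) gets the try/finally for free; this is an alternative phrasing, not code from the commit:

    # Alternative sketch: the same thread-safe counter using "with",
    # which releases the lock even if the body raises.
    import threading

    class ScheduledCounter(object):
        def __init__(self):
            self._lock = threading.Lock()
            self._count = 0

        def add(self, n):
            with self._lock:
                self._count += n

        def value(self):
            with self._lock:
                return self._count
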
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 85d44694..55766579 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -41,6 +41,18 @@ def cpu_count():
 #} END routines

+
+class DummyLock(object):
+    """An object providing a do-nothing lock interface for use in sync mode"""
+    __slots__ = tuple()
+
+    def acquire(self):
+        pass
+
+    def release(self):
+        pass
+
+
 class SyncQueue(deque):
     """Adapter to allow using a deque like a queue, without locking"""
     def get(self, block=True, timeout=None):
@@ -59,7 +71,7 @@ class SyncQueue(deque):
 class HSCondition(_Condition):
     """An attempt to make conditions less blocking, which gains performance
     in return by sleeping less"""
-    delay = 0.00005  # reduces wait times, but increases overhead
+    delay = 0.00002  # reduces wait times, but increases overhead

     def wait(self, timeout=None):
         waiter = Lock()
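
DummyLock exists so the serial (size 0) path in pool.py can keep the exact same locking call sites while paying nothing for them: the pool swaps it in once at connect time instead of branching on every acquire. The halved HSCondition.delay is the same trade its comment states, shorter sleeps for lower wait latency at the cost of more wakeups. A sketch of the lock selection, where pick_lock is an illustrative helper that is not in the codebase:

    # Illustrative helper (not in the codebase): choose the lock type
    # once, so the hot path never branches on the pool size again.
    import threading
    from util import DummyLock

    def pick_lock(pool_size):
        if pool_size == 0:
            return DummyLock()       # serial mode: no other thread can contend
        return threading.Lock()      # threaded mode: real mutual exclusion
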
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 2b45727c..29c13188 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -36,7 +36,7 @@ class TestThreadTaskNode(InputIteratorThreadTask):
         self.plock.release()
         super(TestThreadTaskNode, self).process(count)

-    def _assert(self, pc, fc):
+    def _assert(self, pc, fc, check_scheduled=False):
         """Assert for num process counts (pc) and num function counts (fc)
         :return: self"""
         self.plock.acquire()
@@ -49,6 +49,10 @@ class TestThreadTaskNode(InputIteratorThreadTask):
         print self.item_count, fc
         assert self.item_count == fc
         self.lock.release()
+
+        # if we read all, we can't really use scheduled items
+        if check_scheduled:
+            assert self._scheduled_items == 0
         assert not self.error()
         return self
@@ -184,7 +188,7 @@ class TestThreadPool(TestBase):
         else:
             assert rc.read(1)[0] == i
         # END for each item
-        task._assert(ni / task.min_count + 1, ni)
+        task._assert(ni / task.min_count, ni)
         del(rc)
         assert p.num_tasks() == null_tasks
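
The new check_scheduled flag is only meaningful once the channel has been fully drained, since chunks still pending keep the counter above zero. A hypothetical call site, assuming rc.read() with no arguments drains the channel as elsewhere in this test suite (names as in the test above):

    # Hypothetical usage: drain first, then verify the scheduled-item
    # counter returned to zero; ni, rc and task come from the surrounding test.
    items = rc.read()                # read everything that was produced
    assert len(items) == ni
    task._assert(ni / task.min_count, ni, check_scheduled=True)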