diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:49:20 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:49:20 +0200 |
commit | 3e2ba9c2028f21d11988558f3557905d21e93808 (patch) | |
tree | ac2426a03dbc538fb970cd4fd22a404edb68ce53 /lib/git/async | |
parent | be06e87433685b5ea9cfcc131ab89c56cf8292f2 (diff) | |
parent | 898d47d1711accdfded8ee470520fdb96fb12d46 (diff) | |
download | gitpython-3e2ba9c2028f21d11988558f3557905d21e93808.tar.gz |
Merge branch 'stasks' into async
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 4 | ||||
-rw-r--r-- | lib/git/async/pool.py | 21 | ||||
-rw-r--r-- | lib/git/async/task.py | 47 | ||||
-rw-r--r-- | lib/git/async/util.py | 14 |
4 files changed, 79 insertions, 7 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 2add9478..2d5ab79c 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -1,10 +1,10 @@ """Contains a queue based channel implementation""" from Queue import ( - Queue, Empty, Full ) +from util import AsyncQueue from time import time import sys @@ -43,7 +43,7 @@ class WChannel(Channel): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False - self._queue = Queue(max_items) + self._queue = AsyncQueue(max_items) #{ Interface diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 19fc9f6e..d6b5711d 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -5,6 +5,7 @@ from threading import Lock from util import ( SyncQueue, AsyncQueue, + DummyLock ) from task import InputChannelTask @@ -80,12 +81,13 @@ class RPoolChannel(RChannel): # * have_enough = False if count > 0: - have_enough = self._wc._queue.qsize() >= count - # END risky game + have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # END ########## prepare ############################## if not have_enough: self._pool._prepare_channel_read(self._task, count) + # END prepare pool scheduling ####### read data ######## @@ -260,26 +262,33 @@ class Pool(object): queue = self._queue if numchunks > 1: for i in xrange(numchunks): + # schedule them as early as we know about them + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END for each chunk to put else: + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) queue.put((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves if numchunks > 1: for i in xrange(numchunks): + task.add_scheduled_items(chunksize) task.process(chunksize) # END for each chunk to put else: + task.add_scheduled_items(chunksize) task.process(chunksize) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) task.process(remainder) # END handle chunksize # END handle serial mode @@ -348,6 +357,9 @@ class Pool(object): self._workers.append(worker) # END for each new worker to create elif cur_count > size: + # we can safely increase the size, even from serial mode, as we would + # only be able to do this if the serial ( sync ) mode finished processing. + # Just adding more workers is not a problem at all. del_count = cur_count - size for i in range(del_count): self._workers[i].stop_and_join() @@ -451,6 +463,11 @@ class Pool(object): # END add task relation # END handle input channels for connections + # fix locks - in serial mode, the task does not need real locks + if self.size() == 0: + task._slock = DummyLock() + # END improve locks + return rc #} END interface diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 4e8aef54..cf486f48 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -21,6 +21,8 @@ class OutputChannelTask(Node): '_out_wc', # output write channel '_exc', # exception caught '_done', # True if we are done + '_scheduled_items', # amount of scheduled items that will be processed in total + '_slock', # lock for scheduled items 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -33,6 +35,8 @@ class OutputChannelTask(Node): self._out_wc = None # to be set later self._exc = None self._done = False + self._scheduled_items = 0 + self._slock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -50,6 +54,7 @@ class OutputChannelTask(Node): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" self._done = False + self._scheduled_items = 0 self._out_wc = wc def close(self): @@ -65,6 +70,21 @@ class OutputChannelTask(Node): """:return: Exception caught during last processing or None""" return self._exc + def add_scheduled_items(self, count): + """Add the given amount of scheduled items to this task""" + self._slock.acquire() + self._scheduled_items += count + self._slock.release() + + def scheduled_item_count(self): + """:return: amount of scheduled items for this task""" + self._slock.acquire() + try: + return self._scheduled_items + finally: + self._slock.release() + # END threadsafe return + def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -78,14 +98,33 @@ class OutputChannelTask(Node): wc = self._out_wc if self.apply_single: for item in items: - wc.write(self.fun(item)) + rval = self.fun(item) + # decrement afterwards, the its unscheduled once its produced + self._slock.acquire() + self._scheduled_items -= 1 + self._slock.release() + wc.write(rval) # END for each item else: - wc.write(self.fun(items)) + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + for rval in rvals: + wc.write(rval) # END handle single apply except Exception, e: self._exc = e self.set_done() + # unschedule all, we don't know how many have been produced actually + # but only if we don't apply single please + if not self.apply_single: + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + # END unschedule all # END exception handling del(wc) @@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask): # for each task, which would allow to precisely determine whether # the pool as to be triggered, and bail out early. Problem would # be the + # * Perhaps one shouldn't seek the perfect solution , but instead + # document whats working and what not, or under which conditions. + # The whole system is simple, but gets more complicated the + # smarter it wants to be. if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 85d44694..55766579 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -41,6 +41,18 @@ def cpu_count(): #} END routines + +class DummyLock(object): + """An object providing a do-nothing lock interface for use in sync mode""" + __slots__ = tuple() + + def acquire(self): + pass + + def release(self): + pass + + class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): @@ -59,7 +71,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.00005 # reduces wait times, but increases overhead + delay = 0.00002 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() |