diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 102 | ||||
-rw-r--r-- | lib/git/async/pool.py | 76 | ||||
-rw-r--r-- | lib/git/async/task.py | 54 |
3 files changed, 107 insertions, 125 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index abb31035..9b019707 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -21,61 +21,57 @@ class Channel(object): If the channel is closed, any read operation will result in an exception This base class is not instantiated directly, but instead serves as constructor - for RWChannel pairs. + for Rwriter pairs. Create a new channel """ - __slots__ = tuple() - - -class WChannel(Channel): - """The write end of a channel - it is thread-safe""" - __slots__ = ('_queue') + __slots__ = 'queue' # The queue to use to store the actual data QueueCls = AsyncQueue def __init__(self): - """initialize this instance, able to hold max_items at once - Write calls will block if the channel is full, until someone reads from it""" - self._queue = self.QueueCls() - - #{ Interface - def write(self, item, block=True, timeout=None): - """Send an item into the channel, it can be read from the read end of the - channel accordingly - :param item: Item to send - :param block: If True, the call will block until there is free space in the - channel - :param timeout: timeout in seconds for blocking calls. - :raise ReadOnly: when writing into closed channel""" - # let the queue handle the 'closed' attribute, we write much more often - # to an open channel than to a closed one, saving a few cycles - self._queue.put(item, block, timeout) - + """initialize this instance with a queue holding the channel contents""" + self.queue = self.QueueCls() + + +class SerialChannel(Channel): + """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" + QueueCls = SyncQueue + + +class Writer(object): + """The write end of a channel, a file-like interface for a channel""" + __slots__ = ('write', 'channel') + + def __init__(self, channel): + """Initialize the writer to use the given channel""" + self.channel = channel + self.write = channel.queue.put + + #{ Interface def size(self): - """:return: approximate number of items that could be read from the read-ends - of this channel""" - return self._queue.qsize() + return self.channel.queue.qsize() def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - self._queue.set_writable(False) + self.channel.queue.set_writable(False) def closed(self): """:return: True if the channel was closed""" - return not self._queue.writable() + return not self.channel.queue.writable() #} END interface -class CallbackWChannel(WChannel): +class CallbackWriter(Writer): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') - def __init__(self): - WChannel.__init__(self) + def __init__(self, channel): + Writer.__init__(self, channel) self._pre_cb = None + self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -88,25 +84,19 @@ class CallbackWChannel(WChannel): self._pre_cb = fun return prev - def write(self, item, block=True, timeout=None): + def _write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - WChannel.write(self, item, block, timeout) + self.channel.queue.put(item, block, timeout) - -class SerialWChannel(WChannel): - """A slightly faster version of a WChannel, which sacrificed thead-safety for - performance""" - QueueCls = SyncQueue - -class RChannel(Channel): - """The read-end of a corresponding write channel""" - __slots__ = '_wc' +class Reader(object): + """Allows reading from a channel""" + __slots__ = 'channel' - def __init__(self, wchannel): + def __init__(self, channel): """Initialize this instance from its parent write channel""" - self._wc = wchannel + self.channel = channel #{ Interface @@ -135,7 +125,7 @@ class RChannel(Channel): # in non-blocking mode, its all not a problem out = list() - queue = self._wc._queue + queue = self.channel.queue if not block: # be as fast as possible in non-blocking mode, hence # its a bit 'unrolled' @@ -198,12 +188,12 @@ class RChannel(Channel): #} END interface -class CallbackRChannel(RChannel): +class CallbackReader(Reader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" - def __init__(self, wc): - RChannel.__init__(self, wc) + def __init__(self, channel): + Reader.__init__(self, channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -220,18 +210,20 @@ class CallbackRChannel(RChannel): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return RChannel.read(self, count, block, timeout) + return Reader.read(self, count, block, timeout) #} END classes #{ Constructors -def mkchannel(wctype = WChannel, rctype = RChannel): - """Create a channel, which consists of one write end and one read end - :return: tuple(write_channel, read_channel) +def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): + """Create a channel, with a reader and a writer + :return: tuple(reader, writer) + :param ctype: Channel to instantiate :param wctype: The type of the write channel to instantiate :param rctype: The type of the read channel to instantiate""" - wc = wctype() - rc = rctype(wc) + c = ctype() + wc = wtype(c) + rc = rtype(c) return wc, rc #} END constructors diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 1b3c2748..68551ea3 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -18,27 +18,28 @@ from Queue import ( from graph import Graph from channel import ( mkchannel, - WChannel, - SerialWChannel, - CallbackRChannel + Writer, + Channel, + SerialChannel, + CallbackReader ) import sys import weakref from time import sleep +import new -class RPoolChannel(CallbackRChannel): - """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call - before and after an item is to be read. - +class PoolReader(CallbackReader): + """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task_ref', '_pool_ref') + __slots__ = ('_task_ref', '_pool_ref', '_read') - def __init__(self, wchannel, task, pool): - CallbackRChannel.__init__(self, wchannel) + def __init__(self, channel, task, pool): + CallbackReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) + self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader) def __del__(self): """Assures that our task will be deleted if we were the last reader""" @@ -63,15 +64,9 @@ class RPoolChannel(CallbackRChannel): # okay for now # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - pool.remove_task(task, _from_destructor_=True) + pool.remove_task(task) # END handle refcount based removal of task - #{ Internal - def _read(self, count=0, block=True, timeout=None): - """Direct read, bypassing the pool handling""" - return CallbackRChannel.read(self, count, block, timeout) - #} END internal - #{ Interface def pool_ref(self): @@ -118,7 +113,7 @@ class RPoolChannel(CallbackRChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackRChannel.read(self, count, block, timeout) + items = CallbackReader.read(self, count, block, timeout) ########################## @@ -262,21 +257,21 @@ class Pool(object): # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + # print actual_count, numchunks, chunksize, remainder, task._out_writer.size() if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task - queue = self._queue + qput = self._queue if numchunks > 1: for i in xrange(numchunks): - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END for each chunk to put else: - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END try efficient looping if remainder: - queue.put((task.process, remainder)) + qput((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves @@ -295,16 +290,16 @@ class Pool(object): # END for each task to process - def _remove_task_if_orphaned(self, task, from_destructor): + def _remove_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" - # 1 as its stored on the task, 1 for the getrefcount call + # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader # If we are getting here from the destructor of an RPool channel, # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls - max_ref_count = 3 + from_destructor - if sys.getrefcount(task.wchannel()) < max_ref_count: - self.remove_task(task, from_destructor) + max_ref_count = 3 + if sys.getrefcount(task.writer().channel) < max_ref_count: + self.remove_task(task) #} END internal #{ Interface @@ -375,7 +370,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task, _from_destructor_=False): + def remove_task(self, task): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -410,7 +405,7 @@ class Pool(object): # END locked deletion for t in in_tasks: - self._remove_task_if_orphaned(t, _from_destructor_) + self._remove_task_if_orphaned(t) # END handle orphans recursively return self @@ -421,7 +416,7 @@ class Pool(object): the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it - wctype = WChannel + ctype = Channel # adjust the task with our pool ref, if it has the slot and is empty # For now, we don't allow tasks to be used in multiple pools, except @@ -442,26 +437,29 @@ class Pool(object): # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety - # when reading from multiple threads. if self.size() == 0: - wctype = SerialWChannel + ctype = SerialChannel # END improve locks # setup the tasks channel - respect the task creators choice though # if it is set. - wc = task.wchannel() + wc = task.writer() + ch = None if wc is None: - wc = wctype() + ch = ctype() + wc = Writer(ch) + task.set_writer(wc) + else: + ch = wc.channel # END create write channel ifunset - rc = RPoolChannel(wc, task, self) - task.set_wchannel(wc) + rc = PoolReader(ch, task, self) finally: self._taskgraph_lock.release() # END sync task addition # If the input channel is one of our read channels, we add the relation - if hasattr(task, 'rchannel'): - ic = task.rchannel() + if hasattr(task, 'reader'): + ic = task.reader() if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: self._taskgraph_lock.acquire() try: diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 0b1d0666..5a6c1e95 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,23 +1,17 @@ from graph import Node from util import ReadOnly -from channel import ( - WChannel, - CallbackRChannel - ) import threading import weakref import sys import new -getrefcount = sys.getrefcount - class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. Results of the item processing are sent to a write channel, which is to be - set by the creator using the ``set_wchannel`` method. + set by the creator using the ``set_writer`` method. * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If @@ -25,9 +19,11 @@ class OutputChannelTask(Node): one worker, as well as dependent tasks. If you want finer granularity , you can specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process - '_out_wc', # output write channel + '_out_writer', # output write channel '_exc', # exception caught '_done', # True if we are done + '_num_writers', # number of concurrent writers + '_wlock', # lock for the above 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -35,12 +31,14 @@ class OutputChannelTask(Node): ) def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, - wchannel=None): + writer=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = wchannel # to be set later + self._out_writer = writer self._exc = None self._done = False + self._num_writers = 0 + self._wlock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -54,29 +52,29 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wchannel(self, wc): + def set_writer(self, writer): """Set the write channel to the given one""" - self._out_wc = wc + self._out_writer = writer - def wchannel(self): + def writer(self): """:return: a proxy to our write channel or None if non is set :note: you must not hold a reference to our write channel when the task is being processed. This would cause the write channel never to be closed as the task will think there is still another instance being processed which can close the channel once it is done. In the worst case, this will block your reads.""" - if self._out_wc is None: + if self._out_writer is None: return None - return self._out_wc + return self._out_writer def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" - self._out_wc.close() + self._out_writer.close() def is_closed(self): """:return: True if the task's write channel is closed""" - return self._out_wc.closed() + return self._out_writer.closed() def error(self): """:return: Exception caught during last processing or None""" @@ -88,24 +86,18 @@ class OutputChannelTask(Node): items = self._read(count) # print "%r: done reading %i items" % (self.id, len(items)) try: - # increase the ref-count - we use this to determine whether anyone else - # is currently handling our output channel. As this method runs asynchronously, - # we have to make sure that the channel is closed by the last finishing task, - # which is not necessarily the one which determines that he is done - # as he couldn't read anymore items. - # The refcount will be dropped in the moment we get out of here. - wc = self._out_wc + write = self._out_writer.write if self.apply_single: for item in items: rval = self.fun(item) - wc.write(rval) + write(rval) # END for each item else: # shouldn't apply single be the default anyway ? # The task designers should chunk them up in advance rvals = self.fun(items) for rval in rvals: - wc.write(rval) + write(rval) # END handle single apply except Exception, e: print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging @@ -131,7 +123,7 @@ class OutputChannelTask(Node): self._exc = e # END set error flag # END exception handling - del(wc) + # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done @@ -151,7 +143,7 @@ class OutputChannelTask(Node): # thread having its copy on the stack # + 1 for the instance we provide to refcount # Soft close, so others can continue writing their results - if self.is_done() and getrefcount(self._out_wc) < 4: + if self.is_done(): # print "Closing channel of %r" % self.id self.close() # END handle channel closure @@ -212,14 +204,14 @@ class InputChannelTask(OutputChannelTask): to be the input channel to read from though.""" __slots__ = "_pool_ref" - def __init__(self, in_rc, *args, **kwargs): + def __init__(self, in_reader, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._read = in_rc.read + self._read = in_reader.read self._pool_ref = None #{ Internal Interface - def rchannel(self): + def reader(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) |