Diffstat (limited to 'lib/git/async')
-rw-r--r--  lib/git/async/channel.py | 102
-rw-r--r--  lib/git/async/pool.py    |  76
-rw-r--r--  lib/git/async/task.py    |  54
3 files changed, 107 insertions(+), 125 deletions(-)
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index abb31035..9b019707 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -21,61 +21,57 @@ class Channel(object):
If the channel is closed, any read operation will result in an exception
This base class is not instantiated directly, but instead serves as constructor
- for RWChannel pairs.
+ for Reader/Writer pairs.
Create a new channel """
- __slots__ = tuple()
-
-
-class WChannel(Channel):
- """The write end of a channel - it is thread-safe"""
- __slots__ = ('_queue')
+ __slots__ = 'queue'
# The queue to use to store the actual data
QueueCls = AsyncQueue
def __init__(self):
- """initialize this instance, able to hold max_items at once
- Write calls will block if the channel is full, until someone reads from it"""
- self._queue = self.QueueCls()
-
- #{ Interface
- def write(self, item, block=True, timeout=None):
- """Send an item into the channel, it can be read from the read end of the
- channel accordingly
- :param item: Item to send
- :param block: If True, the call will block until there is free space in the
- channel
- :param timeout: timeout in seconds for blocking calls.
- :raise ReadOnly: when writing into closed channel"""
- # let the queue handle the 'closed' attribute, we write much more often
- # to an open channel than to a closed one, saving a few cycles
- self._queue.put(item, block, timeout)
-
+ """initialize this instance with a queue holding the channel contents"""
+ self.queue = self.QueueCls()
+
+
+class SerialChannel(Channel):
+ """A slightly faster version of a Channel, which sacrifices thread-safety for performance"""
+ QueueCls = SyncQueue
+
+
+class Writer(object):
+ """The write end of a channel, a file-like interface for a channel"""
+ __slots__ = ('write', 'channel')
+
+ def __init__(self, channel):
+ """Initialize the writer to use the given channel"""
+ self.channel = channel
+ self.write = channel.queue.put
+
+ #{ Interface
def size(self):
- """:return: approximate number of items that could be read from the read-ends
- of this channel"""
- return self._queue.qsize()
+ return self.channel.queue.qsize()
def close(self):
"""Close the channel. Multiple close calls on a closed channel are not
an error"""
- self._queue.set_writable(False)
+ self.channel.queue.set_writable(False)
def closed(self):
""":return: True if the channel was closed"""
- return not self._queue.writable()
+ return not self.channel.queue.writable()
#} END interface
-class CallbackWChannel(WChannel):
+class CallbackWriter(Writer):
"""The write end of a channel which allows you to set up a callback to be
called before an item is written to the channel"""
__slots__ = ('_pre_cb')
- def __init__(self):
- WChannel.__init__(self)
+ def __init__(self, channel):
+ Writer.__init__(self, channel)
self._pre_cb = None
+ self.write = self._write
def set_pre_cb(self, fun = lambda item: item):
"""Install a callback to be called before the given item is written.
@@ -88,25 +84,19 @@ class CallbackWChannel(WChannel):
self._pre_cb = fun
return prev
- def write(self, item, block=True, timeout=None):
+ def _write(self, item, block=True, timeout=None):
if self._pre_cb:
item = self._pre_cb(item)
- WChannel.write(self, item, block, timeout)
+ self.channel.queue.put(item, block, timeout)
-
-class SerialWChannel(WChannel):
- """A slightly faster version of a WChannel, which sacrificed thead-safety for
- performance"""
- QueueCls = SyncQueue
-
-class RChannel(Channel):
- """The read-end of a corresponding write channel"""
- __slots__ = '_wc'
+class Reader(object):
+ """Allows reading from a channel"""
+ __slots__ = 'channel'
- def __init__(self, wchannel):
+ def __init__(self, channel):
"""Initialize this instance from its parent write channel"""
- self._wc = wchannel
+ self.channel = channel
#{ Interface
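The Writer shown above avoids any per-call overhead by aliasing the queue's put as its write attribute, while CallbackWriter shadows that alias with its _write wrapper. A minimal, self-contained sketch of the same rebinding pattern, using the standard-library queue as a stand-in for AsyncQueue (class names mirror the diff, everything else is illustrative):

# Rebinding sketch: `write` is an instance attribute, so the plain Writer can
# point it straight at queue.put, while CallbackWriter swaps in a wrapper that
# runs the pre-callback first. Stand-in classes, not the git.async originals.
try:
    from queue import Queue       # Python 3
except ImportError:
    from Queue import Queue       # Python 2, as used by the diff

class Writer(object):
    def __init__(self, queue):
        self.queue = queue
        self.write = queue.put            # fastest path: no wrapper at all

class CallbackWriter(Writer):
    def __init__(self, queue):
        Writer.__init__(self, queue)
        self._pre_cb = None
        self.write = self._write          # shadow the alias with a wrapper

    def set_pre_cb(self, fun):
        self._pre_cb = fun

    def _write(self, item, block=True, timeout=None):
        if self._pre_cb:
            item = self._pre_cb(item)
        self.queue.put(item, block, timeout)

q = Queue()
w = CallbackWriter(q)
w.set_pre_cb(lambda item: item * 2)
w.write(21)
print(q.get())    # 42

Because write is looked up on the instance, the plain Writer pays no extra Python-level call per item, which is presumably the point of binding queue.put directly.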
@@ -135,7 +125,7 @@ class RChannel(Channel):
# in non-blocking mode, its all not a problem
out = list()
- queue = self._wc._queue
+ queue = self.channel.queue
if not block:
# be as fast as possible in non-blocking mode, hence
# its a bit 'unrolled'
@@ -198,12 +188,12 @@ class RChannel(Channel):
#} END interface
-class CallbackRChannel(RChannel):
+class CallbackReader(Reader):
"""A reader which invokes a callback before items are read from the channel"""
__slots__ = "_pre_cb"
- def __init__(self, wc):
- RChannel.__init__(self, wc)
+ def __init__(self, channel):
+ Reader.__init__(self, channel)
self._pre_cb = None
def set_pre_cb(self, fun = lambda count: None):
@@ -220,18 +210,20 @@ class CallbackRChannel(RChannel):
def read(self, count=0, block=True, timeout=None):
if self._pre_cb:
self._pre_cb(count)
- return RChannel.read(self, count, block, timeout)
+ return Reader.read(self, count, block, timeout)
#} END classes
#{ Constructors
-def mkchannel(wctype = WChannel, rctype = RChannel):
- """Create a channel, which consists of one write end and one read end
- :return: tuple(write_channel, read_channel)
+def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader):
+ """Create a channel, with a reader and a writer
+ :return: tuple(writer, reader)
+ :param ctype: Channel to instantiate
:param wtype: The type of the writer to instantiate
:param rtype: The type of the reader to instantiate"""
- wc = wctype()
- rc = rctype(wc)
+ c = ctype()
+ wc = wtype(c)
+ rc = rtype(c)
return wc, rc
#} END constructors
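Taken together, this file now separates the shared Channel (which owns the queue) from the thin Writer/Reader views created by mkchannel(). A hedged, self-contained sketch of that shape, with the standard-library queue standing in for AsyncQueue and a simplified non-blocking read loop:

# Channel owns the queue; Writer and Reader are file-like views onto it.
try:
    from queue import Queue, Empty        # Python 3
except ImportError:
    from Queue import Queue, Empty        # Python 2, as in the diff

class Channel(object):
    QueueCls = Queue
    def __init__(self):
        self.queue = self.QueueCls()

class Writer(object):
    def __init__(self, channel):
        self.channel = channel
        self.write = channel.queue.put     # bind the queue method directly

class Reader(object):
    def __init__(self, channel):
        self.channel = channel
    def read(self, count=0, block=False, timeout=None):
        out, queue = [], self.channel.queue
        try:
            while count == 0 or len(out) < count:
                out.append(queue.get(block, timeout))
        except Empty:
            pass
        return out

def mkchannel(ctype=Channel, wtype=Writer, rtype=Reader):
    channel = ctype()
    return wtype(channel), rtype(channel)

wc, rc = mkchannel()
for i in range(3):
    wc.write(i)
print(rc.read())     # [0, 1, 2]

Note that mkchannel() hands back the writer first and the reader second, matching the `return wc, rc` in the hunk above.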
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 1b3c2748..68551ea3 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -18,27 +18,28 @@ from Queue import (
from graph import Graph
from channel import (
mkchannel,
- WChannel,
- SerialWChannel,
- CallbackRChannel
+ Writer,
+ Channel,
+ SerialChannel,
+ CallbackReader
)
import sys
import weakref
from time import sleep
+import new
-class RPoolChannel(CallbackRChannel):
- """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call
- before and after an item is to be read.
-
+class PoolReader(CallbackReader):
+ """A reader designed to read from channels which take part in pools
It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task_ref', '_pool_ref')
+ __slots__ = ('_task_ref', '_pool_ref', '_read')
- def __init__(self, wchannel, task, pool):
- CallbackRChannel.__init__(self, wchannel)
+ def __init__(self, channel, task, pool):
+ CallbackReader.__init__(self, channel)
self._task_ref = weakref.ref(task)
self._pool_ref = weakref.ref(pool)
+ self._read = new.instancemethod(CallbackReader.__dict__['read'], self, CallbackReader)
def __del__(self):
"""Assures that our task will be deleted if we were the last reader"""
@@ -63,15 +64,9 @@ class RPoolChannel(CallbackRChannel):
# okay for now
# TODO: Fix this - private/public method
if sys.getrefcount(self) < 6:
- pool.remove_task(task, _from_destructor_=True)
+ pool.remove_task(task)
# END handle refcount based removal of task
- #{ Internal
- def _read(self, count=0, block=True, timeout=None):
- """Direct read, bypassing the pool handling"""
- return CallbackRChannel.read(self, count, block, timeout)
- #} END internal
-
#{ Interface
def pool_ref(self):
@@ -118,7 +113,7 @@ class RPoolChannel(CallbackRChannel):
####### read data ########
##########################
# read actual items, tasks were setup to put their output into our channel ( as well )
- items = CallbackRChannel.read(self, count, block, timeout)
+ items = CallbackReader.read(self, count, block, timeout)
##########################
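PoolReader binds CallbackReader.read to itself once (the new.instancemethod call in __init__ above), so the pool can drain the channel directly through _read while the public read() adds the scheduling shown in this hunk. A sketch of the same binding trick with types.MethodType, the Python 3 spelling of new.instancemethod; the class names here are illustrative only:

# Bind the base class' read() to the instance once, so callers holding `_read`
# bypass the pool bookkeeping done by the overriding read().
import types

class BaseReader(object):
    def read(self, count=0):
        return "read %d items from the channel" % count

class PoolAwareReader(BaseReader):
    def __init__(self):
        # equivalent in spirit to: new.instancemethod(BaseReader.read, self, BaseReader)
        self._read = types.MethodType(BaseReader.read, self)

    def read(self, count=0):
        # pool-specific scheduling would happen here, then delegate
        return "scheduled work, then " + self._read(count)

r = PoolAwareReader()
print(r.read(2))     # goes through the pool-aware override
print(r._read(2))    # direct bypass, no scheduling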
@@ -262,21 +257,21 @@ class Pool(object):
# should make things execute faster. Putting the if statements
# into the loop would be less code, but ... slower
# DEBUG
- # print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+ # print actual_count, numchunks, chunksize, remainder, task._out_writer.size()
if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
- queue = self._queue
+ qput = self._queue
if numchunks > 1:
for i in xrange(numchunks):
- queue.put((task.process, chunksize))
+ qput((task.process, chunksize))
# END for each chunk to put
else:
- queue.put((task.process, chunksize))
+ qput((task.process, chunksize))
# END try efficient looping
if remainder:
- queue.put((task.process, remainder))
+ qput((task.process, remainder))
# END handle chunksize
else:
# no workers, so we have to do the work ourselves
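Besides chunking, the hunk above hoists queue.put into a local (qput) before the loop, a common CPython micro-optimisation that skips repeated attribute lookups. A sketch of the scheduling step with assumed numbers; the real pool derives numchunks, chunksize and remainder from the task's max_chunksize, which this hunk does not show:

# Enqueue (callable, chunk_size) work packages, mirroring the loop above.
try:
    from queue import Queue       # Python 3; the diff itself targets the Python 2 Queue module
except ImportError:
    from Queue import Queue

def schedule(queue, process, chunksize, numchunks, remainder):
    qput = queue.put              # hoist the bound method out of the loop
    if numchunks > 1:
        for _ in range(numchunks):
            qput((process, chunksize))
    else:
        qput((process, chunksize))
    if remainder:
        qput((process, remainder))

q = Queue()
# e.g. 10 items with max_chunksize 4 -> 2 full chunks of 4 plus a remainder of 2
schedule(q, process=lambda n: n, chunksize=4, numchunks=2, remainder=2)
print(q.qsize())                  # 3 work packages covering all 10 items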
@@ -295,16 +290,16 @@ class Pool(object):
# END for each task to process
- def _remove_task_if_orphaned(self, task, from_destructor):
+ def _remove_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
- # 1 as its stored on the task, 1 for the getrefcount call
+ # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
# If we are getting here from the destructor of an RPool channel,
# its totally valid to virtually decrement the refcount by 1 as
# we can expect it to drop once the destructor completes, which is when
# we finish all recursive calls
- max_ref_count = 3 + from_destructor
- if sys.getrefcount(task.wchannel()) < max_ref_count:
- self.remove_task(task, from_destructor)
+ max_ref_count = 3
+ if sys.getrefcount(task.writer().channel) < max_ref_count:
+ self.remove_task(task)
#} END internal
#{ Interface
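The orphan check above leans on sys.getrefcount: once only the task itself still holds its output channel, the task can be dropped. A tiny sketch of how the count moves as holders appear; the threshold of 3 used above is specific to the pool's bookkeeping and is not asserted here:

import sys

class Channel(object):
    pass

channel = Channel()
baseline = sys.getrefcount(channel)       # our local name + getrefcount's own argument
extra_holder = channel                    # e.g. a reader appears
print(sys.getrefcount(channel) == baseline + 1)   # True: one more live reference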
@@ -375,7 +370,7 @@ class Pool(object):
finally:
self._taskgraph_lock.release()
- def remove_task(self, task, _from_destructor_=False):
+ def remove_task(self, task):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
@@ -410,7 +405,7 @@ class Pool(object):
# END locked deletion
for t in in_tasks:
- self._remove_task_if_orphaned(t, _from_destructor_)
+ self._remove_task_if_orphaned(t)
# END handle orphans recursively
return self
@@ -421,7 +416,7 @@ class Pool(object):
the task will be considered orphaned and will be deleted on the next
occasion."""
# create a write channel for it
- wctype = WChannel
+ ctype = Channel
# adjust the task with our pool ref, if it has the slot and is empty
# For now, we don't allow tasks to be used in multiple pools, except
@@ -442,26 +437,29 @@ class Pool(object):
# Use a non-threadsafe queue
# This brings about 15% more performance, but sacrifices thread-safety
- # when reading from multiple threads.
if self.size() == 0:
- wctype = SerialWChannel
+ ctype = SerialChannel
# END improve locks
# set up the task's channel - respect the task creator's choice though
# if it is set.
- wc = task.wchannel()
+ wc = task.writer()
+ ch = None
if wc is None:
- wc = wctype()
+ ch = ctype()
+ wc = Writer(ch)
+ task.set_writer(wc)
+ else:
+ ch = wc.channel
# END create write channel if unset
- rc = RPoolChannel(wc, task, self)
- task.set_wchannel(wc)
+ rc = PoolReader(ch, task, self)
finally:
self._taskgraph_lock.release()
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if hasattr(task, 'rchannel'):
- ic = task.rchannel()
+ if hasattr(task, 'reader'):
+ ic = task.reader()
if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
self._taskgraph_lock.acquire()
try:
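A self-contained sketch of the add_task wiring shown above: choose the channel type from the worker count, attach a Writer only if the task's creator did not set one, and hand back a reader bound to the same channel. The MiniTask and list-backed queue are stand-ins for illustration, not the real pool API:

class Channel(object):
    def __init__(self):
        self.queue = []                    # stand-in for the real AsyncQueue

class SerialChannel(Channel):
    """Stand-in for the faster, non-thread-safe channel used when size() == 0."""

class Writer(object):
    def __init__(self, channel):
        self.channel = channel
        self.write = channel.queue.append

class MiniTask(object):
    def __init__(self):
        self._out_writer = None
    def writer(self):
        return self._out_writer
    def set_writer(self, writer):
        self._out_writer = writer

def wire_task(task, num_workers):
    # serial channel when there are no workers, as in the hunk above
    ctype = SerialChannel if num_workers == 0 else Channel
    wc = task.writer()
    if wc is None:
        ch = ctype()
        wc = Writer(ch)
        task.set_writer(wc)
    else:
        ch = wc.channel
    return ch                              # the pool would wrap this in a PoolReader

task = MiniTask()
channel = wire_task(task, num_workers=0)
task.writer().write("result")
print(type(channel).__name__, channel.queue)   # SerialChannel ['result']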
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 0b1d0666..5a6c1e95 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,23 +1,17 @@
from graph import Node
from util import ReadOnly
-from channel import (
- WChannel,
- CallbackRChannel
- )
import threading
import weakref
import sys
import new
-getrefcount = sys.getrefcount
-
class OutputChannelTask(Node):
"""Abstracts a named task as part of a set of interdependent tasks, which contains
additional information on how the task should be queued and processed.
Results of the item processing are sent to a write channel, which is to be
- set by the creator using the ``set_wchannel`` method.
+ set by the creator using the ``set_writer`` method.
* **min_count** assures that not less than min_count items will be processed per call.
* **max_chunksize** assures that multi-threading is happening in smaller chunks. If
@@ -25,9 +19,11 @@ class OutputChannelTask(Node):
one worker, as well as dependent tasks. If you want finer granularity, you can
specify this here, causing chunks to be no larger than max_chunksize"""
__slots__ = ( '_read', # method to yield items to process
- '_out_wc', # output write channel
+ '_out_writer', # output write channel
'_exc', # exception caught
'_done', # True if we are done
+ '_num_writers', # number of concurrent writers
+ '_wlock', # lock for the above
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximum amount of items to process per process call
@@ -35,12 +31,14 @@ class OutputChannelTask(Node):
)
def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
- wchannel=None):
+ writer=None):
Node.__init__(self, id)
self._read = None # to be set by subclass
- self._out_wc = wchannel # to be set later
+ self._out_writer = writer
self._exc = None
self._done = False
+ self._num_writers = 0
+ self._wlock = threading.Lock()
self.fun = fun
self.min_count = None
self.max_chunksize = 0 # not set
@@ -54,29 +52,29 @@
"""Set ourselves to being done, as we have completed the processing"""
self._done = True
- def set_wchannel(self, wc):
+ def set_writer(self, writer):
"""Set the write channel to the given one"""
- self._out_wc = wc
+ self._out_writer = writer
- def wchannel(self):
+ def writer(self):
""":return: a proxy to our write channel or None if none is set
:note: you must not hold a reference to our write channel when the
task is being processed. This would cause the write channel never
to be closed as the task will think there is still another instance
being processed which can close the channel once it is done.
In the worst case, this will block your reads."""
- if self._out_wc is None:
+ if self._out_writer is None:
return None
- return self._out_wc
+ return self._out_writer
def close(self):
"""A closed task will close its channel to assure the readers will wake up
:note: it's safe to call this method multiple times"""
- self._out_wc.close()
+ self._out_writer.close()
def is_closed(self):
""":return: True if the task's write channel is closed"""
- return self._out_wc.closed()
+ return self._out_writer.closed()
def error(self):
""":return: Exception caught during last processing or None"""
@@ -88,24 +86,18 @@ class OutputChannelTask(Node):
items = self._read(count)
# print "%r: done reading %i items" % (self.id, len(items))
try:
- # increase the ref-count - we use this to determine whether anyone else
- # is currently handling our output channel. As this method runs asynchronously,
- # we have to make sure that the channel is closed by the last finishing task,
- # which is not necessarily the one which determines that he is done
- # as he couldn't read anymore items.
- # The refcount will be dropped in the moment we get out of here.
- wc = self._out_wc
+ write = self._out_writer.write
if self.apply_single:
for item in items:
rval = self.fun(item)
- wc.write(rval)
+ write(rval)
# END for each item
else:
# shouldn't apply single be the default anyway ?
# The task designers should chunk them up in advance
rvals = self.fun(items)
for rval in rvals:
- wc.write(rval)
+ write(rval)
# END handle single apply
except Exception, e:
print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging
@@ -131,7 +123,7 @@ class OutputChannelTask(Node):
self._exc = e
# END set error flag
# END exception handling
- del(wc)
+
# if we didn't get all demanded items, which is also the case if count is 0
# we have depleted the input channel and are done
@@ -151,7 +143,7 @@ class OutputChannelTask(Node):
# thread having its copy on the stack
# + 1 for the instance we provide to refcount
# Soft close, so others can continue writing their results
- if self.is_done() and getrefcount(self._out_wc) < 4:
+ if self.is_done():
# print "Closing channel of %r" % self.id
self.close()
# END handle channel closure
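The process() hunks above boil down to: read a batch, run fun over it, push each result through the task's writer, and soft-close once the input is exhausted. A self-contained sketch of that loop with simplified stand-ins (no threading, no exception bookkeeping):

from collections import deque

class MiniWriter(object):
    def __init__(self):
        self.items = deque()
        self.closed = False
    def write(self, item):
        self.items.append(item)
    def close(self):
        self.closed = True

class MiniTask(object):
    def __init__(self, read, fun, writer):
        self._read = read                  # callable returning up to `count` input items
        self.fun = fun                     # per-item transformation
        self._out_writer = writer
    def process(self, count=0):
        items = self._read(count)
        write = self._out_writer.write     # hoist the bound method, as the diff does
        for item in items:
            write(self.fun(item))
        if len(items) != count:            # input depleted -> soft close the channel
            self._out_writer.close()

source = deque(range(5))
def read(count):
    out = []
    while source and (count == 0 or len(out) < count):
        out.append(source.popleft())
    return out

writer = MiniWriter()
task = MiniTask(read, lambda x: x * x, writer)
task.process(3)
task.process(3)
print(list(writer.items), writer.closed)   # [0, 1, 4, 9, 16] True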
@@ -212,14 +204,14 @@ class InputChannelTask(OutputChannelTask):
to be the input channel to read from though."""
__slots__ = "_pool_ref"
- def __init__(self, in_rc, *args, **kwargs):
+ def __init__(self, in_reader, *args, **kwargs):
OutputChannelTask.__init__(self, *args, **kwargs)
- self._read = in_rc.read
+ self._read = in_reader.read
self._pool_ref = None
#{ Internal Interface
- def rchannel(self):
+ def reader(self):
""":return: input channel from which we read"""
# the instance is bound in its instance method - lets use this to keep
# the refcount at one ( per consumer )