diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-11 14:59:02 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-11 14:59:02 +0200 |
commit | f606937a7a21237c866efafcad33675e6539c103 (patch) | |
tree | 13ba7731de4798b2c9bfa24ccc893e4d8e5b8e8d /lib/git/async | |
parent | 257a8a9441fca9a9bc384f673ba86ef5c3f1715d (diff) | |
parent | 18e3252a1f655f09093a4cffd5125342a8f94f3b (diff) | |
download | gitpython-f606937a7a21237c866efafcad33675e6539c103.tar.gz |
Merge branch 'taskdep' into async
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 102 | ||||
-rw-r--r-- | lib/git/async/graph.py | 37 | ||||
-rw-r--r-- | lib/git/async/pool.py | 104 | ||||
-rw-r--r-- | lib/git/async/task.py | 143 | ||||
-rw-r--r-- | lib/git/async/thread.py | 45 | ||||
-rw-r--r-- | lib/git/async/util.py | 16 |
6 files changed, 275 insertions, 172 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index abb31035..9b019707 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -21,61 +21,57 @@ class Channel(object): If the channel is closed, any read operation will result in an exception This base class is not instantiated directly, but instead serves as constructor - for RWChannel pairs. + for Rwriter pairs. Create a new channel """ - __slots__ = tuple() - - -class WChannel(Channel): - """The write end of a channel - it is thread-safe""" - __slots__ = ('_queue') + __slots__ = 'queue' # The queue to use to store the actual data QueueCls = AsyncQueue def __init__(self): - """initialize this instance, able to hold max_items at once - Write calls will block if the channel is full, until someone reads from it""" - self._queue = self.QueueCls() - - #{ Interface - def write(self, item, block=True, timeout=None): - """Send an item into the channel, it can be read from the read end of the - channel accordingly - :param item: Item to send - :param block: If True, the call will block until there is free space in the - channel - :param timeout: timeout in seconds for blocking calls. - :raise ReadOnly: when writing into closed channel""" - # let the queue handle the 'closed' attribute, we write much more often - # to an open channel than to a closed one, saving a few cycles - self._queue.put(item, block, timeout) - + """initialize this instance with a queue holding the channel contents""" + self.queue = self.QueueCls() + + +class SerialChannel(Channel): + """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" + QueueCls = SyncQueue + + +class Writer(object): + """The write end of a channel, a file-like interface for a channel""" + __slots__ = ('write', 'channel') + + def __init__(self, channel): + """Initialize the writer to use the given channel""" + self.channel = channel + self.write = channel.queue.put + + #{ Interface def size(self): - """:return: approximate number of items that could be read from the read-ends - of this channel""" - return self._queue.qsize() + return self.channel.queue.qsize() def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" - self._queue.set_writable(False) + self.channel.queue.set_writable(False) def closed(self): """:return: True if the channel was closed""" - return not self._queue.writable() + return not self.channel.queue.writable() #} END interface -class CallbackWChannel(WChannel): +class CallbackWriter(Writer): """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" __slots__ = ('_pre_cb') - def __init__(self): - WChannel.__init__(self) + def __init__(self, channel): + Writer.__init__(self, channel) self._pre_cb = None + self.write = self._write def set_pre_cb(self, fun = lambda item: item): """Install a callback to be called before the given item is written. @@ -88,25 +84,19 @@ class CallbackWChannel(WChannel): self._pre_cb = fun return prev - def write(self, item, block=True, timeout=None): + def _write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) - WChannel.write(self, item, block, timeout) + self.channel.queue.put(item, block, timeout) - -class SerialWChannel(WChannel): - """A slightly faster version of a WChannel, which sacrificed thead-safety for - performance""" - QueueCls = SyncQueue - -class RChannel(Channel): - """The read-end of a corresponding write channel""" - __slots__ = '_wc' +class Reader(object): + """Allows reading from a channel""" + __slots__ = 'channel' - def __init__(self, wchannel): + def __init__(self, channel): """Initialize this instance from its parent write channel""" - self._wc = wchannel + self.channel = channel #{ Interface @@ -135,7 +125,7 @@ class RChannel(Channel): # in non-blocking mode, its all not a problem out = list() - queue = self._wc._queue + queue = self.channel.queue if not block: # be as fast as possible in non-blocking mode, hence # its a bit 'unrolled' @@ -198,12 +188,12 @@ class RChannel(Channel): #} END interface -class CallbackRChannel(RChannel): +class CallbackReader(Reader): """A channel which sends a callback before items are read from the channel""" __slots__ = "_pre_cb" - def __init__(self, wc): - RChannel.__init__(self, wc) + def __init__(self, channel): + Reader.__init__(self, channel) self._pre_cb = None def set_pre_cb(self, fun = lambda count: None): @@ -220,18 +210,20 @@ class CallbackRChannel(RChannel): def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) - return RChannel.read(self, count, block, timeout) + return Reader.read(self, count, block, timeout) #} END classes #{ Constructors -def mkchannel(wctype = WChannel, rctype = RChannel): - """Create a channel, which consists of one write end and one read end - :return: tuple(write_channel, read_channel) +def mkchannel(ctype = Channel, wtype = Writer, rtype = Reader): + """Create a channel, with a reader and a writer + :return: tuple(reader, writer) + :param ctype: Channel to instantiate :param wctype: The type of the write channel to instantiate :param rctype: The type of the read channel to instantiate""" - wc = wctype() - rc = rctype(wc) + c = ctype() + wc = wtype(c) + rc = rtype(c) return wc, rc #} END constructors diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index 6386cbaa..9ee0e891 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -25,14 +25,24 @@ class Graph(object): def __init__(self): self.nodes = list() + + def __del__(self): + """Deletes bidericational dependencies""" + for node in self.nodes: + node.in_nodes = None + node.out_nodes = None + # END cleanup nodes + + # otherwise the nodes would keep floating around + def add_node(self, node): """Add a new node to the graph :return: the newly added node""" self.nodes.append(node) return node - def del_node(self, node): + def remove_node(self, node): """Delete a node from the graph :return: self""" try: @@ -46,6 +56,8 @@ class Graph(object): del(outn.in_nodes[outn.in_nodes.index(node)]) for inn in node.in_nodes: del(inn.out_nodes[inn.out_nodes.index(node)]) + node.out_nodes = list() + node.in_nodes = list() return self def add_edge(self, u, v): @@ -87,25 +99,26 @@ class Graph(object): return self - def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch - It will return the actual input node in the end !""" - nodes = node.in_nodes[:] + def input_inclusive_dfirst_reversed(self, node): + """Return all input nodes of the given node, depth first, + It will return the actual input node last, as it is required + like that by the pool""" + stack = [node] seen = set() # depth first - while nodes: - n = nodes.pop() + out = list() + while stack: + n = stack.pop() if n in seen: continue seen.add(n) + out.append(n) # only proceed in that direction if visitor is fine with it - if visitor(n): - nodes.extend(n.in_nodes) + stack.extend(n.in_nodes) # END call visitor # END while walking - visitor(node) + out.reverse() + return out diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 2ec18f1a..dbc201a9 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -10,7 +10,6 @@ from util import ( DummyLock ) -from task import InputChannelTask from Queue import ( Queue, Empty @@ -19,25 +18,25 @@ from Queue import ( from graph import Graph from channel import ( mkchannel, - WChannel, - SerialWChannel, - CallbackRChannel + Writer, + Channel, + SerialChannel, + CallbackReader ) import sys import weakref from time import sleep +import new -class RPoolChannel(CallbackRChannel): - """ A read-only pool channel may not be wrapped or derived from, but it provides slots to call - before and after an item is to be read. - +class PoolReader(CallbackReader): + """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task_ref', '_pool_ref') - def __init__(self, wchannel, task, pool): - CallbackRChannel.__init__(self, wchannel) + def __init__(self, channel, task, pool): + CallbackReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) @@ -62,10 +61,27 @@ class RPoolChannel(CallbackRChannel): # it has no way of knowing that the write channel is about to diminsh. # which is why we pass the info as a private kwarg - not nice, but # okay for now - # TODO: Fix this - private/public method if sys.getrefcount(self) < 6: - pool.remove_task(task, _from_destructor_=True) + pool.remove_task(task, _from_destructor_ = True) # END handle refcount based removal of task + + #{ Internal + def _read(self, count=0, block=True, timeout=None): + return CallbackReader.read(self, count, block, timeout) + + #} END internal + + #{ Interface + + def pool_ref(self): + """:return: reference to the pool we belong to""" + return self._pool_ref + + def task_ref(self): + """:return: reference to the task producing our items""" + return self._task_ref + + #} END interface def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads @@ -101,7 +117,7 @@ class RPoolChannel(CallbackRChannel): ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackRChannel.read(self, count, block, timeout) + items = CallbackReader.read(self, count, block, timeout) ########################## @@ -182,8 +198,7 @@ class Pool(object): dfirst_tasks = self._taskorder_cache[id(task)] except KeyError: # have to retrieve the list from the graph - dfirst_tasks = list() - self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n)) + dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task) self._taskorder_cache[id(task)] = dfirst_tasks # END handle cached order retrieval finally: @@ -246,21 +261,21 @@ class Pool(object): # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower # DEBUG - # print actual_count, numchunks, chunksize, remainder, task._out_wc.size() + # print actual_count, numchunks, chunksize, remainder, task._out_writer.size() if self._num_workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task - queue = self._queue + qput = self._queue.put if numchunks > 1: for i in xrange(numchunks): - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END for each chunk to put else: - queue.put((task.process, chunksize)) + qput((task.process, chunksize)) # END try efficient looping if remainder: - queue.put((task.process, remainder)) + qput((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves @@ -281,13 +296,13 @@ class Pool(object): def _remove_task_if_orphaned(self, task, from_destructor): """Check the task, and delete it if it is orphaned""" - # 1 as its stored on the task, 1 for the getrefcount call + # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader # If we are getting here from the destructor of an RPool channel, # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls max_ref_count = 3 + from_destructor - if sys.getrefcount(task.wchannel()) < max_ref_count: + if sys.getrefcount(task.writer().channel) < max_ref_count: self.remove_task(task, from_destructor) #} END internal @@ -358,7 +373,7 @@ class Pool(object): finally: self._taskgraph_lock.release() - def remove_task(self, task, _from_destructor_=False): + def remove_task(self, task, _from_destructor_ = False): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume @@ -386,7 +401,7 @@ class Pool(object): # keep its input nodes as we check whether they were orphaned in_tasks = task.in_nodes - self._tasks.del_node(task) + self._tasks.remove_node(task) self._taskorder_cache.clear() finally: self._taskgraph_lock.release() @@ -404,7 +419,19 @@ class Pool(object): the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it - wctype = WChannel + ctype = Channel + + # adjust the task with our pool ref, if it has the slot and is empty + # For now, we don't allow tasks to be used in multiple pools, except + # for by their channels + if hasattr(task, 'pool'): + their_pool = task.pool() + if their_pool is None: + task.set_pool(self) + elif their_pool is not self: + raise ValueError("Task %r is already registered to another pool" % task.id) + # END handle pool exclusivity + # END handle pool aware tasks self._taskgraph_lock.acquire() try: @@ -413,30 +440,39 @@ class Pool(object): # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety - # when reading from multiple threads. if self.size() == 0: - wctype = SerialWChannel + ctype = SerialChannel # END improve locks # setup the tasks channel - respect the task creators choice though # if it is set. - wc = task.wchannel() + wc = task.writer() + ch = None if wc is None: - wc = wctype() + ch = ctype() + wc = Writer(ch) + task.set_writer(wc) + else: + ch = wc.channel # END create write channel ifunset - rc = RPoolChannel(wc, task, self) - task.set_wchannel(wc) + rc = PoolReader(ch, task, self) finally: self._taskgraph_lock.release() # END sync task addition # If the input channel is one of our read channels, we add the relation - if isinstance(task, InputChannelTask): - ic = task.rchannel() - if isinstance(ic, RPoolChannel) and ic._pool_ref() is self: + if hasattr(task, 'reader'): + ic = task.reader() + if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: self._taskgraph_lock.acquire() try: self._tasks.add_edge(ic._task_ref(), task) + + # additionally, bypass ourselves when reading from the + # task, if possible + if hasattr(ic, '_read'): + task.set_read(ic._read) + # END handle read bypass finally: self._taskgraph_lock.release() # END handle edge-adding diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 03b40492..49e7e7cf 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,19 +1,17 @@ from graph import Node -from channel import WChannel from util import ReadOnly import threading +import weakref import sys import new -getrefcount = sys.getrefcount - class OutputChannelTask(Node): """Abstracts a named task as part of a set of interdependent tasks, which contains additional information on how the task should be queued and processed. Results of the item processing are sent to a write channel, which is to be - set by the creator using the ``set_wchannel`` method. + set by the creator using the ``set_writer`` method. * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If @@ -21,9 +19,11 @@ class OutputChannelTask(Node): one worker, as well as dependent tasks. If you want finer granularity , you can specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process - '_out_wc', # output write channel + '_out_writer', # output write channel '_exc', # exception caught '_done', # True if we are done + '_num_writers', # number of concurrent writers + '_wlock', # lock for the above 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -31,12 +31,14 @@ class OutputChannelTask(Node): ) def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, - wchannel=None): + writer=None): Node.__init__(self, id) self._read = None # to be set by subclasss - self._out_wc = wchannel # to be set later + self._out_writer = writer self._exc = None self._done = False + self._num_writers = 0 + self._wlock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -50,29 +52,29 @@ class OutputChannelTask(Node): """Set ourselves to being done, has we have completed the processing""" self._done = True - def set_wchannel(self, wc): + def set_writer(self, writer): """Set the write channel to the given one""" - self._out_wc = wc + self._out_writer = writer - def wchannel(self): + def writer(self): """:return: a proxy to our write channel or None if non is set :note: you must not hold a reference to our write channel when the task is being processed. This would cause the write channel never to be closed as the task will think there is still another instance being processed which can close the channel once it is done. In the worst case, this will block your reads.""" - if self._out_wc is None: + if self._out_writer is None: return None - return self._out_wc + return self._out_writer def close(self): """A closed task will close its channel to assure the readers will wake up :note: its safe to call this method multiple times""" - self._out_wc.close() + self._out_writer.close() def is_closed(self): """:return: True if the task's write channel is closed""" - return self._out_wc.closed() + return self._out_writer.closed() def error(self): """:return: Exception caught during last processing or None""" @@ -80,30 +82,42 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" + # first thing: increment the writer count - other tasks must be able + # to respond properly ( even if it turns out we don't need it later ) + self._wlock.acquire() + self._num_writers += 1 + self._wlock.release() + + #print "%r: reading %i" % (self.id, count) + #if hasattr(self, 'reader'): + # print "from", self.reader().channel items = self._read(count) + #print "%r: done reading %i items" % (self.id, len(items)) + try: - # increase the ref-count - we use this to determine whether anyone else - # is currently handling our output channel. As this method runs asynchronously, - # we have to make sure that the channel is closed by the last finishing task, - # which is not necessarily the one which determines that he is done - # as he couldn't read anymore items. - # The refcount will be dropped in the moment we get out of here. - wc = self._out_wc - if self.apply_single: - for item in items: - rval = self.fun(item) - wc.write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - wc.write(rval) - # END handle single apply + try: + if items: + write = self._out_writer.write + if self.apply_single: + for item in items: + rval = self.fun(item) + write(rval) + # END for each item + else: + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + for rval in rvals: + write(rval) + # END handle single apply + # END if there is anything to do + finally: + self._wlock.acquire() + self._num_writers -= 1 + self._wlock.release() + # END handle writer count except Exception, e: - print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging - + print >> sys.stderr, "task %s error:" % self.id, type(e), str(e) # TODO: REMOVE DEBUG, or make it use logging # be sure our task is not scheduled again self.set_done() @@ -126,7 +140,7 @@ class OutputChannelTask(Node): self._exc = e # END set error flag # END exception handling - del(wc) + # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done @@ -145,8 +159,17 @@ class OutputChannelTask(Node): # The count is: 1 = wc itself, 2 = first reader channel, + x for every # thread having its copy on the stack # + 1 for the instance we provide to refcount - if self.is_done() and getrefcount(self._out_wc) < 4: - self.close() + # Soft close, so others can continue writing their results + if self.is_done(): + self._wlock.acquire() + try: + if self._num_writers == 0: + # print "Closing channel of %r" % self.id, len(self._out_writer.channel.queue), self._out_writer.channel + self.close() + # END handle writers + finally: + self._wlock.release() + # END assure lock release # END handle channel closure #{ Configuration @@ -159,7 +182,7 @@ class ThreadTaskBase(object): class InputIteratorTaskBase(OutputChannelTask): """Implements a task which processes items from an iterable in a multi-processing safe manner""" - __slots__ = ('_iterator', '_lock') + __slots__ = ('_iterator', '_lock', '_empty') # the type of the lock to use when reading from the iterator lock_type = None @@ -169,13 +192,25 @@ class InputIteratorTaskBase(OutputChannelTask): raise ValueError("Iterator %r needs a next() function" % iterator) self._iterator = iterator self._lock = self.lock_type() - self._read = self.__read + + # this is necessary to prevent a cyclic ref, preventing us from + # getting deleted ( and collected ) + weakself = weakref.ref(self) + self._read = lambda count: weakself().__read(count) + self._empty = False def __read(self, count=0): """Read count items from the iterator, and return them""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort + self._lock.acquire() try: if count == 0: + self._empty = True return list(self._iterator) else: out = list() @@ -184,6 +219,7 @@ class InputIteratorTaskBase(OutputChannelTask): try: out.append(it.next()) except StopIteration: + self._empty = True break # END handle empty iterator # END for each item to take @@ -199,17 +235,36 @@ class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): lock_type = threading.Lock -class InputChannelTask(OutputChannelTask): +class InputChannelTask(OutputChannelTask, ThreadTaskBase): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" + __slots__ = "_pool_ref" - def __init__(self, in_rc, *args, **kwargs): + def __init__(self, in_reader, *args, **kwargs): OutputChannelTask.__init__(self, *args, **kwargs) - self._read = in_rc.read + self._read = in_reader.read + self._pool_ref = None + + #{ Internal Interface - def rchannel(self): + def reader(self): """:return: input channel from which we read""" # the instance is bound in its instance method - lets use this to keep # the refcount at one ( per consumer ) return self._read.im_self + + def set_read(self, read): + """Adjust the read method to the given one""" + self._read = read + + def set_pool(self, pool): + self._pool_ref = weakref.ref(pool) + + def pool(self): + """:return: pool we are attached to, or None""" + if self._pool_ref is None: + return None + return self._pool_ref() + + #} END intenral interface diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index faeda04f..4d046a2f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -116,7 +116,7 @@ class WorkerThread(TerminatableThread): t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ - __slots__ = ('inq', '_current_routine') + __slots__ = ('inq') # define how often we should check for a shutdown request in case our @@ -128,7 +128,6 @@ class WorkerThread(TerminatableThread): self.inq = inq if inq is None: self.inq = Queue.Queue() - self._current_routine = None # routine we execute right now @classmethod def stop(cls, *args): @@ -141,34 +140,36 @@ class WorkerThread(TerminatableThread): gettask = self.inq.get while True: - self._current_routine = None if self._should_terminate(): break # END check for stop request # we wait and block - to terminate, send the 'stop' method tasktuple = gettask() - # needing exactly one function, and one arg - assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - self._current_routine = routine - try: - rval = None - if inspect.ismethod(routine): - if routine.im_self is None: - rval = routine(self, arg) - else: + try: + rval = None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, arg) + else: + rval = routine(arg) + elif inspect.isroutine(routine): rval = routine(arg) - elif inspect.isroutine(routine): - rval = routine(arg) - else: - # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) - break - # END make routine call + else: + # ignore unknown items + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + break + # END make routine call + finally: + # make sure we delete the routine to release the reference as soon + # as possible. Otherwise objects might not be destroyed + # while we are waiting + del(routine) + del(tasktuple) except StopProcessing: print self.name, "stops processing" # DEBUG break @@ -176,12 +177,10 @@ class WorkerThread(TerminatableThread): print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) continue # just continue # END routine exception handling + + # END handle routine release # END endless loop - def routine(self): - """:return: routine we are currently executing, or None if we have no task""" - return self._current_routine - def stop_and_join(self): """Send stop message to ourselves""" self.inq.put((self.stop, None)) diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 00d0dbab..11ab75a6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -101,10 +101,12 @@ class HSCondition(deque): waiter.acquire() # get it the first time, no blocking self.append(waiter) - # in the momemnt we release our lock, someone else might actually resume - self._lock.release() - try: # restore state no matter what (e.g., KeyboardInterrupt) + + try: + # restore state no matter what (e.g., KeyboardInterrupt) # now we block, as we hold the lock already + # in the momemnt we release our lock, someone else might actually resume + self._lock.release() if timeout is None: waiter.acquire() else: @@ -206,7 +208,6 @@ class AsyncQueue(deque): return old finally: self.mutex.release() - # if we won't receive anymore items, inform the getters if not state: self.not_empty.notify_all() @@ -222,6 +223,13 @@ class AsyncQueue(deque): def put(self, item, block=True, timeout=None): self.mutex.acquire() + # NOTE: we explicitly do NOT check for our writable state + # Its just used as a notification signal, and we need to be able + # to continue writing to prevent threads ( easily ) from failing + # to write their computed results, which we want in fact + # NO: we want them to fail and stop processing, as the one who caused + # the channel to close had a reason and wants the threads to + # stop on the task as soon as possible if not self._writable: self.mutex.release() raise ReadOnly |