diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/channel.py | 10
-rw-r--r-- | lib/git/async/pool.py | 44
-rw-r--r-- | lib/git/async/task.py | 12
-rw-r--r-- | lib/git/async/thread.py | 43 |
4 files changed, 55 insertions, 54 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py index 0a1db26b..2add9478 100644 --- a/lib/git/async/channel.py +++ b/lib/git/async/channel.py @@ -162,7 +162,15 @@ class RChannel(Channel): try: if wc.closed: have_timeout = True - break + # its about the 'in the meanwhile' :) - get everything + # we can in non-blocking mode. This will raise + try: + while True: + out.append(queue.get(False)) + # END until it raises Empty + except Empty: + break + # END finally, out of here # END don't continue on closed channels # END abort reading if it was closed ( in the meanwhile ) diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 620e2258..fcb0f442 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -49,7 +49,7 @@ class RPoolChannel(RChannel): If a function is not provided, the call is effectively uninstalled.""" self._post_cb = fun - def read(self, count=0, block=False, timeout=None): + def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" @@ -57,14 +57,21 @@ class RPoolChannel(RChannel): self._pre_cb() # END pre callback - ################################################## - self._pool._prepare_processing(self._task, count) - ################################################## + ########## prepare ############################## + self._pool._prepare_channel_read(self._task, count) + + ######### read data ###### + # read actual items, tasks were setup to put their output into our channel ( as well ) items = RChannel.read(self, count, block, timeout) + if self._post_cb: items = self._post_cb(items) + + ####### Finalize ######## + self._pool._post_channel_read(self._task) + return items #{ Internal @@ -119,17 +126,17 @@ class ThreadPool(object): self._consumed_tasks.append(task) return True # END stop processing - - # allow min-count override. 
This makes sure we take at least min-count - # items off the input queue ( later ) - if task.min_count is not None and count != 0 and count < task.min_count: - count = task.min_count - # END handle min-count # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. if count < 1 or task._out_wc.size() < count: + # allow min-count override. This makes sure we take at least min-count + # items off the input queue ( later ) + if task.min_count is not None and 0 < count < task.min_count: + count = task.min_count + # END handle min-count + numchunks = 1 chunksize = count remainder = 0 @@ -144,10 +151,10 @@ class ThreadPool(object): remainder = count - (numchunks * chunksize) # END handle chunking - print count, numchunks, chunksize, remainder # the following loops are kind of unrolled - code duplication # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower + print count, numchunks, chunksize, remainder, task._out_wc.size() if self._workers: # respect the chunk size, and split the task up if we want # to process too much. 
This can be defined per task @@ -176,18 +183,13 @@ class ThreadPool(object): if remainder: task.process(remainder) # END handle chunksize - - # as we are serial, we can check for consumption right away - if task.error() or task.is_done(): - self._consumed_tasks.append(task) - # END handle consumption # END handle serial mode # END handle queuing # always walk the whole graph, we want to find consumed tasks return True - def _prepare_processing(self, task, count): + def _prepare_channel_read(self, task, count): """Process the tasks which depend on the given one to be sure the input channels are filled with data once we process the actual task @@ -201,10 +203,18 @@ class ThreadPool(object): is fine as we walked them depth-first.""" self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + def _post_channel_read(self, task): + """Called after we processed a read to cleanup""" + # check whether we consumed the task, and schedule it for deletion + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + # END handle consumption + # delete consumed tasks to cleanup for task in self._consumed_tasks: self.del_task(task) # END for each task to delete + del(self._consumed_tasks[:]) def _del_task_if_orphaned(self, task): diff --git a/lib/git/async/task.py b/lib/git/async/task.py index ec650237..3137746c 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -7,7 +7,13 @@ class OutputChannelTask(Node): additional information on how the task should be queued and processed. Results of the item processing are sent to an output channel, which is to be - set by the creator""" + set by the creator + + * **min_count** assures that not less than min_count items will be processed per call. + * **max_chunksize** assures that multi-threading is happening in smaller chunks. If + someone wants all items to be processed, using read(0), the whole task would go to + one worker, as well as dependent tasks. 
If you want finer granularity , you can + specify this here, causing chunks to be no larger than max_chunksize""" __slots__ = ( '_read', # method to yield items to process '_out_wc', # output write channel '_exc', # exception caught @@ -42,7 +48,6 @@ class OutputChannelTask(Node): def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) - try: if self.apply_single: for item in items: @@ -58,6 +63,9 @@ class OutputChannelTask(Node): # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done + # We could check our output channel for how many items we have and put that + # into the equation, but whats important is that we were asked to produce + # count items. if not items or len(items) != count: self.set_done() # END handle done state diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 82acbd8f..0292289d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -115,33 +115,17 @@ class WorkerThread(TerminatableThread): """ __slots__ = ('inq', 'outq') - class InvalidRoutineError(Exception): - """Class sent as return value in case of an error""" - def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() - def call(self, function, *args, **kwargs): - """Method that makes the call to the worker using the input queue, - returning our output queue - - :param funciton: can be a standalone function unrelated to this class, - a class method of this class or any instance method. 
- If it is a string, it will be considered a function residing on this instance - :param args: arguments to pass to function - :parma **kwargs: kwargs to pass to function""" - self.inq.put((function, args, kwargs)) - def run(self): """Process input tasks until we receive the quit signal""" while True: if self._should_terminate(): break # END check for stop request - routine = None - args = tuple() - kwargs = dict() + # don't wait too long, instead check for the termination request more often try: tasktuple = self.inq.get(True, 1) @@ -149,29 +133,19 @@ class WorkerThread(TerminatableThread): continue # END get task with timeout - if isinstance(tasktuple, (tuple, list)): - if len(tasktuple) == 3: - routine, args, kwargs = tasktuple - elif len(tasktuple) == 2: - routine, args = tasktuple - elif len(tasktuple) == 1: - routine = tasktuple[0] - # END tasktuple length check - elif inspect.isroutine(tasktuple): - routine = tasktuple - # END tasktuple handling + # needing exactly one function, and one arg + assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" + routine, arg = tasktuple try: rval = None if inspect.ismethod(routine): if routine.im_self is None: - rval = routine(self, *args, **kwargs) + rval = routine(self, arg) else: - rval = routine(*args, **kwargs) + rval = routine(arg) elif inspect.isroutine(routine): - rval = routine(*args, **kwargs) - elif isinstance(routine, basestring) and hasattr(self, routine): - rval = getattr(self, routine)(*args, **kwargs) + rval = routine(arg) else: # ignore unknown items print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) @@ -180,7 +154,8 @@ class WorkerThread(TerminatableThread): except StopIteration: break except Exception,e: - print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) + print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) + break # abort ... # END routine exception handling # END endless loop