From ab59f78341f1dd188aaf4c30526f6295c63438b1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 20:03:09 +0200 Subject: Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock --- lib/git/async/thread.py | 203 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 lib/git/async/thread.py (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py new file mode 100644 index 00000000..3938666a --- /dev/null +++ b/lib/git/async/thread.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +"""Module with threading utilities""" +__docformat__ = "restructuredtext" +import threading +import inspect +import Queue + +#{ Decorators + +def do_terminate_threads(whitelist=list()): + """Simple function which terminates all of our threads + :param whitelist: If whitelist is given, only the given threads will be terminated""" + for t in threading.enumerate(): + if not isinstance(t, TerminatableThread): + continue + if whitelist and t not in whitelist: + continue + if isinstance(t, WorkerThread): + t.inq.put(t.quit) + # END worker special handling + t.stop_and_join() + # END for each thread + +def terminate_threads( func ): + """Kills all worker threads the method has created by sending the quit signal. + This takes over in case of an error in the main function""" + def wrapper(*args, **kwargs): + cur_threads = set(threading.enumerate()) + try: + return func(*args, **kwargs) + finally: + do_terminate_threads(set(threading.enumerate()) - cur_threads) + # END finally shutdown threads + # END wrapper + wrapper.__name__ = func.__name__ + return wrapper + +#} END decorators + +#{ Classes + +class TerminatableThread(threading.Thread): + """A simple thread able to terminate itself on behalf of the user. 
+ + Terminate a thread as follows: + + t.stop_and_join() + + Derived classes call _should_terminate() to determine whether they should + abort gracefully + """ + __slots__ = '_terminate' + + def __init__(self): + super(TerminatableThread, self).__init__() + self._terminate = False + + + #{ Subclass Interface + def _should_terminate(self): + """:return: True if this thread should terminate its operation immediately""" + return self._terminate + + def _terminated(self): + """Called once the thread terminated. Its called in the main thread + and may perform cleanup operations""" + pass + + def start(self): + """Start the thread and return self""" + super(TerminatableThread, self).start() + return self + + #} END subclass interface + + #{ Interface + + def stop_and_join(self): + """Ask the thread to stop its operation and wait for it to terminate + :note: Depending on the implenetation, this might block a moment""" + self._terminate = True + self.join() + self._terminated() + #} END interface + + +class WorkerThread(TerminatableThread): + """ + This base allows to call functions on class instances natively and retrieve + their results asynchronously using a queue. + The thread runs forever unless it receives the terminate signal using + its task queue. + + Tasks could be anything, but should usually be class methods and arguments to + allow the following: + + inq = Queue() + outq = Queue() + w = WorkerThread(inq, outq) + w.start() + inq.put((WorkerThread., args, kwargs)) + res = outq.get() + + finally we call quit to terminate asap. 
+ + alternatively, you can make a call more intuitively - the output is the output queue + allowing you to get the result right away or later + w.call(arg, kwarg='value').get() + + inq.put(WorkerThread.quit) + w.join() + + You may provide the following tuples as task: + t[0] = class method, function or instance method + t[1] = optional, tuple or list of arguments to pass to the routine + t[2] = optional, dictionary of keyword arguments to pass to the routine + """ + __slots__ = ('inq', 'outq') + + class InvalidRoutineError(Exception): + """Class sent as return value in case of an error""" + + def __init__(self, inq = None, outq = None): + super(WorkerThread, self).__init__() + self.inq = inq or Queue.Queue() + self.outq = outq or Queue.Queue() + + def call(self, function, *args, **kwargs): + """Method that makes the call to the worker using the input queue, + returning our output queue + + :param funciton: can be a standalone function unrelated to this class, + a class method of this class or any instance method. 
+ If it is a string, it will be considered a function residing on this instance + :param args: arguments to pass to function + :parma **kwargs: kwargs to pass to function""" + self.inq.put((function, args, kwargs)) + return self.outq + + def wait_until_idle(self): + """wait until the input queue is empty, in the meanwhile, take all + results off the output queue.""" + while not self.inq.empty(): + try: + self.outq.get(False) + except Queue.Empty: + continue + # END while there are tasks on the queue + + def run(self): + """Process input tasks until we receive the quit signal""" + while True: + if self._should_terminate(): + break + # END check for stop request + routine = self.__class__.quit + args = tuple() + kwargs = dict() + tasktuple = self.inq.get() + + if isinstance(tasktuple, (tuple, list)): + if len(tasktuple) == 3: + routine, args, kwargs = tasktuple + elif len(tasktuple) == 2: + routine, args = tasktuple + elif len(tasktuple) == 1: + routine = tasktuple[0] + # END tasktuple length check + elif inspect.isroutine(tasktuple): + routine = tasktuple + # END tasktuple handling + + try: + rval = None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, *args, **kwargs) + else: + rval = routine(*args, **kwargs) + elif inspect.isroutine(routine): + rval = routine(*args, **kwargs) + elif isinstance(routine, basestring) and hasattr(self, routine): + rval = getattr(self, routine)(*args, **kwargs) + else: + # ignore unknown items + print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + self.outq.put(self.InvalidRoutineError(routine)) + break + # END make routine call + self.outq.put(rval) + except StopIteration: + break + except Exception,e: + print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) + self.outq.put(e) + # END routine exception handling + # END endless loop + + def quit(self): + raise StopIteration + + +#} END classes -- cgit v1.2.1 From 
ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 12:48:25 +0200 Subject: thread: adjusted worker thread not to provide an output queue anymore - this is handled by the task system graph: implemented it including test according to the pools requirements pool: implemented set_pool_size --- lib/git/async/thread.py | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 3938666a..7ca93c86 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -85,9 +85,9 @@ class TerminatableThread(threading.Thread): class WorkerThread(TerminatableThread): - """ - This base allows to call functions on class instances natively and retrieve - their results asynchronously using a queue. + """ This base allows to call functions on class instances natively. + As it is meant to work with a pool, the result of the call must be + handled by the callee. The thread runs forever unless it receives the terminate signal using its task queue. @@ -95,11 +95,9 @@ class WorkerThread(TerminatableThread): allow the following: inq = Queue() - outq = Queue() - w = WorkerThread(inq, outq) + w = WorkerThread(inq) w.start() inq.put((WorkerThread., args, kwargs)) - res = outq.get() finally we call quit to terminate asap. 
@@ -120,10 +118,9 @@ class WorkerThread(TerminatableThread): class InvalidRoutineError(Exception): """Class sent as return value in case of an error""" - def __init__(self, inq = None, outq = None): + def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() - self.outq = outq or Queue.Queue() def call(self, function, *args, **kwargs): """Method that makes the call to the worker using the input queue, @@ -135,17 +132,6 @@ class WorkerThread(TerminatableThread): :param args: arguments to pass to function :parma **kwargs: kwargs to pass to function""" self.inq.put((function, args, kwargs)) - return self.outq - - def wait_until_idle(self): - """wait until the input queue is empty, in the meanwhile, take all - results off the output queue.""" - while not self.inq.empty(): - try: - self.outq.get(False) - except Queue.Empty: - continue - # END while there are tasks on the queue def run(self): """Process input tasks until we receive the quit signal""" @@ -184,15 +170,12 @@ class WorkerThread(TerminatableThread): else: # ignore unknown items print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) - self.outq.put(self.InvalidRoutineError(routine)) break # END make routine call - self.outq.put(rval) except StopIteration: break except Exception,e: print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) - self.outq.put(e) # END routine exception handling # END endless loop -- cgit v1.2.1 From b3cde0ee162b8f0cb67da981311c8f9c16050a62 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 18:13:21 +0200 Subject: First step of testing the pool - tasks have been separated into a new module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support --- lib/git/async/thread.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'lib/git/async/thread.py') diff --git 
a/lib/git/async/thread.py b/lib/git/async/thread.py index 7ca93c86..82acbd8f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -139,10 +139,15 @@ class WorkerThread(TerminatableThread): if self._should_terminate(): break # END check for stop request - routine = self.__class__.quit + routine = None args = tuple() kwargs = dict() - tasktuple = self.inq.get() + # don't wait too long, instead check for the termination request more often + try: + tasktuple = self.inq.get(True, 1) + except Queue.Empty: + continue + # END get task with timeout if isinstance(tasktuple, (tuple, list)): if len(tasktuple) == 3: -- cgit v1.2.1 From 6a252661c3bf4202a4d571f9c41d2afa48d9d75f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:41:20 +0200 Subject: pool: First version which works as expected in async mode. Its just using a single task for now, but next up are dependent tasks --- lib/git/async/thread.py | 43 +++++++++---------------------------------- 1 file changed, 9 insertions(+), 34 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 82acbd8f..0292289d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -115,33 +115,17 @@ class WorkerThread(TerminatableThread): """ __slots__ = ('inq', 'outq') - class InvalidRoutineError(Exception): - """Class sent as return value in case of an error""" - def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() - def call(self, function, *args, **kwargs): - """Method that makes the call to the worker using the input queue, - returning our output queue - - :param funciton: can be a standalone function unrelated to this class, - a class method of this class or any instance method. 
- If it is a string, it will be considered a function residing on this instance - :param args: arguments to pass to function - :parma **kwargs: kwargs to pass to function""" - self.inq.put((function, args, kwargs)) - def run(self): """Process input tasks until we receive the quit signal""" while True: if self._should_terminate(): break # END check for stop request - routine = None - args = tuple() - kwargs = dict() + # don't wait too long, instead check for the termination request more often try: tasktuple = self.inq.get(True, 1) @@ -149,29 +133,19 @@ class WorkerThread(TerminatableThread): continue # END get task with timeout - if isinstance(tasktuple, (tuple, list)): - if len(tasktuple) == 3: - routine, args, kwargs = tasktuple - elif len(tasktuple) == 2: - routine, args = tasktuple - elif len(tasktuple) == 1: - routine = tasktuple[0] - # END tasktuple length check - elif inspect.isroutine(tasktuple): - routine = tasktuple - # END tasktuple handling + # needing exactly one function, and one arg + assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" + routine, arg = tasktuple try: rval = None if inspect.ismethod(routine): if routine.im_self is None: - rval = routine(self, *args, **kwargs) + rval = routine(self, arg) else: - rval = routine(*args, **kwargs) + rval = routine(arg) elif inspect.isroutine(routine): - rval = routine(*args, **kwargs) - elif isinstance(routine, basestring) and hasattr(self, routine): - rval = getattr(self, routine)(*args, **kwargs) + rval = routine(arg) else: # ignore unknown items print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) @@ -180,7 +154,8 @@ class WorkerThread(TerminatableThread): except StopIteration: break except Exception,e: - print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e)) + print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) + break # abort ... # END routine exception handling # END endless loop -- cgit v1.2.1 From a8a448b7864e21db46184eab0f0a21d7725d074f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 10:38:22 +0200 Subject: pool.consumed_tasks: is now a queue to be thread safe, in preparation for multiple connected pools Reduced waiting time in tests to make them complete faster --- lib/git/async/thread.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 0292289d..2ed002e9 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -115,6 +115,11 @@ class WorkerThread(TerminatableThread): """ __slots__ = ('inq', 'outq') + + # define how often we should check for a shutdown request in case our + # taskqueue is empty + shutdown_check_time_s = 0.5 + def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq or Queue.Queue() -- cgit v1.2.1 From 8c3c271b0d6b5f56b86e3f177caf3e916b509b52 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 13:05:35 +0200 Subject: Added task order cache, and a lock to prevent us walking the graph while changing tasks Now processing more items to test performance, in dual-threaded mode as well, and its rather bad, have to figure out the reason for this, probably gil, but queues could help --- lib/git/async/thread.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 2ed002e9..f875f094 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -141,7 +141,8 @@ class WorkerThread(TerminatableThread): # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - + # DEBUG + # print "%s: picked up: 
%s(%s)" % (self.name, routine, arg) try: rval = None if inspect.ismethod(routine): -- cgit v1.2.1 From edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 17:16:48 +0200 Subject: added high-speed locking facilities, allowing our Queue to be faster, at least in tests, and with multiple threads. There is still a sync bug in regard to closed channels to be fixed, as the Task.set_done handling is incorrect --- lib/git/async/thread.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index f875f094..f7f0c978 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -15,9 +15,6 @@ def do_terminate_threads(whitelist=list()): continue if whitelist and t not in whitelist: continue - if isinstance(t, WorkerThread): - t.inq.put(t.quit) - # END worker special handling t.stop_and_join() # END for each thread -- cgit v1.2.1 From 583cd8807259a69fc01874b798f657c1f9ab7828 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 19:12:44 +0200 Subject: Moved pool utilities into util module, fixed critical issue that caused havoc - let's call this a safe-state --- lib/git/async/thread.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index f7f0c978..4240a664 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -110,7 +110,7 @@ class WorkerThread(TerminatableThread): t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ - __slots__ = ('inq', 'outq') + __slots__ = ('inq', '_current_routine') # define how often we should check for a shutdown request in case our @@ -120,10 +120,12 @@ class WorkerThread(TerminatableThread): def __init__(self, inq = None): super(WorkerThread, self).__init__() 
self.inq = inq or Queue.Queue() + self._current_routine = None # routine we execute right now def run(self): """Process input tasks until we receive the quit signal""" while True: + self._current_routine = None if self._should_terminate(): break # END check for stop request @@ -138,8 +140,9 @@ class WorkerThread(TerminatableThread): # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - # DEBUG - # print "%s: picked up: %s(%s)" % (self.name, routine, arg) + + self._current_routine = routine + try: rval = None if inspect.ismethod(routine): @@ -154,16 +157,15 @@ class WorkerThread(TerminatableThread): print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call - except StopIteration: - break except Exception,e: print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) break # abort ... 
# END routine exception handling # END endless loop - def quit(self): - raise StopIteration + def routine(self): + """:return: routine we are currently executing, or None if we have no task""" + return self._current_routine #} END classes -- cgit v1.2.1 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: It's getting better already - intermediate commit before further changing the task class --- lib/git/async/thread.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 4240a664..5faad4f8 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -124,6 +124,7 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" + gettask = self.inq.get while True: self._current_routine = None if self._should_terminate(): @@ -132,7 +133,7 @@ class WorkerThread(TerminatableThread): # don't wait too long, instead check for the termination request more often try: - tasktuple = self.inq.get(True, 1) + tasktuple = gettask(True, 0.25) except Queue.Empty: continue # END get task with timeout -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, which boosts performance into bright green levels --- lib/git/async/thread.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 5faad4f8..556b7e92 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -80,6 +80,10 @@ class TerminatableThread(threading.Thread): self._terminated() #} END interface + +class StopProcessing(Exception): + 
"""If thrown in a function processed by a WorkerThread, it will terminate""" + class WorkerThread(TerminatableThread): """ This base allows to call functions on class instances natively. @@ -122,6 +126,11 @@ class WorkerThread(TerminatableThread): self.inq = inq or Queue.Queue() self._current_routine = None # routine we execute right now + @classmethod + def stop(cls, *args): + """If send via the inq of the thread, it will stop once it processed the function""" + raise StopProcessing + def run(self): """Process input tasks until we receive the quit signal""" gettask = self.inq.get @@ -131,12 +140,8 @@ class WorkerThread(TerminatableThread): break # END check for stop request - # don't wait too long, instead check for the termination request more often - try: - tasktuple = gettask(True, 0.25) - except Queue.Empty: - continue - # END get task with timeout + # we wait and block - to terminate, send the 'stop' method + tasktuple = gettask() # needing exactly one function, and one arg assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" @@ -158,6 +163,8 @@ class WorkerThread(TerminatableThread): print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call + except StopProcessing: + break except Exception,e: print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) break # abort ... 
@@ -168,5 +175,8 @@ class WorkerThread(TerminatableThread): """:return: routine we are currently executing, or None if we have no task""" return self._current_routine - + def stop_and_join(self): + """Send stop message to ourselves""" + self.inq.put((self.stop, None)) + super(WorkerThread, self).stop_and_join() #} END classes -- cgit v1.2.1 From 15941ca090a2c3c987324fc911bbc6f89e941c47 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 10:34:12 +0200 Subject: queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently. Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined --- lib/git/async/thread.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 556b7e92..cd964f1c 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -5,6 +5,8 @@ import threading import inspect import Queue +import sys + #{ Decorators def do_terminate_threads(whitelist=list()): @@ -160,14 +162,15 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) break # END make routine call except StopProcessing: + print self.name, "stops processing" break except Exception,e: - print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) - break # abort ... + print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) + continue # just continue # END routine exception handling # END endless loop -- cgit v1.2.1 From f2c8d26d3b25b864ad48e6de018757266b59f708 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:28:37 +0200 Subject: thread: fixed initialization problem if an empty iterable was handed in; queue: Queue now derives from deque directly, which saves one dict lookup as the queue does not need to be accessed through self anymore; pool test improved to better verify threads are started correctly --- lib/git/async/thread.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index cd964f1c..faeda04f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -125,7 +125,9 @@ class WorkerThread(TerminatableThread): def __init__(self, inq = None): super(WorkerThread, self).__init__() - self.inq = inq or Queue.Queue() + self.inq = inq + if inq is None: + self.inq = Queue.Queue() self._current_routine = None # routine we execute right now @classmethod @@ -135,6 +137,8 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" + print self.name, "starts processing" # DEBUG + gettask = self.inq.get while True: self._current_routine = None @@ -166,7 +170,7 @@ class WorkerThread(TerminatableThread): break # END make routine call except StopProcessing: - print self.name, "stops processing" + print self.name, "stops processing" # DEBUG break except Exception,e: print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) -- cgit v1.2.1 From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 23:55:50 +0200 Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. Its good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but its nearly there --- lib/git/async/thread.py | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index faeda04f..b8d2e418 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -116,7 +116,7 @@ class WorkerThread(TerminatableThread): t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ - __slots__ = ('inq', '_current_routine') + __slots__ = ('inq') # define how often we should check for a shutdown request in case our @@ -128,7 +128,6 @@ class WorkerThread(TerminatableThread): self.inq = inq if inq is None: self.inq = Queue.Queue() - self._current_routine = None # routine we execute right now @classmethod def stop(cls, *args): @@ -141,7 +140,6 @@ class WorkerThread(TerminatableThread): gettask = self.inq.get while True: - self._current_routine = None if self._should_terminate(): break # END check for stop request @@ -153,22 +151,27 @@ class WorkerThread(TerminatableThread): assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple - self._current_routine = routine - try: - rval = None - if inspect.ismethod(routine): - if routine.im_self is None: - rval = routine(self, arg) - else: + try: + rval = 
None + if inspect.ismethod(routine): + if routine.im_self is None: + rval = routine(self, arg) + else: + rval = routine(arg) + elif inspect.isroutine(routine): rval = routine(arg) - elif inspect.isroutine(routine): - rval = routine(arg) - else: - # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) - break - # END make routine call + else: + # ignore unknown items + print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + break + # END make routine call + finally: + # make sure we delete the routine to release the reference as soon + # as possible. Otherwise objects might not be destroyed + # while we are waiting + del(routine) + del(tasktuple) except StopProcessing: print self.name, "stops processing" # DEBUG break @@ -176,12 +179,10 @@ class WorkerThread(TerminatableThread): print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) continue # just continue # END routine exception handling + + # END handle routine release # END endless loop - def routine(self): - """:return: routine we are currently executing, or None if we have no task""" - return self._current_routine - def stop_and_join(self): """Send stop message to ourselves""" self.inq.put((self.stop, None)) -- cgit v1.2.1 From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 11:52:01 +0200 Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notifyied, staying in a beautysleep. This glitch is probably caused by some detail not treated correctly in the thread python module, which is something we cannot fix. 
It works most of the time as expected though - maybe some cleanup is not done correctly which causes this --- lib/git/async/thread.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index b8d2e418..4d046a2f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -146,9 +146,7 @@ class WorkerThread(TerminatableThread): # we wait and block - to terminate, send the 'stop' method tasktuple = gettask() - # needing exactly one function, and one arg - assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need" routine, arg = tasktuple try: -- cgit v1.2.1 From 1873db442dc7511fc2c92fbaeb8d998d3e62723d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:25:27 +0200 Subject: Improved shutdown handling - although its impossible to prevent some stderr printing thanks to the underlying threading implementation, we can at least make sure that the interpreter doesn't block during shutdown. 
Now it appears to be running smoothly --- lib/git/async/thread.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index 4d046a2f..afe0d79d 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -136,16 +136,21 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" - print self.name, "starts processing" # DEBUG - gettask = self.inq.get while True: if self._should_terminate(): break # END check for stop request - # we wait and block - to terminate, send the 'stop' method + # note: during shutdown, this turns None in the middle of waiting + # for an item to be put onto it - we can't du anything about it - + # even if we catch everything and break gracefully, the parent + # call will think we failed with an empty exception. + # Hence we just don't do anything about it. Alternatively + # we could override the start method to get our own bootstrapping, + # which would mean repeating plenty of code in of the threading module. tasktuple = gettask() + # needing exactly one function, and one arg routine, arg = tasktuple @@ -161,7 +166,7 @@ class WorkerThread(TerminatableThread): rval = routine(arg) else: # ignore unknown items - print >> sys.stderr, "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple)) + sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple))) break # END make routine call finally: @@ -171,10 +176,9 @@ class WorkerThread(TerminatableThread): del(routine) del(tasktuple) except StopProcessing: - print self.name, "stops processing" # DEBUG break except Exception,e: - print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" 
% (self.getName(), str(tasktuple), str(e)) + sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) continue # just continue # END routine exception handling @@ -182,7 +186,12 @@ class WorkerThread(TerminatableThread): # END endless loop def stop_and_join(self): - """Send stop message to ourselves""" + """Send stop message to ourselves - we don't block, the thread will terminate + once it has finished processing its input queue to receive our termination + event""" + # DONT call superclass as it will try to join - join's don't work for + # some reason, as python apparently doesn't switch threads (so often) + # while waiting ... I don't know, but the threads respond properly, + # but only if dear python switches to them self.inq.put((self.stop, None)) - super(WorkerThread, self).stop_and_join() #} END classes -- cgit v1.2.1 From cac6e06cc9ef2903a15e594186445f3baa989a1a Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:58:44 +0200 Subject: test_task: fixed import error, made all modules from x import * safe --- lib/git/async/thread.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'lib/git/async/thread.py') diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index afe0d79d..96b4f0c4 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -7,6 +7,10 @@ import Queue import sys +__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', + 'WorkerThread') + + #{ Decorators def do_terminate_threads(whitelist=list()): -- cgit v1.2.1