diff options
Diffstat (limited to 'Lib/threading.py')
-rw-r--r-- | Lib/threading.py | 681 |
1 files changed, 442 insertions, 239 deletions
diff --git a/Lib/threading.py b/Lib/threading.py index 58ffa7ebc2..625c9b9d7b 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -3,14 +3,18 @@ import sys as _sys import _thread -from time import time as _time, sleep as _sleep +from time import sleep as _sleep +try: + from time import monotonic as _time +except ImportError: + from time import time as _time from traceback import format_exc as _format_exc from _weakrefset import WeakSet # Note regarding PEP 8 compliant names # This threading model was originally inspired by Java, and inherited # the convention of camelCase function and method names from that -# language. Those originaly names are not in any imminent danger of +# language. Those original names are not in any imminent danger of # being deprecated (even for Py3k),so this module provides them as an # alias for the PEP 8 compliant names # Note that using the new PEP 8 compliant names facilitates substitution @@ -19,12 +23,12 @@ from _weakrefset import WeakSet __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', - 'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] + 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size'] # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread _allocate_lock = _thread.allocate_lock -_get_ident = _thread.get_ident +get_ident = _thread.get_ident ThreadError = _thread.error try: _CRLock = _thread.RLock @@ -34,50 +38,28 @@ TIMEOUT_MAX = _thread.TIMEOUT_MAX del _thread -# Debug support (adapted from ihooks.py). - -_VERBOSE = False - -if __debug__: - - class _Verbose(object): - - def __init__(self, verbose=None): - if verbose is None: - verbose = _VERBOSE - self._verbose = verbose - - def _note(self, format, *args): - if self._verbose: - format = format % args - # Issue #4188: calling current_thread() can incur an infinite - # recursion if it has to create a DummyThread on the fly. - ident = _get_ident() - try: - name = _active[ident].name - except KeyError: - name = "<OS thread %d>" % ident - format = "%s: %s\n" % (name, format) - _sys.stderr.write(format) - -else: - # Disable this when using "python -O" - class _Verbose(object): - def __init__(self, verbose=None): - pass - def _note(self, *args): - pass - # Support for profile and trace hooks _profile_hook = None _trace_hook = None def setprofile(func): + """Set a profile function for all threads started from the threading module. + + The func will be passed to sys.setprofile() for each thread, before its + run() method is called. + + """ global _profile_hook _profile_hook = func def settrace(func): + """Set a trace function for all threads started from the threading module. + + The func will be passed to sys.settrace() for each thread, before its run() + method is called. + + """ global _trace_hook _trace_hook = func @@ -85,17 +67,30 @@ def settrace(func): Lock = _allocate_lock -def RLock(verbose=None, *args, **kwargs): - if verbose is None: - verbose = _VERBOSE - if (__debug__ and verbose) or _CRLock is None: - return _PyRLock(verbose, *args, **kwargs) +def RLock(*args, **kwargs): + """Factory function that returns a new reentrant lock. + + A reentrant lock must be released by the thread that acquired it. Once a + thread has acquired a reentrant lock, the same thread may acquire it again + without blocking; the thread must release it once for each time it has + acquired it. + + """ + if _CRLock is None: + return _PyRLock(*args, **kwargs) return _CRLock(*args, **kwargs) -class _RLock(_Verbose): +class _RLock: + """This class implements reentrant lock objects. + + A reentrant lock must be released by the thread that acquired it. Once a + thread has acquired a reentrant lock, the same thread may acquire it + again without blocking; the thread must release it once for each time it + has acquired it. + + """ - def __init__(self, verbose=None): - _Verbose.__init__(self, verbose) + def __init__(self): self._block = _allocate_lock() self._owner = None self._count = 0 @@ -110,37 +105,65 @@ class _RLock(_Verbose): self.__class__.__name__, owner, self._count) def acquire(self, blocking=True, timeout=-1): - me = _get_ident() + """Acquire a lock, blocking or non-blocking. + + When invoked without arguments: if this thread already owns the lock, + increment the recursion level by one, and return immediately. Otherwise, + if another thread owns the lock, block until the lock is unlocked. Once + the lock is unlocked (not owned by any thread), then grab ownership, set + the recursion level to one, and return. If more than one thread is + blocked waiting until the lock is unlocked, only one at a time will be + able to grab ownership of the lock. There is no return value in this + case. + + When invoked with the blocking argument set to true, do the same thing + as when called without arguments, and return true. + + When invoked with the blocking argument set to false, do not block. If a + call without an argument would block, return false immediately; + otherwise, do the same thing as when called without arguments, and + return true. + + When invoked with the floating-point timeout argument set to a positive + value, block for at most the number of seconds specified by timeout + and as long as the lock cannot be acquired. Return true if the lock has + been acquired, false if the timeout has elapsed. + + """ + me = get_ident() if self._owner == me: self._count = self._count + 1 - if __debug__: - self._note("%s.acquire(%s): recursive success", self, blocking) return 1 rc = self._block.acquire(blocking, timeout) if rc: self._owner = me self._count = 1 - if __debug__: - self._note("%s.acquire(%s): initial success", self, blocking) - else: - if __debug__: - self._note("%s.acquire(%s): failure", self, blocking) return rc __enter__ = acquire def release(self): - if self._owner != _get_ident(): + """Release a lock, decrementing the recursion level. + + If after the decrement it is zero, reset the lock to unlocked (not owned + by any thread), and if any other threads are blocked waiting for the + lock to become unlocked, allow exactly one of them to proceed. If after + the decrement the recursion level is still nonzero, the lock remains + locked and owned by the calling thread. + + Only call this method when the calling thread owns the lock. A + RuntimeError is raised if this method is called when the lock is + unlocked. + + There is no return value. + + """ + if self._owner != get_ident(): raise RuntimeError("cannot release un-acquired lock") self._count = count = self._count - 1 if not count: self._owner = None self._block.release() - if __debug__: - self._note("%s.release(): final release", self) - else: - if __debug__: - self._note("%s.release(): non-final release", self) def __exit__(self, t, v, tb): self.release() @@ -150,12 +173,10 @@ class _RLock(_Verbose): def _acquire_restore(self, state): self._block.acquire() self._count, self._owner = state - if __debug__: - self._note("%s._acquire_restore()", self) def _release_save(self): - if __debug__: - self._note("%s._release_save()", self) + if self._count == 0: + raise RuntimeError("cannot release un-acquired lock") count = self._count self._count = 0 owner = self._owner @@ -164,18 +185,24 @@ class _RLock(_Verbose): return (count, owner) def _is_owned(self): - return self._owner == _get_ident() + return self._owner == get_ident() _PyRLock = _RLock -def Condition(*args, **kwargs): - return _Condition(*args, **kwargs) +class Condition: + """Class that implements a condition variable. + + A condition variable allows one or more threads to wait until they are + notified by another thread. + + If the lock argument is given and not None, it must be a Lock or RLock + object, and it is used as the underlying lock. Otherwise, a new RLock object + is created and used as the underlying lock. -class _Condition(_Verbose): + """ - def __init__(self, lock=None, verbose=None): - _Verbose.__init__(self, verbose) + def __init__(self, lock=None): if lock is None: lock = RLock() self._lock = lock @@ -224,6 +251,28 @@ class _Condition(_Verbose): return True def wait(self, timeout=None): + """Wait until notified or until a timeout occurs. + + If the calling thread has not acquired the lock when this method is + called, a RuntimeError is raised. + + This method releases the underlying lock, and then blocks until it is + awakened by a notify() or notify_all() call for the same condition + variable in another thread, or until the optional timeout occurs. Once + awakened or timed out, it re-acquires the lock and returns. + + When the timeout argument is present and not None, it should be a + floating point number specifying a timeout for the operation in seconds + (or fractions thereof). + + When the underlying lock is an RLock, it is not released using its + release() method, since this may not actually unlock the lock when it + was acquired multiple times recursively. Instead, an internal interface + of the RLock class is used, which really unlocks it even when it has + been recursively acquired several times. Another internal interface is + then used to restore the recursion level when the lock is reacquired. + + """ if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() @@ -234,28 +283,28 @@ class _Condition(_Verbose): if timeout is None: waiter.acquire() gotit = True - if __debug__: - self._note("%s.wait(): got it", self) else: if timeout > 0: gotit = waiter.acquire(True, timeout) else: gotit = waiter.acquire(False) if not gotit: - if __debug__: - self._note("%s.wait(%s): timed out", self, timeout) try: self._waiters.remove(waiter) except ValueError: pass - else: - if __debug__: - self._note("%s.wait(%s): got it", self, timeout) return gotit finally: self._acquire_restore(saved_state) def wait_for(self, predicate, timeout=None): + """Wait until a condition evaluates to True. + + predicate should be a callable which result will be interpreted as a + boolean value. A timeout may be provided giving the maximum time to + wait. + + """ endtime = None waittime = timeout result = predicate() @@ -266,32 +315,27 @@ class _Condition(_Verbose): else: waittime = endtime - _time() if waittime <= 0: - if __debug__: - self._note("%s.wait_for(%r, %r): Timed out.", - self, predicate, timeout) break - if __debug__: - self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.", - self, predicate, timeout, waittime) self.wait(waittime) result = predicate() - else: - if __debug__: - self._note("%s.wait_for(%r, %r): Success.", - self, predicate, timeout) return result def notify(self, n=1): + """Wake up one or more threads waiting on this condition, if any. + + If the calling thread has not acquired the lock when this method is + called, a RuntimeError is raised. + + This method wakes up at most n of the threads waiting for the condition + variable; it is a no-op if no threads are waiting. + + """ if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") __waiters = self._waiters waiters = __waiters[:n] if not waiters: - if __debug__: - self._note("%s.notify(): no waiters", self) return - self._note("%s.notify(): notifying %d waiter%s", self, n, - n!=1 and "s" or "") for waiter in waiters: waiter.release() try: @@ -300,93 +344,147 @@ class _Condition(_Verbose): pass def notify_all(self): + """Wake up all threads waiting on this condition. + + If the calling thread has not acquired the lock when this method + is called, a RuntimeError is raised. + + """ self.notify(len(self._waiters)) notifyAll = notify_all -def Semaphore(*args, **kwargs): - return _Semaphore(*args, **kwargs) +class Semaphore: + """This class implements semaphore objects. -class _Semaphore(_Verbose): + Semaphores manage a counter representing the number of release() calls minus + the number of acquire() calls, plus an initial value. The acquire() method + blocks if necessary until it can return without making the counter + negative. If not given, value defaults to 1. + + """ # After Tim Peters' semaphore class, but not quite the same (no maximum) - def __init__(self, value=1, verbose=None): + def __init__(self, value=1): if value < 0: raise ValueError("semaphore initial value must be >= 0") - _Verbose.__init__(self, verbose) self._cond = Condition(Lock()) self._value = value def acquire(self, blocking=True, timeout=None): + """Acquire a semaphore, decrementing the internal counter by one. + + When invoked without arguments: if the internal counter is larger than + zero on entry, decrement it by one and return immediately. If it is zero + on entry, block, waiting until some other thread has called release() to + make it larger than zero. This is done with proper interlocking so that + if multiple acquire() calls are blocked, release() will wake exactly one + of them up. The implementation may pick one at random, so the order in + which blocked threads are awakened should not be relied on. There is no + return value in this case. + + When invoked with blocking set to true, do the same thing as when called + without arguments, and return true. + + When invoked with blocking set to false, do not block. If a call without + an argument would block, return false immediately; otherwise, do the + same thing as when called without arguments, and return true. + + When invoked with a timeout other than None, it will block for at + most timeout seconds. If acquire does not complete successfully in + that interval, return false. Return true otherwise. + + """ if not blocking and timeout is not None: raise ValueError("can't specify timeout for non-blocking acquire") rc = False endtime = None - self._cond.acquire() - while self._value == 0: - if not blocking: - break - if __debug__: - self._note("%s.acquire(%s): blocked waiting, value=%s", - self, blocking, self._value) - if timeout is not None: - if endtime is None: - endtime = _time() + timeout - else: - timeout = endtime - _time() - if timeout <= 0: - break - self._cond.wait(timeout) - else: - self._value = self._value - 1 - if __debug__: - self._note("%s.acquire: success, value=%s", - self, self._value) - rc = True - self._cond.release() + with self._cond: + while self._value == 0: + if not blocking: + break + if timeout is not None: + if endtime is None: + endtime = _time() + timeout + else: + timeout = endtime - _time() + if timeout <= 0: + break + self._cond.wait(timeout) + else: + self._value = self._value - 1 + rc = True return rc __enter__ = acquire def release(self): - self._cond.acquire() - self._value = self._value + 1 - if __debug__: - self._note("%s.release: success, value=%s", - self, self._value) - self._cond.notify() - self._cond.release() + """Release a semaphore, incrementing the internal counter by one. + + When the counter is zero on entry and another thread is waiting for it + to become larger than zero again, wake up that thread. + + """ + with self._cond: + self._value = self._value + 1 + self._cond.notify() def __exit__(self, t, v, tb): self.release() -def BoundedSemaphore(*args, **kwargs): - return _BoundedSemaphore(*args, **kwargs) +class BoundedSemaphore(Semaphore): + """Implements a bounded semaphore. + + A bounded semaphore checks to make sure its current value doesn't exceed its + initial value. If it does, ValueError is raised. In most situations + semaphores are used to guard resources with limited capacity. + + If the semaphore is released too many times it's a sign of a bug. If not + given, value defaults to 1. + + Like regular semaphores, bounded semaphores manage a counter representing + the number of release() calls minus the number of acquire() calls, plus an + initial value. The acquire() method blocks if necessary until it can return + without making the counter negative. If not given, value defaults to 1. -class _BoundedSemaphore(_Semaphore): - """Semaphore that checks that # releases is <= # acquires""" - def __init__(self, value=1, verbose=None): - _Semaphore.__init__(self, value, verbose) + """ + + def __init__(self, value=1): + Semaphore.__init__(self, value) self._initial_value = value def release(self): - if self._value >= self._initial_value: - raise ValueError("Semaphore released too many times") - return _Semaphore.release(self) + """Release a semaphore, incrementing the internal counter by one. + + When the counter is zero on entry and another thread is waiting for it + to become larger than zero again, wake up that thread. + If the number of releases exceeds the number of acquires, + raise a ValueError. + + """ + with self._cond: + if self._value >= self._initial_value: + raise ValueError("Semaphore released too many times") + self._value += 1 + self._cond.notify() -def Event(*args, **kwargs): - return _Event(*args, **kwargs) -class _Event(_Verbose): +class Event: + """Class implementing event objects. + + Events manage a flag that can be set to true with the set() method and reset + to false with the clear() method. The wait() method blocks until the flag is + true. The flag is initially false. + + """ # After Tim Peters' event class (without is_posted()) - def __init__(self, verbose=None): - _Verbose.__init__(self, verbose) + def __init__(self): self._cond = Condition(Lock()) self._flag = False @@ -395,11 +493,18 @@ class _Event(_Verbose): self._cond.__init__() def is_set(self): + """Return true if and only if the internal flag is true.""" return self._flag isSet = is_set def set(self): + """Set the internal flag to true. + + All threads waiting for it to become true are awakened. Threads + that call wait() once the flag is true will not block at all. + + """ self._cond.acquire() try: self._flag = True @@ -408,6 +513,12 @@ class _Event(_Verbose): self._cond.release() def clear(self): + """Reset the internal flag to false. + + Subsequently, threads calling wait() will block until set() is called to + set the internal flag to true again. + + """ self._cond.acquire() try: self._flag = False @@ -415,6 +526,20 @@ class _Event(_Verbose): self._cond.release() def wait(self, timeout=None): + """Block until the internal flag is true. + + If the internal flag is true on entry, return immediately. Otherwise, + block until another thread calls set() to set the flag to true, or until + the optional timeout occurs. + + When the timeout argument is present and not None, it should be a + floating point number specifying a timeout for the operation in seconds + (or fractions thereof). + + This method returns the internal flag on exit, so it will always return + True except if a timeout is given and the operation times out. + + """ self._cond.acquire() try: signaled = self._flag @@ -436,22 +561,24 @@ class _Event(_Verbose): # since the previous cycle. In addition, a 'resetting' state exists which is # similar to 'draining' except that threads leave with a BrokenBarrierError, # and a 'broken' state in which all threads get the exception. -class Barrier(_Verbose): - """ - Barrier. Useful for synchronizing a fixed number of threads - at known synchronization points. Threads block on 'wait()' and are - simultaneously once they have all made that call. +class Barrier: + """Implements a Barrier. + + Useful for synchronizing a fixed number of threads at known synchronization + points. Threads block on 'wait()' and are simultaneously once they have all + made that call. + """ - def __init__(self, parties, action=None, timeout=None, verbose=None): - """ - Create a barrier, initialised to 'parties' threads. - 'action' is a callable which, when supplied, will be called - by one of the threads after they have all entered the - barrier and just prior to releasing them all. - If a 'timeout' is provided, it is uses as the default for - all subsequent 'wait()' calls. + + def __init__(self, parties, action=None, timeout=None): + """Create a barrier, initialised to 'parties' threads. + + 'action' is a callable which, when supplied, will be called by one of + the threads after they have all entered the barrier and just prior to + releasing them all. If a 'timeout' is provided, it is uses as the + default for all subsequent 'wait()' calls. + """ - _Verbose.__init__(self, verbose) self._cond = Condition(Lock()) self._action = action self._timeout = timeout @@ -460,12 +587,13 @@ class Barrier(_Verbose): self._count = 0 def wait(self, timeout=None): - """ - Wait for the barrier. When the specified number of threads have - started waiting, they are all simultaneously awoken. If an 'action' - was provided for the barrier, one of the threads will have executed - that callback prior to returning. + """Wait for the barrier. + + When the specified number of threads have started waiting, they are all + simultaneously awoken. If an 'action' was provided for the barrier, one + of the threads will have executed that callback prior to returning. Returns an individual index number from 0 to 'parties-1'. + """ if timeout is None: timeout = self._timeout @@ -532,10 +660,11 @@ class Barrier(_Verbose): self._cond.notify_all() def reset(self): - """ - Reset the barrier to the initial state. + """Reset the barrier to the initial state. + Any threads currently waiting will get the BrokenBarrier exception raised. + """ with self._cond: if self._count > 0: @@ -551,11 +680,11 @@ class Barrier(_Verbose): self._cond.notify_all() def abort(self): - """ - Place the barrier into a 'broken' state. - Useful in case of error. Any currently waiting threads and - threads attempting to 'wait()' will have BrokenBarrierError - raised. + """Place the barrier into a 'broken' state. + + Useful in case of error. Any currently waiting threads and threads + attempting to 'wait()' will have BrokenBarrierError raised. + """ with self._cond: self._break() @@ -568,16 +697,12 @@ class Barrier(_Verbose): @property def parties(self): - """ - Return the number of threads required to trip the barrier. - """ + """Return the number of threads required to trip the barrier.""" return self._parties @property def n_waiting(self): - """ - Return the number of threads that are currently waiting at the barrier. - """ + """Return the number of threads currently waiting at the barrier.""" # We don't need synchronization here since this is an ephemeral result # anyway. It returns the correct value in the steady state. if self._state == 0: @@ -586,13 +711,12 @@ class Barrier(_Verbose): @property def broken(self): - """ - Return True if the barrier is in a broken state - """ + """Return True if the barrier is in a broken state.""" return self._state == -2 -#exception raised by the Barrier class -class BrokenBarrierError(RuntimeError): pass +# exception raised by the Barrier class +class BrokenBarrierError(RuntimeError): + pass # Helper to generate new thread names @@ -612,7 +736,14 @@ _dangling = WeakSet() # Main class for threads -class Thread(_Verbose): +class Thread: + """A class that represents a thread of control. + + This class can be safely subclassed in a limited fashion. There are two ways + to specify the activity: by passing a callable object to the constructor, or + by overriding the run() method in a subclass. + + """ __initialized = False # Need to store a reference to sys.exc_info for printing @@ -625,16 +756,39 @@ class Thread(_Verbose): #XXX __exc_clear = _sys.exc_clear def __init__(self, group=None, target=None, name=None, - args=(), kwargs=None, verbose=None): + args=(), kwargs=None, *, daemon=None): + """This constructor should always be called with keyword arguments. Arguments are: + + *group* should be None; reserved for future extension when a ThreadGroup + class is implemented. + + *target* is the callable object to be invoked by the run() + method. Defaults to None, meaning nothing is called. + + *name* is the thread name. By default, a unique name is constructed of + the form "Thread-N" where N is a small decimal number. + + *args* is the argument tuple for the target invocation. Defaults to (). + + *kwargs* is a dictionary of keyword arguments for the target + invocation. Defaults to {}. + + If a subclass overrides the constructor, it must make sure to invoke + the base class constructor (Thread.__init__()) before doing anything + else to the thread. + + """ assert group is None, "group argument must be None for now" - _Verbose.__init__(self, verbose) if kwargs is None: kwargs = {} self._target = target self._name = str(name or _newname()) self._args = args self._kwargs = kwargs - self._daemonic = self._set_daemon() + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = current_thread().daemon self._ident = None self._started = Event() self._stopped = False @@ -652,10 +806,6 @@ class Thread(_Verbose): self._block.__init__() self._started._reset_internal_locks() - def _set_daemon(self): - # Overridden in _MainThread and _DummyThread - return current_thread().daemon - def __repr__(self): assert self._initialized, "Thread.__init__() was not called" status = "initial" @@ -670,13 +820,20 @@ class Thread(_Verbose): return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) def start(self): + """Start the thread's activity. + + It must be called at most once per thread object. It arranges for the + object's run() method to be invoked in a separate thread of control. + + This method will raise a RuntimeError if called more than once on the + same thread object. + + """ if not self._initialized: raise RuntimeError("thread.__init__() not called") if self._started.is_set(): raise RuntimeError("threads can only be started once") - if __debug__: - self._note("%s.start(): starting thread", self) with _active_limbo_lock: _limbo[self] = self try: @@ -688,6 +845,14 @@ class Thread(_Verbose): self._started.wait() def run(self): + """Method representing the thread's activity. + + You may override this method in a subclass. The standard run() method + invokes the callable object passed to the object's constructor as the + target argument, if any, with sequential and keyword arguments taken + from the args and kwargs arguments, respectively. + + """ try: if self._target: self._target(*self._args, **self._kwargs) @@ -717,7 +882,7 @@ class Thread(_Verbose): raise def _set_ident(self): - self._ident = _get_ident() + self._ident = get_ident() def _bootstrap_inner(self): try: @@ -726,24 +891,17 @@ class Thread(_Verbose): with _active_limbo_lock: _active[self._ident] = self del _limbo[self] - if __debug__: - self._note("%s._bootstrap(): thread started", self) if _trace_hook: - self._note("%s._bootstrap(): registering trace hook", self) _sys.settrace(_trace_hook) if _profile_hook: - self._note("%s._bootstrap(): registering profile hook", self) _sys.setprofile(_profile_hook) try: self.run() except SystemExit: - if __debug__: - self._note("%s._bootstrap(): raised SystemExit", self) + pass except: - if __debug__: - self._note("%s._bootstrap(): unhandled exception", self) # If sys.stderr is no more (most likely from interpreter # shutdown) use self._stderr. Otherwise still use sys (as in # _sys) in case sys.stderr was redefined since the creation of @@ -774,9 +932,6 @@ class Thread(_Verbose): # hog; deleting everything else is just for thoroughness finally: del exc_type, exc_value, exc_tb - else: - if __debug__: - self._note("%s._bootstrap(): normal return", self) finally: # Prevent a race in # test_threading.test_no_refcycle_through_target when @@ -790,7 +945,7 @@ class Thread(_Verbose): try: # We don't call self._delete() because it also # grabs _active_limbo_lock. - del _active[_get_ident()] + del _active[get_ident()] except: pass @@ -826,7 +981,7 @@ class Thread(_Verbose): try: with _active_limbo_lock: - del _active[_get_ident()] + del _active[get_ident()] # There must not be any python code between the previous line # and after the lock is released. Otherwise a tracing function # could try to acquire the lock again in the same thread, (in @@ -836,6 +991,29 @@ class Thread(_Verbose): raise def join(self, timeout=None): + """Wait until the thread terminates. + + This blocks the calling thread until the thread whose join() method is + called terminates -- either normally or through an unhandled exception + or until the optional timeout occurs. + + When the timeout argument is present and not None, it should be a + floating point number specifying a timeout for the operation in seconds + (or fractions thereof). As join() always returns None, you must call + isAlive() after join() to decide whether a timeout happened -- if the + thread is still alive, the join() call timed out. + + When the timeout argument is not present or None, the operation will + block until the thread terminates. + + A thread can be join()ed many times. + + join() raises a RuntimeError if an attempt is made to join the current + thread as that would cause a deadlock. It is also an error to join() a + thread before it has been started and attempts to do so raises the same + exception. + + """ if not self._initialized: raise RuntimeError("Thread.__init__() not called") if not self._started.is_set(): @@ -843,34 +1021,29 @@ class Thread(_Verbose): if self is current_thread(): raise RuntimeError("cannot join current thread") - if __debug__: - if not self._stopped: - self._note("%s.join(): waiting until thread stops", self) - self._block.acquire() try: if timeout is None: while not self._stopped: self._block.wait() - if __debug__: - self._note("%s.join(): thread stopped", self) else: deadline = _time() + timeout while not self._stopped: delay = deadline - _time() if delay <= 0: - if __debug__: - self._note("%s.join(): timed out", self) break self._block.wait(delay) - else: - if __debug__: - self._note("%s.join(): thread stopped", self) finally: self._block.release() @property def name(self): + """A string used for identification purposes only. + + It has no semantics. Multiple threads may be given the same name. The + initial name is set by the constructor. + + """ assert self._initialized, "Thread.__init__() not called" return self._name @@ -881,10 +1054,24 @@ class Thread(_Verbose): @property def ident(self): + """Thread identifier of this thread or None if it has not been started. + + This is a nonzero integer. See the thread.get_ident() function. Thread + identifiers may be recycled when a thread exits and another thread is + created. The identifier is available even after the thread has exited. + + """ assert self._initialized, "Thread.__init__() not called" return self._ident def is_alive(self): + """Return whether the thread is alive. + + This method returns True just before the run() method starts until just + after the run() method terminates. The module function enumerate() + returns a list of all alive threads. + + """ assert self._initialized, "Thread.__init__() not called" return self._started.is_set() and not self._stopped @@ -892,6 +1079,17 @@ class Thread(_Verbose): @property def daemon(self): + """A boolean value indicating whether this thread is a daemon thread. + + This must be set before start() is called, otherwise RuntimeError is + raised. Its initial value is inherited from the creating thread; the + main thread is not a daemon thread and therefore all threads created in + the main thread default to daemon = False. + + The entire Python program exits when no alive non-daemon threads are + left. + + """ assert self._initialized, "Thread.__init__() not called" return self._daemonic @@ -917,27 +1115,25 @@ class Thread(_Verbose): # The timer class was contributed by Itamar Shtull-Trauring -def Timer(*args, **kwargs): - return _Timer(*args, **kwargs) - -class _Timer(Thread): +class Timer(Thread): """Call a function after a specified number of seconds: - t = Timer(30.0, f, args=[], kwargs={}) - t.start() - t.cancel() # stop the timer's action if it's still waiting + t = Timer(30.0, f, args=None, kwargs=None) + t.start() + t.cancel() # stop the timer's action if it's still waiting + """ - def __init__(self, interval, function, args=[], kwargs={}): + def __init__(self, interval, function, args=None, kwargs=None): Thread.__init__(self) self.interval = interval self.function = function - self.args = args - self.kwargs = kwargs + self.args = args if args is not None else [] + self.kwargs = kwargs if kwargs is not None else {} self.finished = Event() def cancel(self): - """Stop the timer if it hasn't finished yet""" + """Stop the timer if it hasn't finished yet.""" self.finished.set() def run(self): @@ -952,26 +1148,18 @@ class _Timer(Thread): class _MainThread(Thread): def __init__(self): - Thread.__init__(self, name="MainThread") + Thread.__init__(self, name="MainThread", daemon=False) self._started.set() self._set_ident() with _active_limbo_lock: _active[self._ident] = self - def _set_daemon(self): - return False - def _exitfunc(self): self._stop() t = _pickSomeNonDaemonThread() - if t: - if __debug__: - self._note("%s: waiting for other threads", self) while t: t.join() t = _pickSomeNonDaemonThread() - if __debug__: - self._note("%s: exiting", self) self._delete() def _pickSomeNonDaemonThread(): @@ -992,7 +1180,7 @@ def _pickSomeNonDaemonThread(): class _DummyThread(Thread): def __init__(self): - Thread.__init__(self, name=_newname("Dummy-%d")) + Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) # Thread._block consumes an OS-level locking primitive, which # can never be used by a _DummyThread. Since a _DummyThread @@ -1004,9 +1192,6 @@ class _DummyThread(Thread): with _active_limbo_lock: _active[self._ident] = self - def _set_daemon(self): - return True - def _stop(self): pass @@ -1017,15 +1202,26 @@ class _DummyThread(Thread): # Global API functions def current_thread(): + """Return the current Thread object, corresponding to the caller's thread of control. + + If the caller's thread of control was not created through the threading + module, a dummy thread object with limited functionality is returned. + + """ try: - return _active[_get_ident()] + return _active[get_ident()] except KeyError: - ##print "current_thread(): no current thread for", _get_ident() return _DummyThread() currentThread = current_thread def active_count(): + """Return the number of Thread objects currently alive. + + The returned count is equal to the length of the list returned by + enumerate(). + + """ with _active_limbo_lock: return len(_active) + len(_limbo) @@ -1036,6 +1232,13 @@ def _enumerate(): return list(_active.values()) + list(_limbo.values()) def enumerate(): + """Return a list of all Thread objects currently alive. + + The list includes daemonic threads, dummy thread objects created by + current_thread(), and the main thread. It excludes terminated threads and + threads that have not yet been started. + + """ with _active_limbo_lock: return list(_active.values()) + list(_limbo.values()) @@ -1070,14 +1273,14 @@ def _after_fork(): new_active = {} current = current_thread() with _active_limbo_lock: - for thread in _active.values(): + for thread in _enumerate(): # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. thread._reset_internal_locks() if thread is current: # There is only one active thread. We reset the ident to # its new value since it can have changed. - ident = _get_ident() + ident = get_ident() thread._ident = ident new_active[ident] = thread else: |