diff options
Diffstat (limited to 'Lib/asyncio/locks.py')
| -rw-r--r-- | Lib/asyncio/locks.py | 401 | 
1 files changed, 401 insertions, 0 deletions
| diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py new file mode 100644 index 0000000000..06edbbc1d3 --- /dev/null +++ b/Lib/asyncio/locks.py @@ -0,0 +1,401 @@ +"""Synchronization primitives.""" + +__all__ = ['Lock', 'Event', 'Condition', 'Semaphore'] + +import collections + +from . import events +from . import futures +from . import tasks + + +class Lock: +    """Primitive lock objects. + +    A primitive lock is a synchronization primitive that is not owned +    by a particular coroutine when locked.  A primitive lock is in one +    of two states, 'locked' or 'unlocked'. + +    It is created in the unlocked state.  It has two basic methods, +    acquire() and release().  When the state is unlocked, acquire() +    changes the state to locked and returns immediately.  When the +    state is locked, acquire() blocks until a call to release() in +    another coroutine changes it to unlocked, then the acquire() call +    resets it to locked and returns.  The release() method should only +    be called in the locked state; it changes the state to unlocked +    and returns immediately.  If an attempt is made to release an +    unlocked lock, a RuntimeError will be raised. + +    When more than one coroutine is blocked in acquire() waiting for +    the state to turn to unlocked, only one coroutine proceeds when a +    release() call resets the state to unlocked; first coroutine which +    is blocked in acquire() is being processed. + +    acquire() is a coroutine and should be called with 'yield from'. + +    Locks also support the context manager protocol.  '(yield from lock)' +    should be used as context manager expression. + +    Usage: + +        lock = Lock() +        ... +        yield from lock +        try: +            ... +        finally: +            lock.release() + +    Context manager usage: + +        lock = Lock() +        ... +        with (yield from lock): +             ... + +    Lock objects can be tested for locking state: + +        if not lock.locked(): +           yield from lock +        else: +           # lock is acquired +           ... + +    """ + +    def __init__(self, *, loop=None): +        self._waiters = collections.deque() +        self._locked = False +        if loop is not None: +            self._loop = loop +        else: +            self._loop = events.get_event_loop() + +    def __repr__(self): +        res = super().__repr__() +        extra = 'locked' if self._locked else 'unlocked' +        if self._waiters: +            extra = '{},waiters:{}'.format(extra, len(self._waiters)) +        return '<{} [{}]>'.format(res[1:-1], extra) + +    def locked(self): +        """Return true if lock is acquired.""" +        return self._locked + +    @tasks.coroutine +    def acquire(self): +        """Acquire a lock. + +        This method blocks until the lock is unlocked, then sets it to +        locked and returns True. +        """ +        if not self._waiters and not self._locked: +            self._locked = True +            return True + +        fut = futures.Future(loop=self._loop) +        self._waiters.append(fut) +        try: +            yield from fut +            self._locked = True +            return True +        finally: +            self._waiters.remove(fut) + +    def release(self): +        """Release a lock. + +        When the lock is locked, reset it to unlocked, and return. +        If any other coroutines are blocked waiting for the lock to become +        unlocked, allow exactly one of them to proceed. + +        When invoked on an unlocked lock, a RuntimeError is raised. + +        There is no return value. +        """ +        if self._locked: +            self._locked = False +            # Wake up the first waiter who isn't cancelled. +            for fut in self._waiters: +                if not fut.done(): +                    fut.set_result(True) +                    break +        else: +            raise RuntimeError('Lock is not acquired.') + +    def __enter__(self): +        if not self._locked: +            raise RuntimeError( +                '"yield from" should be used as context manager expression') +        return True + +    def __exit__(self, *args): +        self.release() + +    def __iter__(self): +        yield from self.acquire() +        return self + + +class Event: +    """An Event implementation, our equivalent to threading.Event. + +    Class implementing event objects. An event manages a flag that can be set +    to true with the set() method and reset to false with the clear() method. +    The wait() method blocks until the flag is true. The flag is initially +    false. +    """ + +    def __init__(self, *, loop=None): +        self._waiters = collections.deque() +        self._value = False +        if loop is not None: +            self._loop = loop +        else: +            self._loop = events.get_event_loop() + +    def __repr__(self): +        # TODO: add waiters:N if > 0. +        res = super().__repr__() +        return '<{} [{}]>'.format(res[1:-1], 'set' if self._value else 'unset') + +    def is_set(self): +        """Return true if and only if the internal flag is true.""" +        return self._value + +    def set(self): +        """Set the internal flag to true. All coroutines waiting for it to +        become true are awakened. Coroutine that call wait() once the flag is +        true will not block at all. +        """ +        if not self._value: +            self._value = True + +            for fut in self._waiters: +                if not fut.done(): +                    fut.set_result(True) + +    def clear(self): +        """Reset the internal flag to false. Subsequently, coroutines calling +        wait() will block until set() is called to set the internal flag +        to true again.""" +        self._value = False + +    @tasks.coroutine +    def wait(self): +        """Block until the internal flag is true. + +        If the internal flag is true on entry, return True +        immediately.  Otherwise, block until another coroutine calls +        set() to set the flag to true, then return True. +        """ +        if self._value: +            return True + +        fut = futures.Future(loop=self._loop) +        self._waiters.append(fut) +        try: +            yield from fut +            return True +        finally: +            self._waiters.remove(fut) + + +# TODO: Why is this a Lock subclass?  threading.Condition *has* a lock. +class Condition(Lock): +    """A Condition implementation. + +    This class implements condition variable objects. A condition variable +    allows one or more coroutines to wait until they are notified by another +    coroutine. +    """ + +    def __init__(self, *, loop=None): +        super().__init__(loop=loop) +        self._condition_waiters = collections.deque() + +    # TODO: Add __repr__() with len(_condition_waiters). + +    @tasks.coroutine +    def wait(self): +        """Wait until notified. + +        If the calling coroutine has not acquired the lock when this +        method is called, a RuntimeError is raised. + +        This method releases the underlying lock, and then blocks +        until it is awakened by a notify() or notify_all() call for +        the same condition variable in another coroutine.  Once +        awakened, it re-acquires the lock and returns True. +        """ +        if not self._locked: +            raise RuntimeError('cannot wait on un-acquired lock') + +        keep_lock = True +        self.release() +        try: +            fut = futures.Future(loop=self._loop) +            self._condition_waiters.append(fut) +            try: +                yield from fut +                return True +            finally: +                self._condition_waiters.remove(fut) + +        except GeneratorExit: +            keep_lock = False  # Prevent yield in finally clause. +            raise +        finally: +            if keep_lock: +                yield from self.acquire() + +    @tasks.coroutine +    def wait_for(self, predicate): +        """Wait until a predicate becomes true. + +        The predicate should be a callable which result will be +        interpreted as a boolean value.  The final predicate value is +        the return value. +        """ +        result = predicate() +        while not result: +            yield from self.wait() +            result = predicate() +        return result + +    def notify(self, n=1): +        """By default, wake up one coroutine waiting on this condition, if any. +        If the calling coroutine has not acquired the lock when this method +        is called, a RuntimeError is raised. + +        This method wakes up at most n of the coroutines waiting for the +        condition variable; it is a no-op if no coroutines are waiting. + +        Note: an awakened coroutine does not actually return from its +        wait() call until it can reacquire the lock. Since notify() does +        not release the lock, its caller should. +        """ +        if not self._locked: +            raise RuntimeError('cannot notify on un-acquired lock') + +        idx = 0 +        for fut in self._condition_waiters: +            if idx >= n: +                break + +            if not fut.done(): +                idx += 1 +                fut.set_result(False) + +    def notify_all(self): +        """Wake up all threads waiting on this condition. This method acts +        like notify(), but wakes up all waiting threads instead of one. If the +        calling thread has not acquired the lock when this method is called, +        a RuntimeError is raised. +        """ +        self.notify(len(self._condition_waiters)) + + +class Semaphore: +    """A Semaphore implementation. + +    A semaphore manages an internal counter which is decremented by each +    acquire() call and incremented by each release() call. The counter +    can never go below zero; when acquire() finds that it is zero, it blocks, +    waiting until some other thread calls release(). + +    Semaphores also support the context manager protocol. + +    The first optional argument gives the initial value for the internal +    counter; it defaults to 1. If the value given is less than 0, +    ValueError is raised. + +    The second optional argument determins can semophore be released more than +    initial internal counter value; it defaults to False. If the value given +    is True and number of release() is more than number of successfull +    acquire() calls ValueError is raised. +    """ + +    def __init__(self, value=1, bound=False, *, loop=None): +        if value < 0: +            raise ValueError("Semaphore initial value must be > 0") +        self._value = value +        self._bound = bound +        self._bound_value = value +        self._waiters = collections.deque() +        self._locked = False +        if loop is not None: +            self._loop = loop +        else: +            self._loop = events.get_event_loop() + +    def __repr__(self): +        # TODO: add waiters:N if > 0. +        res = super().__repr__() +        return '<{} [{}]>'.format( +            res[1:-1], +            'locked' if self._locked else 'unlocked,value:{}'.format( +                self._value)) + +    def locked(self): +        """Returns True if semaphore can not be acquired immediately.""" +        return self._locked + +    @tasks.coroutine +    def acquire(self): +        """Acquire a semaphore. + +        If the internal counter is larger than zero on entry, +        decrement it by one and return True immediately.  If it is +        zero on entry, block, waiting until some other coroutine has +        called release() to make it larger than 0, and then return +        True. +        """ +        if not self._waiters and self._value > 0: +            self._value -= 1 +            if self._value == 0: +                self._locked = True +            return True + +        fut = futures.Future(loop=self._loop) +        self._waiters.append(fut) +        try: +            yield from fut +            self._value -= 1 +            if self._value == 0: +                self._locked = True +            return True +        finally: +            self._waiters.remove(fut) + +    def release(self): +        """Release a semaphore, incrementing the internal counter by one. +        When it was zero on entry and another coroutine is waiting for it to +        become larger than zero again, wake up that coroutine. + +        If Semaphore is create with "bound" paramter equals true, then +        release() method checks to make sure its current value doesn't exceed +        its initial value. If it does, ValueError is raised. +        """ +        if self._bound and self._value >= self._bound_value: +            raise ValueError('Semaphore released too many times') + +        self._value += 1 +        self._locked = False + +        for waiter in self._waiters: +            if not waiter.done(): +                waiter.set_result(True) +                break + +    def __enter__(self): +        # TODO: This is questionable.  How do we know the user actually +        # wrote "with (yield from sema)" instead of "with sema"? +        return True + +    def __exit__(self, *args): +        self.release() + +    def __iter__(self): +        yield from self.acquire() +        return self | 
