diff options
| author | Alexandre Vassalotti <alexandre@peadrop.com> | 2008-05-11 19:59:59 +0000 | 
|---|---|---|
| committer | Alexandre Vassalotti <alexandre@peadrop.com> | 2008-05-11 19:59:59 +0000 | 
| commit | f260e443ac07bc794fc9409096679b94528868e2 (patch) | |
| tree | 244c7542db14c43a2fe52ce5c8b6373d17a19e7f /Lib/Queue.py | |
| parent | 4e3e21e2cf896295d568828f2d20dc5a25feff54 (diff) | |
| download | cpython-git-f260e443ac07bc794fc9409096679b94528868e2.tar.gz | |
Rename Queue module to queue.
Updated documentation to use new name.
Merged revisions 63077 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
  r63077 | alexandre.vassalotti | 2008-05-11 15:39:48 -0400 (Sun, 11 May 2008) | 3 lines
  Added stub for the Queue module to be renamed in 3.0.
  Use the 3.0 module name to avoid spurious warnings.
........
Diffstat (limited to 'Lib/Queue.py')
| -rw-r--r-- | Lib/Queue.py | 230 | 
1 files changed, 0 insertions, 230 deletions
| diff --git a/Lib/Queue.py b/Lib/Queue.py deleted file mode 100644 index 4d89d97c45..0000000000 --- a/Lib/Queue.py +++ /dev/null @@ -1,230 +0,0 @@ -"""A multi-producer, multi-consumer queue.""" - -from time import time as _time -from collections import deque -import heapq - -__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] - -class Empty(Exception): -    "Exception raised by Queue.get(block=0)/get_nowait()." -    pass - -class Full(Exception): -    "Exception raised by Queue.put(block=0)/put_nowait()." -    pass - -class Queue: -    """Create a queue object with a given maximum size. - -    If maxsize is <= 0, the queue size is infinite. -    """ -    def __init__(self, maxsize=0): -        try: -            import threading -        except ImportError: -            import dummy_threading as threading -        self.maxsize = maxsize -        self._init(maxsize) -        # mutex must be held whenever the queue is mutating.  All methods -        # that acquire mutex must release it before returning.  mutex -        # is shared between the three conditions, so acquiring and -        # releasing the conditions also acquires and releases mutex. -        self.mutex = threading.Lock() -        # Notify not_empty whenever an item is added to the queue; a -        # thread waiting to get is notified then. -        self.not_empty = threading.Condition(self.mutex) -        # Notify not_full whenever an item is removed from the queue; -        # a thread waiting to put is notified then. -        self.not_full = threading.Condition(self.mutex) -        # Notify all_tasks_done whenever the number of unfinished tasks -        # drops to zero; thread waiting to join() is notified to resume -        self.all_tasks_done = threading.Condition(self.mutex) -        self.unfinished_tasks = 0 - -    def task_done(self): -        """Indicate that a formerly enqueued task is complete. - -        Used by Queue consumer threads.  For each get() used to fetch a task, -        a subsequent call to task_done() tells the queue that the processing -        on the task is complete. - -        If a join() is currently blocking, it will resume when all items -        have been processed (meaning that a task_done() call was received -        for every item that had been put() into the queue). - -        Raises a ValueError if called more times than there were items -        placed in the queue. -        """ -        self.all_tasks_done.acquire() -        try: -            unfinished = self.unfinished_tasks - 1 -            if unfinished <= 0: -                if unfinished < 0: -                    raise ValueError('task_done() called too many times') -                self.all_tasks_done.notifyAll() -            self.unfinished_tasks = unfinished -        finally: -            self.all_tasks_done.release() - -    def join(self): -        """Blocks until all items in the Queue have been gotten and processed. - -        The count of unfinished tasks goes up whenever an item is added to the -        queue. The count goes down whenever a consumer thread calls task_done() -        to indicate the item was retrieved and all work on it is complete. - -        When the count of unfinished tasks drops to zero, join() unblocks. -        """ -        self.all_tasks_done.acquire() -        try: -            while self.unfinished_tasks: -                self.all_tasks_done.wait() -        finally: -            self.all_tasks_done.release() - -    def qsize(self): -        """Return the approximate size of the queue (not reliable!).""" -        self.mutex.acquire() -        n = self._qsize() -        self.mutex.release() -        return n - -    def put(self, item, block=True, timeout=None): -        """Put an item into the queue. - -        If optional args 'block' is true and 'timeout' is None (the default), -        block if necessary until a free slot is available. If 'timeout' is -        a positive number, it blocks at most 'timeout' seconds and raises -        the Full exception if no free slot was available within that time. -        Otherwise ('block' is false), put an item on the queue if a free slot -        is immediately available, else raise the Full exception ('timeout' -        is ignored in that case). -        """ -        self.not_full.acquire() -        try: -            if self.maxsize > 0: -                if not block: -                    if self._qsize() == self.maxsize: -                        raise Full -                elif timeout is None: -                    while self._qsize() == self.maxsize: -                        self.not_full.wait() -                elif timeout < 0: -                    raise ValueError("'timeout' must be a positive number") -                else: -                    endtime = _time() + timeout -                    while self._qsize() == self.maxsize: -                        remaining = endtime - _time() -                        if remaining <= 0.0: -                            raise Full -                        self.not_full.wait(remaining) -            self._put(item) -            self.unfinished_tasks += 1 -            self.not_empty.notify() -        finally: -            self.not_full.release() - -    def put_nowait(self, item): -        """Put an item into the queue without blocking. - -        Only enqueue the item if a free slot is immediately available. -        Otherwise raise the Full exception. -        """ -        return self.put(item, False) - -    def get(self, block=True, timeout=None): -        """Remove and return an item from the queue. - -        If optional args 'block' is true and 'timeout' is None (the default), -        block if necessary until an item is available. If 'timeout' is -        a positive number, it blocks at most 'timeout' seconds and raises -        the Empty exception if no item was available within that time. -        Otherwise ('block' is false), return an item if one is immediately -        available, else raise the Empty exception ('timeout' is ignored -        in that case). -        """ -        self.not_empty.acquire() -        try: -            if not block: -                if not self._qsize(): -                    raise Empty -            elif timeout is None: -                while not self._qsize(): -                    self.not_empty.wait() -            elif timeout < 0: -                raise ValueError("'timeout' must be a positive number") -            else: -                endtime = _time() + timeout -                while not self._qsize(): -                    remaining = endtime - _time() -                    if remaining <= 0.0: -                        raise Empty -                    self.not_empty.wait(remaining) -            item = self._get() -            self.not_full.notify() -            return item -        finally: -            self.not_empty.release() - -    def get_nowait(self): -        """Remove and return an item from the queue without blocking. - -        Only get an item if one is immediately available. Otherwise -        raise the Empty exception. -        """ -        return self.get(False) - -    # Override these methods to implement other queue organizations -    # (e.g. stack or priority queue). -    # These will only be called with appropriate locks held - -    # Initialize the queue representation -    def _init(self, maxsize): -        self.queue = deque() - -    def _qsize(self, len=len): -        return len(self.queue) - -    # Put a new item in the queue -    def _put(self, item): -        self.queue.append(item) - -    # Get an item from the queue -    def _get(self): -        return self.queue.popleft() - - -class PriorityQueue(Queue): -    '''Variant of Queue that retrieves open entries in priority order (lowest first). - -    Entries are typically tuples of the form:  (priority number, data). -    ''' - -    def _init(self, maxsize): -        self.queue = [] - -    def _qsize(self, len=len): -        return len(self.queue) - -    def _put(self, item, heappush=heapq.heappush): -        heappush(self.queue, item) - -    def _get(self, heappop=heapq.heappop): -        return heappop(self.queue) - - -class LifoQueue(Queue): -    '''Variant of Queue that retrieves most recently added entries first.''' - -    def _init(self, maxsize): -        self.queue = [] - -    def _qsize(self, len=len): -        return len(self.queue) - -    def _put(self, item): -        self.queue.append(item) - -    def _get(self): -        return self.queue.pop() | 
