"""Contains a queue based channel implementation""" from Queue import ( Empty, Full ) from util import ( AsyncQueue, DummyLock ) from time import time import sys #{ Classes class Channel(object): """A channel is similar to a file like object. It has a write end as well as one or more read ends. If Data is in the channel, it can be read, if not the read operation will block until data becomes available. If the channel is closed, any read operation will result in an exception This base class is not instantiated directly, but instead serves as constructor for RWChannel pairs. Create a new channel """ __slots__ = tuple() def __new__(cls, *args): if cls is Channel: if len(args) > 0: raise ValueError("Cannot take any arguments when creating a new channel") wc = WChannel() rc = RChannel(wc) return wc, rc # END constructor mode return object.__new__(cls) class WChannel(Channel): """The write end of a channel""" __slots__ = ('_closed', '_queue') def __init__(self): """initialize this instance, able to hold max_items at once Write calls will block if the channel is full, until someone reads from it""" self._closed = False self._queue = AsyncQueue() #{ Interface def write(self, item, block=True, timeout=None): """Send an item into the channel, it can be read from the read end of the channel accordingly :param item: Item to send :param block: If True, the call will block until there is free space in the channel :param timeout: timeout in seconds for blocking calls. :raise IOError: when writing into closed file :raise EOFError: when writing into a non-blocking full channel :note: may block if the channel has a limited capacity""" if self._closed: raise IOError("Cannot write to a closed channel") try: self._queue.put(item, block, timeout) except Full: raise EOFError("Capacity of the channel was exeeded") # END exception handling def size(self): """:return: approximate number of items that could be read from the read-ends of this channel""" return self._queue.qsize() def close(self): """Close the channel. Multiple close calls on a closed channel are no an error""" mutex = self._queue.mutex mutex.acquire() # this is atomic already, due to the GIL - no need to get the queue's mutex print "channel.close()" self._closed = True # now make sure that the people waiting for an item are released now # As we it could be that some readers are already on their way to initiate # a blocking get, we must make sure that locks never block before that happens # now we are the only one accessing the queue, so change it self._queue.mutex = DummyLock() print self._queue.not_empty._waiters self._queue.not_empty.notify_all() print self._queue.not_empty._waiters mutex.release() @property def closed(self): """:return: True if the channel was closed""" return self._closed #} END interface class RChannel(Channel): """The read-end of a corresponding write channel""" __slots__ = '_wc' def __init__(self, wchannel): """Initialize this instance from its parent write channel""" self._wc = wchannel #{ Interface def read(self, count=0, block=True, timeout=None): """read a list of items read from the channel. The list, as a sequence of items, is similar to the string of characters returned when reading from file like objects. :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available :param timeout: if positive and block is True, it will block only for the given amount of seconds. :return: single item in a list if count is 1, or a list of count items. If the channel was empty and count was 1, an empty list will be returned. If count was greater 1, a list with less than count items will be returned. If count was < 1, a list with all items that could be read will be returned.""" # if the channel is closed for writing, we never block if self._wc.closed or timeout == 0: block = False # in non-blocking mode, its all not a problem out = list() queue = self._wc._queue if not block: # be as fast as possible in non-blocking mode, hence # its a bit 'unrolled' try: if count == 1: out.append(queue.get(False)) elif count < 1: while True: out.append(queue.get(False)) # END for each item else: for i in xrange(count): out.append(queue.get(False)) # END for each item # END handle count except Empty: pass # END handle exceptions else: # to get everything into one loop, we set the count accordingly if count == 0: count = sys.maxint # END handle count endtime = sys.maxint # allows timeout for whole operation if timeout is not None: endtime = time() + timeout # could be improved by a separate: no-endtime branch, saving the time calls for i in xrange(count): try: print "about to read", i, count, block, timeout out.append(queue.get(block, timeout)) print "got one" except Empty: pass # END ignore empty # if we have been unblocked because the closed state changed # in the meanwhile, stop trying # NOTE: must NOT cache _wc if self._wc.closed: # its racing time - all threads waiting for the queue # are awake now, and we actually can't be sure its empty # Hence we pop it empty without blocking, getting as much # as we can. This effectively lets us race ( with mutexes ) # of the other threads. print "stopped because it was closed" try: while True: out.append(queue.get(False)) # END pop it empty except Empty: pass # END ignore emptyness, we have all break # END handle cloased if time() >= endtime: break # END stop on timeout # END for each item # END handle blocking return out #} END interface #} END classes