summaryrefslogtreecommitdiff
path: root/eventlet/twistedutil/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'eventlet/twistedutil/protocol.py')
-rw-r--r--eventlet/twistedutil/protocol.py414
1 files changed, 0 insertions, 414 deletions
diff --git a/eventlet/twistedutil/protocol.py b/eventlet/twistedutil/protocol.py
deleted file mode 100644
index 60d43ad..0000000
--- a/eventlet/twistedutil/protocol.py
+++ /dev/null
@@ -1,414 +0,0 @@
-"""Basic twisted protocols converted to synchronous mode"""
-import sys
-from twisted.internet.protocol import Protocol as twistedProtocol
-from twisted.internet.error import ConnectionDone
-from twisted.internet.protocol import Factory, ClientFactory
-from twisted.internet import main
-from twisted.python import failure
-
-from eventlet import greenthread
-from eventlet import getcurrent
-from eventlet.coros import Queue
-from eventlet.event import Event as BaseEvent
-
-
-class ValueQueue(Queue):
- """Queue that keeps the last item forever in the queue if it's an exception.
- Useful if you send an exception over queue only once, and once sent it must be always
- available.
- """
-
- def send(self, value=None, exc=None):
- if exc is not None or not self.has_error():
- Queue.send(self, value, exc)
-
- def wait(self):
- """The difference from Queue.wait: if there is an only item in the
- Queue and it is an exception, raise it, but keep it in the Queue, so
- that future calls to wait() will raise it again.
- """
- if self.has_error() and len(self.items)==1:
- # the last item, which is an exception, raise without emptying the Queue
- getcurrent().throw(*self.items[0][1])
- else:
- return Queue.wait(self)
-
- def has_error(self):
- return self.items and self.items[-1][1] is not None
-
-
-class Event(BaseEvent):
-
- def send(self, value, exc=None):
- if self.ready():
- self.reset()
- return BaseEvent.send(self, value, exc)
-
- def send_exception(self, *throw_args):
- if self.ready():
- self.reset()
- return BaseEvent.send_exception(self, *throw_args)
-
-class Producer2Event(object):
-
- # implements IPullProducer
-
- def __init__(self, event):
- self.event = event
-
- def resumeProducing(self):
- self.event.send(1)
-
- def stopProducing(self):
- del self.event
-
-
-class GreenTransportBase(object):
-
- transportBufferSize = None
-
- def __init__(self, transportBufferSize=None):
- if transportBufferSize is not None:
- self.transportBufferSize = transportBufferSize
- self._queue = ValueQueue()
- self._write_event = Event()
- self._disconnected_event = Event()
-
- def build_protocol(self):
- return self.protocol_class(self)
-
- def _got_transport(self, transport):
- self._queue.send(transport)
-
- def _got_data(self, data):
- self._queue.send(data)
-
- def _connectionLost(self, reason):
- self._disconnected_event.send(reason.value)
- self._queue.send_exception(reason.value)
- self._write_event.send_exception(reason.value)
-
- def _wait(self):
- if self.disconnecting or self._disconnected_event.ready():
- if self._queue:
- return self._queue.wait()
- else:
- raise self._disconnected_event.wait()
- self.resumeProducing()
- try:
- return self._queue.wait()
- finally:
- self.pauseProducing()
-
- def write(self, data, wait=True):
- if self._disconnected_event.ready():
- raise self._disconnected_event.wait()
- if wait:
- self._write_event.reset()
- self.transport.write(data)
- self._write_event.wait()
- else:
- self.transport.write(data)
-
- def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE), wait=True):
- self.transport.unregisterProducer()
- self.transport.loseConnection(connDone)
- if wait:
- self._disconnected_event.wait()
-
- def __getattr__(self, item):
- if item=='transport':
- raise AttributeError(item)
- if hasattr(self, 'transport'):
- try:
- return getattr(self.transport, item)
- except AttributeError:
- me = type(self).__name__
- trans = type(self.transport).__name__
- raise AttributeError("Neither %r nor %r has attribute %r" % (me, trans, item))
- else:
- raise AttributeError(item)
-
- def resumeProducing(self):
- self.paused -= 1
- if self.paused==0:
- self.transport.resumeProducing()
-
- def pauseProducing(self):
- self.paused += 1
- if self.paused==1:
- self.transport.pauseProducing()
-
- def _init_transport_producer(self):
- self.transport.pauseProducing()
- self.paused = 1
-
- def _init_transport(self):
- transport = self._queue.wait()
- self.transport = transport
- if self.transportBufferSize is not None:
- transport.bufferSize = self.transportBufferSize
- self._init_transport_producer()
- transport.registerProducer(Producer2Event(self._write_event), False)
-
-
-class Protocol(twistedProtocol):
-
- def __init__(self, recepient):
- self._recepient = recepient
-
- def connectionMade(self):
- self._recepient._got_transport(self.transport)
-
- def dataReceived(self, data):
- self._recepient._got_data(data)
-
- def connectionLost(self, reason):
- self._recepient._connectionLost(reason)
-
-
-class UnbufferedTransport(GreenTransportBase):
- """A very simple implementation of a green transport without an additional buffer"""
-
- protocol_class = Protocol
-
- def recv(self):
- """Receive a single chunk of undefined size.
-
- Return '' if connection was closed cleanly, raise the exception if it was closed
- in a non clean fashion. After that all successive calls return ''.
- """
- if self._disconnected_event.ready():
- return ''
- try:
- return self._wait()
- except ConnectionDone:
- return ''
-
- def read(self):
- """Read the data from the socket until the connection is closed cleanly.
-
- If connection was closed in a non-clean fashion, the appropriate exception
- is raised. In that case already received data is lost.
- Next time read() is called it returns ''.
- """
- result = ''
- while True:
- recvd = self.recv()
- if not recvd:
- break
- result += recvd
- return result
-
- # iterator protocol:
-
- def __iter__(self):
- return self
-
- def next(self):
- result = self.recv()
- if not result:
- raise StopIteration
- return result
-
-
-class GreenTransport(GreenTransportBase):
-
- protocol_class = Protocol
- _buffer = ''
- _error = None
-
- def read(self, size=-1):
- """Read size bytes or until EOF"""
- if not self._disconnected_event.ready():
- try:
- while len(self._buffer) < size or size < 0:
- self._buffer += self._wait()
- except ConnectionDone:
- pass
- except:
- if not self._disconnected_event.has_exception():
- raise
- if size>=0:
- result, self._buffer = self._buffer[:size], self._buffer[size:]
- else:
- result, self._buffer = self._buffer, ''
- if not result and self._disconnected_event.has_exception():
- try:
- self._disconnected_event.wait()
- except ConnectionDone:
- pass
- return result
-
- def recv(self, buflen=None):
- """Receive a single chunk of undefined size but no bigger than buflen"""
- if not self._disconnected_event.ready():
- self.resumeProducing()
- try:
- try:
- recvd = self._wait()
- #print 'received %r' % recvd
- self._buffer += recvd
- except ConnectionDone:
- pass
- except:
- if not self._disconnected_event.has_exception():
- raise
- finally:
- self.pauseProducing()
- if buflen is None:
- result, self._buffer = self._buffer, ''
- else:
- result, self._buffer = self._buffer[:buflen], self._buffer[buflen:]
- if not result and self._disconnected_event.has_exception():
- try:
- self._disconnected_event.wait()
- except ConnectionDone:
- pass
- return result
-
- # iterator protocol:
-
- def __iter__(self):
- return self
-
- def next(self):
- res = self.recv()
- if not res:
- raise StopIteration
- return res
-
-
-class GreenInstanceFactory(ClientFactory):
-
- def __init__(self, instance, event):
- self.instance = instance
- self.event = event
-
- def buildProtocol(self, addr):
- return self.instance
-
- def clientConnectionFailed(self, connector, reason):
- self.event.send_exception(reason.type, reason.value, reason.tb)
-
-
-class GreenClientCreator(object):
- """Connect to a remote host and return a connected green transport instance.
- """
-
- gtransport_class = GreenTransport
-
- def __init__(self, reactor=None, gtransport_class=None, *args, **kwargs):
- if reactor is None:
- from twisted.internet import reactor
- self.reactor = reactor
- if gtransport_class is not None:
- self.gtransport_class = gtransport_class
- self.args = args
- self.kwargs = kwargs
-
- def _make_transport_and_factory(self):
- gtransport = self.gtransport_class(*self.args, **self.kwargs)
- protocol = gtransport.build_protocol()
- factory = GreenInstanceFactory(protocol, gtransport._queue)
- return gtransport, factory
-
- def connectTCP(self, host, port, *args, **kwargs):
- gtransport, factory = self._make_transport_and_factory()
- self.reactor.connectTCP(host, port, factory, *args, **kwargs)
- gtransport._init_transport()
- return gtransport
-
- def connectSSL(self, host, port, *args, **kwargs):
- gtransport, factory = self._make_transport_and_factory()
- self.reactor.connectSSL(host, port, factory, *args, **kwargs)
- gtransport._init_transport()
- return gtransport
-
- def connectTLS(self, host, port, *args, **kwargs):
- gtransport, factory = self._make_transport_and_factory()
- self.reactor.connectTLS(host, port, factory, *args, **kwargs)
- gtransport._init_transport()
- return gtransport
-
- def connectUNIX(self, address, *args, **kwargs):
- gtransport, factory = self._make_transport_and_factory()
- self.reactor.connectUNIX(address, factory, *args, **kwargs)
- gtransport._init_transport()
- return gtransport
-
- def connectSRV(self, service, domain, *args, **kwargs):
- SRVConnector = kwargs.pop('ConnectorClass', None)
- if SRVConnector is None:
- from twisted.names.srvconnect import SRVConnector
- gtransport, factory = self._make_transport_and_factory()
- c = SRVConnector(self.reactor, service, domain, factory, *args, **kwargs)
- c.connect()
- gtransport._init_transport()
- return gtransport
-
-
-class SimpleSpawnFactory(Factory):
- """Factory that spawns a new greenlet for each incoming connection.
-
- For an incoming connection a new greenlet is created using the provided
- callback as a function and a connected green transport instance as an
- argument.
- """
-
- gtransport_class = GreenTransport
-
- def __init__(self, handler, gtransport_class=None, *args, **kwargs):
- if callable(handler):
- self.handler = handler
- else:
- self.handler = handler.send
- if hasattr(handler, 'send_exception'):
- self.exc_handler = handler.send_exception
- if gtransport_class is not None:
- self.gtransport_class = gtransport_class
- self.args = args
- self.kwargs = kwargs
-
- def exc_handler(self, *args):
- pass
-
- def buildProtocol(self, addr):
- gtransport = self.gtransport_class(*self.args, **self.kwargs)
- protocol = gtransport.build_protocol()
- protocol.factory = self
- self._do_spawn(gtransport, protocol)
- return protocol
-
- def _do_spawn(self, gtransport, protocol):
- greenthread.spawn(self._run_handler, gtransport, protocol)
-
- def _run_handler(self, gtransport, protocol):
- try:
- gtransport._init_transport()
- except Exception:
- self.exc_handler(*sys.exc_info())
- else:
- self.handler(gtransport)
-
-
-class SpawnFactory(SimpleSpawnFactory):
- """An extension to SimpleSpawnFactory that provides some control over
- the greenlets it has spawned.
- """
-
- def __init__(self, handler, gtransport_class=None, *args, **kwargs):
- self.greenlets = set()
- SimpleSpawnFactory.__init__(self, handler, gtransport_class, *args, **kwargs)
-
- def _do_spawn(self, gtransport, protocol):
- g = greenthread.spawn(self._run_handler, gtransport, protocol)
- self.greenlets.add(g)
- g.link(lambda *_: self.greenlets.remove(g))
-
- def waitall(self):
- results = []
- for g in self.greenlets:
- results.append(g.wait())
- return results
-