summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Shepelev <temotor@gmail.com>2014-04-24 17:42:30 +0400
committerSergey Shepelev <temotor@gmail.com>2014-07-18 02:10:43 +0400
commitf37a87b1f8e2f09b8a30c2e947486cf0858650cd (patch)
tree2c21417a40fcbc65d2ee2ed72f50642a0e40ddc0
parenta651a3097f7ba957ad29342dac011d5a9218e76b (diff)
downloadeventlet-f37a87b1f8e2f09b8a30c2e947486cf0858650cd.tar.gz
greenio, tpool: python3 compatibility
Also: - PEP-8 - check both EAGAIN/EWOULDBLOCK - use system implementation of GreenPipe.readinto()
-rw-r--r--eventlet/greenio.py64
-rw-r--r--eventlet/tpool.py86
-rw-r--r--tests/tpool_test.py2
3 files changed, 70 insertions, 82 deletions
diff --git a/eventlet/greenio.py b/eventlet/greenio.py
index 44a4e00..12f5f06 100644
--- a/eventlet/greenio.py
+++ b/eventlet/greenio.py
@@ -1,4 +1,3 @@
-import array
import errno
import os
from socket import socket as _original_socket
@@ -60,12 +59,12 @@ def socket_accept(descriptor):
if sys.platform[:3] == "win":
# winsock sometimes throws ENOTCONN
- SOCKET_BLOCKING = set((errno.EWOULDBLOCK,))
+ SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK,))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN))
else:
# oddly, on linux/darwin, an unconnected socket is expected to block,
# so we treat ENOTCONN the same as EWOULDBLOCK
- SOCKET_BLOCKING = set((errno.EWOULDBLOCK, errno.ENOTCONN))
+ SOCKET_BLOCKING = set((errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOTCONN))
SOCKET_CLOSED = set((errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE))
@@ -351,7 +350,11 @@ class GreenSocket(object):
class _SocketDuckForFd(object):
- """ Class implementing all socket method used by _fileobject in cooperative manner using low level os I/O calls."""
+ """Class implementing all socket method used by _fileobject
+ in cooperative manner using low level os I/O calls.
+ """
+ _refcount = 0
+
def __init__(self, fileno):
self._fileno = fileno
@@ -368,10 +371,26 @@ class _SocketDuckForFd(object):
data = os.read(self._fileno, buflen)
return data
except OSError as e:
- if get_errno(e) != errno.EAGAIN:
+ if get_errno(e) not in SOCKET_BLOCKING:
raise IOError(*e.args)
trampoline(self, read=True)
+ def recv_into(self, buf, nbytes=0, flags=0):
+ if nbytes == 0:
+ nbytes = len(buf)
+ data = self.recv(nbytes)
+ buf[:nbytes] = data
+ return len(data)
+
+ def send(self, data):
+ while True:
+ try:
+ os.write(self._fileno, data)
+ except OSError as e:
+ if get_errno(e) not in SOCKET_BLOCKING:
+ raise IOError(*e.args)
+ trampoline(self, write=True)
+
def sendall(self, data):
len_data = len(data)
os_write = os.write
@@ -397,22 +416,22 @@ class _SocketDuckForFd(object):
try:
os.close(self._fileno)
except:
- # os.close may fail if __init__ didn't complete (i.e file dscriptor passed to popen was invalid
+ # os.close may fail if __init__ didn't complete
+ # (i.e file dscriptor passed to popen was invalid
pass
def __repr__(self):
return "%s:%d" % (self.__class__.__name__, self._fileno)
- if "__pypy__" in sys.builtin_module_names:
- _refcount = 0
-
- def _reuse(self):
- self._refcount += 1
+ def _reuse(self):
+ self._refcount += 1
- def _drop(self):
- self._refcount -= 1
- if self._refcount == 0:
- self._close()
+ def _drop(self):
+ self._refcount -= 1
+ if self._refcount == 0:
+ self._close()
+ # Python3
+ _decref_socketios = _drop
def _operationOnClosedFile(*args, **kwargs):
@@ -449,7 +468,7 @@ class GreenPipe(_fileobject):
self._name = f.name
f.close()
- super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize)
+ super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode)
set_nonblocking(self)
self.softspace = 0
@@ -470,7 +489,7 @@ class GreenPipe(_fileobject):
for method in [
'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
'readline', 'readlines', 'seek', 'tell', 'truncate',
- 'write', 'xreadlines', '__iter__', 'writelines']:
+ 'write', 'xreadlines', '__iter__', '__next__', 'writelines']:
setattr(self, method, _operationOnClosedFile)
def __enter__(self):
@@ -479,17 +498,6 @@ class GreenPipe(_fileobject):
def __exit__(self, *args):
self.close()
- def readinto(self, buf):
- data = self.read(len(buf)) # FIXME could it be done without allocating intermediate?
- n = len(data)
- try:
- buf[:n] = data
- except TypeError as err:
- if not isinstance(buf, array.array):
- raise err
- buf[:n] = array.array('c', data)
- return n
-
def _get_readahead_len(self):
return len(self._rbuf.getvalue())
diff --git a/eventlet/tpool.py b/eventlet/tpool.py
index e4e3a7b..82aae4b 100644
--- a/eventlet/tpool.py
+++ b/eventlet/tpool.py
@@ -21,39 +21,38 @@ import traceback
from eventlet import event, greenio, greenthread, patcher, timeout
from eventlet.support import six
+__all__ = ['execute', 'Proxy', 'killall', 'set_num_threads']
+
+EXC_CLASSES = (Exception, timeout.Timeout)
+SYS_EXCS = (GeneratorExit, KeyboardInterrupt, SystemExit)
+
+QUIET = True
+
+socket = patcher.original('socket')
threading = patcher.original('threading')
if six.PY2:
Queue_module = patcher.original('Queue')
if six.PY3:
Queue_module = patcher.original('queue')
-Queue = Queue_module.Queue
Empty = Queue_module.Empty
-
-__all__ = ['execute', 'Proxy', 'killall']
-
-QUIET = True
-
-_rfile = _wfile = None
+Queue = Queue_module.Queue
_bytetosend = ' '.encode()
-
-
-def _signal_t2e():
- _wfile.write(_bytetosend)
- _wfile.flush()
-
-
-_reqq = None
-_rspq = None
+_coro = None
+_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
+_reqq = _rspq = None
+_rsock = _wsock = None
+_setup_already = False
+_threads = []
def tpool_trampoline():
global _rspq
while True:
try:
- _c = _rfile.read(1)
+ _c = _rsock.recv(1)
assert _c
except ValueError:
break # will be raised when pipe is closed
@@ -66,13 +65,9 @@ def tpool_trampoline():
pass
-SYS_EXCS = (KeyboardInterrupt, SystemExit)
-EXC_CLASSES = (Exception, timeout.Timeout)
-
-
def tworker():
global _rspq
- while(True):
+ while True:
try:
msg = _reqq.get()
except AttributeError:
@@ -91,7 +86,7 @@ def tworker():
# exc_info does not lead to memory leaks
_rspq.put((e, rv))
msg = meth = args = kwargs = e = rv = None
- _signal_t2e()
+ _wsock.sendall(_bytetosend)
def execute(meth, *args, **kwargs):
@@ -248,40 +243,25 @@ class Proxy(object):
return Proxy(it)
def next(self):
- return proxy_call(self._autowrap, self._obj.next)
+ return proxy_call(self._autowrap, next, self._obj)
# Python3
__next__ = next
-_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
-_threads = []
-_coro = None
-_setup_already = False
-
-
def setup():
- global _rfile, _wfile, _threads, _coro, _setup_already, _rspq, _reqq
+ global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
if _setup_already:
return
else:
_setup_already = True
- try:
- _rpipe, _wpipe = os.pipe()
- _wfile = greenio.GreenPipe(_wpipe, 'wb', 0)
- _rfile = greenio.GreenPipe(_rpipe, 'rb', 0)
- except (ImportError, NotImplementedError):
- # This is Windows compatibility -- use a socket instead of a pipe because
- # pipes don't really exist on Windows.
- import socket
- from eventlet import util
- sock = util.__original_socket__(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(('localhost', 0))
- sock.listen(50)
- csock = util.__original_socket__(socket.AF_INET, socket.SOCK_STREAM)
- csock.connect(('localhost', sock.getsockname()[1]))
- nsock, addr = sock.accept()
- _rfile = greenio.GreenSocket(csock).makefile('rb', 0)
- _wfile = nsock.makefile('wb', 0)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind(('', 0))
+ sock.listen(1)
+ csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ csock.connect(sock.getsockname())
+ _wsock, _addr = sock.accept()
+ _rsock = greenio.GreenSocket(csock)
_reqq = Queue(maxsize=-1)
_rspq = Queue(maxsize=-1)
@@ -302,7 +282,7 @@ def setup():
def killall():
- global _setup_already, _rspq, _rfile, _wfile
+ global _setup_already, _rspq, _rsock, _wsock
if not _setup_already:
return
for thr in _threads:
@@ -312,10 +292,10 @@ def killall():
del _threads[:]
if _coro is not None:
greenthread.kill(_coro)
- _rfile.close()
- _wfile.close()
- _rfile = None
- _wfile = None
+ _rsock.close()
+ _wsock.close()
+ _rsock = None
+ _wsock = None
_rspq = None
_setup_already = False
diff --git a/tests/tpool_test.py b/tests/tpool_test.py
index 35b6056..6ad4355 100644
--- a/tests/tpool_test.py
+++ b/tests/tpool_test.py
@@ -80,7 +80,7 @@ class TestTpool(LimitedTestCase):
def test_wrap_dict(self):
my_object = {'a': 1}
prox = tpool.Proxy(my_object)
- self.assertEqual('a', prox.keys()[0])
+ self.assertEqual('a', list(prox.keys())[0])
self.assertEqual(1, prox['a'])
self.assertEqual(str(my_object), str(prox))
self.assertEqual(repr(my_object), repr(prox))