From cfa2bc9b7ea860eb4a002eaa7029ecee01e39735 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Mon, 4 Feb 2019 17:44:39 -0800 Subject: attempt to provide only healthy connections from the pool Adds redis.selector, a module that provides the best selector strategy available on the current platform. A redis.selector polls a socket to provide two pieces of functionality: 1. Check whether data can be read from the socket. Prior versions of redis-py provided this behavior with just select.select(). select() has lots of limitations, most notably a limit of ~1024 file descriptors. Now that better selectors are available, this should make can_read() faster and able to accomodate more clients. See #1115 and #486 2. Check whether a socket is ready for a command to be sent. This doubles as a health check. It ensures that the socket is available for writing, has no data to read and has no known errors. Anytime a socket is disconnected or hung up, data is available to be read, typically zero bytes. ConnectionPool.get_connection has been modified to ensure that connections it returns are connected and are ready for a command to be sent. If get_connection encounters a case where a socket isn't ready for a command the connection is reconnected and checked again. TODO: more tests for this stuff. implement EPoll and KQueue selectors. Fixes #1115 Fixes #486 --- redis/connection.py | 45 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) (limited to 'redis/connection.py') diff --git a/redis/connection.py b/redis/connection.py index f631cab..beeba30 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -17,7 +17,7 @@ except ImportError: from redis._compat import (xrange, imap, byte_to_chr, unicode, long, nativestr, basestring, iteritems, LifoQueue, Empty, Full, urlparse, parse_qs, - recv, recv_into, select, unquote) + recv, recv_into, unquote) from redis.exceptions import ( DataError, RedisError, @@ -31,6 +31,7 @@ from redis.exceptions import ( ExecAbortError, ReadOnlyError ) +from redis.selector import DefaultSelector from redis.utils import HIREDIS_AVAILABLE if HIREDIS_AVAILABLE: import hiredis @@ -496,6 +497,7 @@ class Connection(object): raise ConnectionError(self._error_message(e)) self._sock = sock + self._selector = DefaultSelector(sock) try: self.on_connect() except RedisError: @@ -623,8 +625,11 @@ class Connection(object): if not sock: self.connect() sock = self._sock - return self._parser.can_read() or \ - bool(select([sock], [], [], timeout)[0]) + return self._parser.can_read() or self._selector.can_read(timeout) + + def is_ready_for_command(self): + "Check if the connection is ready for a command" + return self._selector.is_ready_for_command() def read_response(self): "Read the response from a previously sent command" @@ -984,6 +989,23 @@ class ConnectionPool(object): except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) + try: + # ensure this connection is connected to Redis + connection.connect() + # connections that the pool provides should be ready to send + # a command. if not, the connection was either returned to the + # pool before all data has been read or the socket has been + # closed. either way, reconnect and verify everything is good. + if not connection.is_ready_for_command(): + connection.disconnect() + connection.connect() + if not connection.is_ready_for_command(): + raise ConnectionError('Connection not ready') + except: # noqa: E722 + # release the connection back to the pool so that we don't leak it + self.release(connection) + raise + return connection def get_encoder(self): @@ -1115,6 +1137,23 @@ class BlockingConnectionPool(ConnectionPool): if connection is None: connection = self.make_connection() + try: + # ensure this connection is connected to Redis + connection.connect() + # connections that the pool provides should be ready to send + # a command. if not, the connection was either returned to the + # pool before all data has been read or the socket has been + # closed. either way, reconnect and verify everything is good. + if not connection.is_ready_for_command(): + connection.disconnect() + connection.connect() + if not connection.is_ready_for_command(): + raise ConnectionError('Connection not ready') + except: # noqa: E722 + # release the connection back to the pool so that we don't leak it + self.release(connection) + raise + return connection def release(self, connection): -- cgit v1.2.1