diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2019-02-04 17:44:39 -0800 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2019-02-04 17:45:27 -0800 |
commit | cfa2bc9b7ea860eb4a002eaa7029ecee01e39735 (patch) | |
tree | 9dc5d02bafeadb3278c6e58143454ffc27a7d3c8 /redis/connection.py | |
parent | a4644592162afdfe5b8809fa28eff041f7be6993 (diff) | |
download | redis-py-cfa2bc9b7ea860eb4a002eaa7029ecee01e39735.tar.gz |
attempt to provide only healthy connections from the pool
Adds redis.selector, a module that provides the best selector strategy
available on the current platform. A redis.selector polls a socket to
provide two pieces of functionality:
1. Check whether data can be read from the socket. Prior versions of redis-py
provided this behavior with just select.select(). select() has lots of
limitations, most notably a limit of ~1024 file descriptors. Now that
better selectors are available, this should make can_read() faster and
able to accomodate more clients. See #1115 and #486
2. Check whether a socket is ready for a command to be sent. This doubles
as a health check. It ensures that the socket is available for writing,
has no data to read and has no known errors. Anytime a socket is
disconnected or hung up, data is available to be read, typically zero bytes.
ConnectionPool.get_connection has been modified to ensure that connections
it returns are connected and are ready for a command to be sent. If
get_connection encounters a case where a socket isn't ready for a command
the connection is reconnected and checked again.
TODO: more tests for this stuff. implement EPoll and KQueue selectors.
Fixes #1115
Fixes #486
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 45 |
1 files changed, 42 insertions, 3 deletions
diff --git a/redis/connection.py b/redis/connection.py index f631cab..beeba30 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -17,7 +17,7 @@ except ImportError: from redis._compat import (xrange, imap, byte_to_chr, unicode, long, nativestr, basestring, iteritems, LifoQueue, Empty, Full, urlparse, parse_qs, - recv, recv_into, select, unquote) + recv, recv_into, unquote) from redis.exceptions import ( DataError, RedisError, @@ -31,6 +31,7 @@ from redis.exceptions import ( ExecAbortError, ReadOnlyError ) +from redis.selector import DefaultSelector from redis.utils import HIREDIS_AVAILABLE if HIREDIS_AVAILABLE: import hiredis @@ -496,6 +497,7 @@ class Connection(object): raise ConnectionError(self._error_message(e)) self._sock = sock + self._selector = DefaultSelector(sock) try: self.on_connect() except RedisError: @@ -623,8 +625,11 @@ class Connection(object): if not sock: self.connect() sock = self._sock - return self._parser.can_read() or \ - bool(select([sock], [], [], timeout)[0]) + return self._parser.can_read() or self._selector.can_read(timeout) + + def is_ready_for_command(self): + "Check if the connection is ready for a command" + return self._selector.is_ready_for_command() def read_response(self): "Read the response from a previously sent command" @@ -984,6 +989,23 @@ class ConnectionPool(object): except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) + try: + # ensure this connection is connected to Redis + connection.connect() + # connections that the pool provides should be ready to send + # a command. if not, the connection was either returned to the + # pool before all data has been read or the socket has been + # closed. either way, reconnect and verify everything is good. + if not connection.is_ready_for_command(): + connection.disconnect() + connection.connect() + if not connection.is_ready_for_command(): + raise ConnectionError('Connection not ready') + except: # noqa: E722 + # release the connection back to the pool so that we don't leak it + self.release(connection) + raise + return connection def get_encoder(self): @@ -1115,6 +1137,23 @@ class BlockingConnectionPool(ConnectionPool): if connection is None: connection = self.make_connection() + try: + # ensure this connection is connected to Redis + connection.connect() + # connections that the pool provides should be ready to send + # a command. if not, the connection was either returned to the + # pool before all data has been read or the socket has been + # closed. either way, reconnect and verify everything is good. + if not connection.is_ready_for_command(): + connection.disconnect() + connection.connect() + if not connection.is_ready_for_command(): + raise ConnectionError('Connection not ready') + except: # noqa: E722 + # release the connection back to the pool so that we don't leak it + self.release(connection) + raise + return connection def release(self, connection): |