diff options
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 45 |
1 files changed, 42 insertions, 3 deletions
diff --git a/redis/connection.py b/redis/connection.py index f631cab..beeba30 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -17,7 +17,7 @@ except ImportError: from redis._compat import (xrange, imap, byte_to_chr, unicode, long, nativestr, basestring, iteritems, LifoQueue, Empty, Full, urlparse, parse_qs, - recv, recv_into, select, unquote) + recv, recv_into, unquote) from redis.exceptions import ( DataError, RedisError, @@ -31,6 +31,7 @@ from redis.exceptions import ( ExecAbortError, ReadOnlyError ) +from redis.selector import DefaultSelector from redis.utils import HIREDIS_AVAILABLE if HIREDIS_AVAILABLE: import hiredis @@ -496,6 +497,7 @@ class Connection(object): raise ConnectionError(self._error_message(e)) self._sock = sock + self._selector = DefaultSelector(sock) try: self.on_connect() except RedisError: @@ -623,8 +625,11 @@ class Connection(object): if not sock: self.connect() sock = self._sock - return self._parser.can_read() or \ - bool(select([sock], [], [], timeout)[0]) + return self._parser.can_read() or self._selector.can_read(timeout) + + def is_ready_for_command(self): + "Check if the connection is ready for a command" + return self._selector.is_ready_for_command() def read_response(self): "Read the response from a previously sent command" @@ -984,6 +989,23 @@ class ConnectionPool(object): except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) + try: + # ensure this connection is connected to Redis + connection.connect() + # connections that the pool provides should be ready to send + # a command. if not, the connection was either returned to the + # pool before all data has been read or the socket has been + # closed. either way, reconnect and verify everything is good. + if not connection.is_ready_for_command(): + connection.disconnect() + connection.connect() + if not connection.is_ready_for_command(): + raise ConnectionError('Connection not ready') + except: # noqa: E722 + # release the connection back to the pool so that we don't leak it + self.release(connection) + raise + return connection def get_encoder(self): @@ -1115,6 +1137,23 @@ class BlockingConnectionPool(ConnectionPool): if connection is None: connection = self.make_connection() + try: + # ensure this connection is connected to Redis + connection.connect() + # connections that the pool provides should be ready to send + # a command. if not, the connection was either returned to the + # pool before all data has been read or the socket has been + # closed. either way, reconnect and verify everything is good. + if not connection.is_ready_for_command(): + connection.disconnect() + connection.connect() + if not connection.is_ready_for_command(): + raise ConnectionError('Connection not ready') + except: # noqa: E722 + # release the connection back to the pool so that we don't leak it + self.release(connection) + raise + return connection def release(self, connection): |