summaryrefslogtreecommitdiff
path: root/redis/connection.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-02-04 17:44:39 -0800
committerAndy McCurdy <andy@andymccurdy.com>2019-02-04 17:45:27 -0800
commitcfa2bc9b7ea860eb4a002eaa7029ecee01e39735 (patch)
tree9dc5d02bafeadb3278c6e58143454ffc27a7d3c8 /redis/connection.py
parenta4644592162afdfe5b8809fa28eff041f7be6993 (diff)
downloadredis-py-cfa2bc9b7ea860eb4a002eaa7029ecee01e39735.tar.gz
attempt to provide only healthy connections from the pool
Adds redis.selector, a module that provides the best selector strategy available on the current platform. A redis.selector polls a socket to provide two pieces of functionality: 1. Check whether data can be read from the socket. Prior versions of redis-py provided this behavior with just select.select(). select() has lots of limitations, most notably a limit of ~1024 file descriptors. Now that better selectors are available, this should make can_read() faster and able to accomodate more clients. See #1115 and #486 2. Check whether a socket is ready for a command to be sent. This doubles as a health check. It ensures that the socket is available for writing, has no data to read and has no known errors. Anytime a socket is disconnected or hung up, data is available to be read, typically zero bytes. ConnectionPool.get_connection has been modified to ensure that connections it returns are connected and are ready for a command to be sent. If get_connection encounters a case where a socket isn't ready for a command the connection is reconnected and checked again. TODO: more tests for this stuff. implement EPoll and KQueue selectors. Fixes #1115 Fixes #486
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-xredis/connection.py45
1 files changed, 42 insertions, 3 deletions
diff --git a/redis/connection.py b/redis/connection.py
index f631cab..beeba30 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -17,7 +17,7 @@ except ImportError:
from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
nativestr, basestring, iteritems,
LifoQueue, Empty, Full, urlparse, parse_qs,
- recv, recv_into, select, unquote)
+ recv, recv_into, unquote)
from redis.exceptions import (
DataError,
RedisError,
@@ -31,6 +31,7 @@ from redis.exceptions import (
ExecAbortError,
ReadOnlyError
)
+from redis.selector import DefaultSelector
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
import hiredis
@@ -496,6 +497,7 @@ class Connection(object):
raise ConnectionError(self._error_message(e))
self._sock = sock
+ self._selector = DefaultSelector(sock)
try:
self.on_connect()
except RedisError:
@@ -623,8 +625,11 @@ class Connection(object):
if not sock:
self.connect()
sock = self._sock
- return self._parser.can_read() or \
- bool(select([sock], [], [], timeout)[0])
+ return self._parser.can_read() or self._selector.can_read(timeout)
+
+ def is_ready_for_command(self):
+ "Check if the connection is ready for a command"
+ return self._selector.is_ready_for_command()
def read_response(self):
"Read the response from a previously sent command"
@@ -984,6 +989,23 @@ class ConnectionPool(object):
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
+ try:
+ # ensure this connection is connected to Redis
+ connection.connect()
+ # connections that the pool provides should be ready to send
+ # a command. if not, the connection was either returned to the
+ # pool before all data has been read or the socket has been
+ # closed. either way, reconnect and verify everything is good.
+ if not connection.is_ready_for_command():
+ connection.disconnect()
+ connection.connect()
+ if not connection.is_ready_for_command():
+ raise ConnectionError('Connection not ready')
+ except: # noqa: E722
+ # release the connection back to the pool so that we don't leak it
+ self.release(connection)
+ raise
+
return connection
def get_encoder(self):
@@ -1115,6 +1137,23 @@ class BlockingConnectionPool(ConnectionPool):
if connection is None:
connection = self.make_connection()
+ try:
+ # ensure this connection is connected to Redis
+ connection.connect()
+ # connections that the pool provides should be ready to send
+ # a command. if not, the connection was either returned to the
+ # pool before all data has been read or the socket has been
+ # closed. either way, reconnect and verify everything is good.
+ if not connection.is_ready_for_command():
+ connection.disconnect()
+ connection.connect()
+ if not connection.is_ready_for_command():
+ raise ConnectionError('Connection not ready')
+ except: # noqa: E722
+ # release the connection back to the pool so that we don't leak it
+ self.release(connection)
+ raise
+
return connection
def release(self, connection):