summaryrefslogtreecommitdiff
path: root/redis/selector.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-02-04 17:44:39 -0800
committerAndy McCurdy <andy@andymccurdy.com>2019-02-04 17:45:27 -0800
commitcfa2bc9b7ea860eb4a002eaa7029ecee01e39735 (patch)
tree9dc5d02bafeadb3278c6e58143454ffc27a7d3c8 /redis/selector.py
parenta4644592162afdfe5b8809fa28eff041f7be6993 (diff)
downloadredis-py-cfa2bc9b7ea860eb4a002eaa7029ecee01e39735.tar.gz
attempt to provide only healthy connections from the pool
Adds redis.selector, a module that provides the best selector strategy available on the current platform. A redis.selector polls a socket to provide two pieces of functionality: 1. Check whether data can be read from the socket. Prior versions of redis-py provided this behavior with just select.select(). select() has lots of limitations, most notably a limit of ~1024 file descriptors. Now that better selectors are available, this should make can_read() faster and able to accomodate more clients. See #1115 and #486 2. Check whether a socket is ready for a command to be sent. This doubles as a health check. It ensures that the socket is available for writing, has no data to read and has no known errors. Anytime a socket is disconnected or hung up, data is available to be read, typically zero bytes. ConnectionPool.get_connection has been modified to ensure that connections it returns are connected and are ready for a command to be sent. If get_connection encounters a case where a socket isn't ready for a command the connection is reconnected and checked again. TODO: more tests for this stuff. implement EPoll and KQueue selectors. Fixes #1115 Fixes #486
Diffstat (limited to 'redis/selector.py')
-rw-r--r--redis/selector.py187
1 files changed, 187 insertions, 0 deletions
diff --git a/redis/selector.py b/redis/selector.py
new file mode 100644
index 0000000..fd9b12b
--- /dev/null
+++ b/redis/selector.py
@@ -0,0 +1,187 @@
+import errno
+import select
+from redis.exceptions import RedisError
+
+
+_DEFAULT_SELECTOR = None
+
+
+class BaseSelector(object):
+ """
+ Base class for all Selectors
+ """
+ def __init__(self, sock):
+ self.sock = sock
+
+ def can_read(self, timeout=0):
+ """
+ Return True if data is ready to be read from the socket,
+ otherwise False.
+
+ This doesn't guarentee that the socket is still connected, just that
+ there is data to read.
+
+ Automatically retries EINTR errors based on PEP 475.
+ """
+ while True:
+ try:
+ return self.check_can_read(timeout)
+ except (select.error, IOError) as ex:
+ if self.errno_from_exception(ex) == errno.EINTR:
+ continue
+ return False
+
+ def is_ready_for_command(self, timeout=0):
+ """
+ Return True if the socket is ready to send a command,
+ otherwise False.
+
+ Automatically retries EINTR errors based on PEP 475.
+ """
+ while True:
+ try:
+ return self.check_is_ready_for_command(timeout)
+ except (select.error, IOError) as ex:
+ if self.errno_from_exception(ex) == errno.EINTR:
+ continue
+ return False
+
+ def check_can_read(self, timeout):
+ """
+ Perform the can_read check. Subclasses should implement this.
+ """
+ raise NotImplementedError
+
+ def check_is_ready_for_command(self, timeout):
+ """
+ Perform the is_ready_for_command check. Subclasses should
+ implement this.
+ """
+ raise NotImplementedError
+
+ def close(self):
+ """
+ Close the selector.
+ """
+ self.sock = None
+
+ def errno_from_exception(self, ex):
+ """
+ Get the error number from an exception
+ """
+ if hasattr(ex, 'errno'):
+ return ex.errno
+ elif ex.args:
+ return ex.args[0]
+ else:
+ return None
+
+
+if hasattr(select, 'select'):
+ class SelectSelector(BaseSelector):
+ """
+ A select-based selector that should work on most platforms.
+
+ This is the worst poll strategy and should only be used if no other
+ option is available.
+ """
+ def check_can_read(self, timeout):
+ """
+ Return True if data is ready to be read from the socket,
+ otherwise False.
+
+ This doesn't guarentee that the socket is still connected, just
+ that there is data to read.
+ """
+ return bool(select.select([self.sock], [], [], timeout)[0])
+
+ def check_is_ready_for_command(self, timeout):
+ """
+ Return True if the socket is ready to send a command,
+ otherwise False.
+ """
+ r, w, e = select.select([self.sock], [self.sock], [self.sock],
+ timeout)
+ return bool(w and not r and not e)
+
+
+if hasattr(select, 'poll'):
+ class PollSelector(BaseSelector):
+ """
+ A poll-based selector that should work on (almost?) all versions
+ of Unix
+ """
+ _EVENT_MASK = (select.POLLIN | select.POLLPRI | select.POLLOUT |
+ select.POLLERR | select.POLLHUP)
+ _READ_MASK = select.POLLIN | select.POLLPRI
+ _WRITE_MASK = select.POLLOUT
+
+ def __init__(self, sock):
+ super().__init__(sock)
+ self.poller = select.poll()
+ self.poller.register(sock, self._EVENT_MASK)
+
+ def close(self):
+ """
+ Close the selector.
+ """
+ try:
+ self.poller.unregister(self.sock)
+ except (KeyError, ValueError):
+ # KeyError is raised if somehow the socket was not registered
+ # ValueError is raised if the socket's file descriptor is
+ # negative.
+ # In either case, we can't do anything better than to remove
+ # the reference to the poller.
+ pass
+ self.poller = None
+ self.sock = None
+
+ def check_can_read(self, timeout=0):
+ """
+ Return True if data is ready to be read from the socket,
+ otherwise False.
+
+ This doesn't guarentee that the socket is still connected, just
+ that there is data to read.
+ """
+ events = self.poller.poll(0)
+ return bool(events and events[0][1] & self._READ_MASK)
+
+ def check_is_ready_for_command(self, timeout=0):
+ """
+ Return True if the socket is ready to send a command,
+ otherwise False
+ """
+ events = self.poller.poll(0)
+ return bool(events and events[0][1] == self._WRITE_MASK)
+
+
+def has_selector(selector):
+ "Determine if the current platform has the selector available"
+ try:
+ if selector == 'poll':
+ # the select module offers the poll selector even if the platform
+ # doesn't support it. Attempt to poll for nothing to make sure
+ # poll is available
+ p = select.poll()
+ p.poll(0)
+ else:
+ # the other selectors will fail when instantiated
+ getattr(select, selector)().close()
+ return True
+ except (OSError, AttributeError):
+ return False
+
+
+def DefaultSelector(sock):
+ "Return the best selector for the platform"
+ global _DEFAULT_SELECTOR
+ if _DEFAULT_SELECTOR is None:
+ if has_selector('poll'):
+ _DEFAULT_SELECTOR = PollSelector
+ elif hasattr(select, 'select'):
+ _DEFAULT_SELECTOR = SelectSelector
+ else:
+ raise RedisError('Platform does not support any selectors')
+ return _DEFAULT_SELECTOR(sock)