diff options
author | Chayim I. Kirshen <c@kirshen.com> | 2021-11-29 20:07:20 +0200 |
---|---|---|
committer | Chayim I. Kirshen <c@kirshen.com> | 2021-11-29 20:07:20 +0200 |
commit | 39fc550251d238cdba7966ff153321ca9e488508 (patch) | |
tree | e79360ec70feac7f0ab992813f8b2d43f7c67bab /redis/connection.py | |
parent | a924269502b96dc71339cca3dfb20aaa3899a9d0 (diff) | |
parent | 4db85ef574a64a2b230a3ae1ff19c9d04065a114 (diff) | |
download | redis-py-ck-linkdocs.tar.gz |
merging masterck-linkdocs
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 63 |
1 files changed, 37 insertions, 26 deletions
diff --git a/redis/connection.py b/redis/connection.py index cb9acb4..6ff3650 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -1,4 +1,4 @@ -from distutils.version import LooseVersion +from packaging.version import Version from itertools import chain from time import time from queue import LifoQueue, Empty, Full @@ -9,9 +9,9 @@ import io import os import socket import threading -import warnings import weakref +from redis.backoff import NoBackoff from redis.exceptions import ( AuthenticationError, AuthenticationWrongNumberOfArgsError, @@ -29,9 +29,9 @@ from redis.exceptions import ( TimeoutError, ModuleError, ) -from redis.utils import HIREDIS_AVAILABLE, str_if_bytes -from redis.backoff import NoBackoff + from redis.retry import Retry +from redis.utils import HIREDIS_AVAILABLE, str_if_bytes try: import ssl @@ -55,26 +55,18 @@ NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys()) if HIREDIS_AVAILABLE: import hiredis - hiredis_version = LooseVersion(hiredis.__version__) + hiredis_version = Version(hiredis.__version__) HIREDIS_SUPPORTS_CALLABLE_ERRORS = \ - hiredis_version >= LooseVersion('0.1.3') + hiredis_version >= Version('0.1.3') HIREDIS_SUPPORTS_BYTE_BUFFER = \ - hiredis_version >= LooseVersion('0.1.4') + hiredis_version >= Version('0.1.4') HIREDIS_SUPPORTS_ENCODING_ERRORS = \ - hiredis_version >= LooseVersion('1.0.0') - - if not HIREDIS_SUPPORTS_BYTE_BUFFER: - msg = ("redis-py works best with hiredis >= 0.1.4. You're running " - "hiredis %s. Please consider upgrading." % hiredis.__version__) - warnings.warn(msg) + hiredis_version >= Version('1.0.0') HIREDIS_USE_BYTE_BUFFER = True # only use byte buffer if hiredis supports it if not HIREDIS_SUPPORTS_BYTE_BUFFER: HIREDIS_USE_BYTE_BUFFER = False -else: - msg = "redis-py works best with hiredis. Please consider installing" - warnings.warn(msg) SYM_STAR = b'*' SYM_DOLLAR = b'$' @@ -323,7 +315,7 @@ class PythonParser(BaseParser): def can_read(self, timeout): return self._buffer and self._buffer.can_read(timeout) - def read_response(self): + def read_response(self, disable_decoding=False): raw = self._buffer.readline() if not raw: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -363,8 +355,9 @@ class PythonParser(BaseParser): length = int(response) if length == -1: return None - response = [self.read_response() for i in range(length)] - if isinstance(response, bytes): + response = [self.read_response(disable_decoding=disable_decoding) + for i in range(length)] + if isinstance(response, bytes) and disable_decoding is False: response = self.encoder.decode(response) return response @@ -458,7 +451,7 @@ class HiredisParser(BaseParser): if custom_timeout: sock.settimeout(self._socket_timeout) - def read_response(self): + def read_response(self, disable_decoding=False): if not self._reader: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -507,7 +500,7 @@ class Connection: encoding_errors='strict', decode_responses=False, parser_class=DefaultParser, socket_read_size=65536, health_check_interval=0, client_name=None, username=None, - retry=None): + retry=None, redis_connect_func=None): """ Initialize a new Connection. To specify a retry policy, first set `retry_on_timeout` to `True` @@ -537,8 +530,10 @@ class Connection: self.health_check_interval = health_check_interval self.next_health_check = 0 self.encoder = Encoder(encoding, encoding_errors, decode_responses) + self.redis_connect_func = redis_connect_func self._sock = None - self._parser = parser_class(socket_read_size=socket_read_size) + self._socket_read_size = socket_read_size + self.set_parser(parser_class) self._connect_callbacks = [] self._buffer_cutoff = 6000 @@ -568,6 +563,14 @@ class Connection: def clear_connect_callbacks(self): self._connect_callbacks = [] + def set_parser(self, parser_class): + """ + Creates a new instance of parser_class with socket size: + _socket_read_size and assigns it to the parser for the connection + :param parser_class: The required parser class + """ + self._parser = parser_class(socket_read_size=self._socket_read_size) + def connect(self): "Connects to the Redis server if not already connected" if self._sock: @@ -581,7 +584,12 @@ class Connection: self._sock = sock try: - self.on_connect() + if self.redis_connect_func is None: + # Use the default on_connect function + self.on_connect() + else: + # Use the passed function redis_connect_func + self.redis_connect_func(self) except RedisError: # clean up after any error in on_connect self.disconnect() @@ -751,10 +759,12 @@ class Connection: self.connect() return self._parser.can_read(timeout) - def read_response(self): + def read_response(self, disable_decoding=False): """Read the response from a previously sent command""" try: - response = self._parser.read_response() + response = self._parser.read_response( + disable_decoding=disable_decoding + ) except socket.timeout: self.disconnect() raise TimeoutError("Timeout reading from %s:%s" % @@ -912,7 +922,8 @@ class UnixDomainSocketConnection(Connection): self.next_health_check = 0 self.encoder = Encoder(encoding, encoding_errors, decode_responses) self._sock = None - self._parser = parser_class(socket_read_size=socket_read_size) + self._socket_read_size = socket_read_size + self.set_parser(parser_class) self._connect_callbacks = [] self._buffer_cutoff = 6000 |