diff options
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 106 |
1 files changed, 43 insertions, 63 deletions
diff --git a/redis/client.py b/redis/client.py index 753770e..0ae64be 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1,12 +1,12 @@ from itertools import chain import copy import datetime -import hashlib import re import threading import time import warnings -from redis.commands import CoreCommands, RedisModuleCommands, list_or_args +from redis.commands import (CoreCommands, RedisModuleCommands, + SentinelCommands, list_or_args) from redis.connection import (ConnectionPool, UnixDomainSocketConnection, SSLConnection) from redis.lock import Lock @@ -14,7 +14,6 @@ from redis.exceptions import ( ConnectionError, ExecAbortError, ModuleError, - NoScriptError, PubSubError, RedisError, ResponseError, @@ -26,6 +25,9 @@ from redis.utils import safe_str, str_if_bytes SYM_EMPTY = b'' EMPTY_RESPONSE = 'EMPTY_RESPONSE' +# some responses (ie. dump) are binary, and just meant to never be decoded +NEVER_DECODE = 'NEVER_DECODE' + def timestamp_to_datetime(response): "Converts a unix timestamp to a Python datetime object" @@ -460,6 +462,7 @@ def _parse_node_line(line): line_items = line.split(' ') node_id, addr, flags, master_id, ping, pong, epoch, \ connected = line.split(' ')[:8] + addr = addr.split('@')[0] slots = [sl.split('-') for sl in line_items[8:]] node_dict = { 'node_id': node_id, @@ -475,8 +478,13 @@ def _parse_node_line(line): def parse_cluster_nodes(response, **options): - raw_lines = str_if_bytes(response).splitlines() - return dict(_parse_node_line(line) for line in raw_lines) + """ + @see: https://redis.io/commands/cluster-nodes # string + @see: https://redis.io/commands/cluster-replicas # list of string + """ + if isinstance(response, str): + response = response.splitlines() + return dict(_parse_node_line(str_if_bytes(node)) for node in response) def parse_geosearch_generic(response, **options): @@ -515,6 +523,21 @@ def parse_geosearch_generic(response, **options): ] +def parse_command(response, **options): + commands = {} + for command in response: + cmd_dict = {} + cmd_name = str_if_bytes(command[0]) + cmd_dict['name'] = cmd_name + cmd_dict['arity'] = int(command[1]) + cmd_dict['flags'] = [str_if_bytes(flag) for flag in command[2]] + cmd_dict['first_key_pos'] = command[3] + cmd_dict['last_key_pos'] = command[4] + cmd_dict['step_count'] = command[5] + commands[cmd_name] = cmd_dict + return commands + + def parse_pubsub_numsub(response, **options): return list(zip(response[0::2], response[1::2])) @@ -606,7 +629,7 @@ def parse_set_result(response, **options): return response and str_if_bytes(response) == 'OK' -class Redis(RedisModuleCommands, CoreCommands, object): +class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object): """ Implementation of the Redis protocol. @@ -703,7 +726,10 @@ class Redis(RedisModuleCommands, CoreCommands, object): 'CLUSTER SET-CONFIG-EPOCH': bool_ok, 'CLUSTER SETSLOT': bool_ok, 'CLUSTER SLAVES': parse_cluster_nodes, + 'CLUSTER REPLICAS': parse_cluster_nodes, + 'COMMAND': parse_command, 'COMMAND COUNT': int, + 'COMMAND GETKEYS': lambda r: list(map(str_if_bytes, r)), 'CONFIG GET': parse_config_get, 'CONFIG RESETSTAT': bool_ok, 'CONFIG SET': bool_ok, @@ -826,7 +852,7 @@ class Redis(RedisModuleCommands, CoreCommands, object): ssl_check_hostname=False, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, username=None, - retry=None): + retry=None, redis_connect_func=None): """ Initialize a new Redis client. To specify a retry policy, first set `retry_on_timeout` to `True` @@ -854,7 +880,8 @@ class Redis(RedisModuleCommands, CoreCommands, object): 'retry': copy.deepcopy(retry), 'max_connections': max_connections, 'health_check_interval': health_check_interval, - 'client_name': client_name + 'client_name': client_name, + 'redis_connect_func': redis_connect_func } # based on input, setup appropriate connection args if unix_socket_path is not None: @@ -890,12 +917,6 @@ class Redis(RedisModuleCommands, CoreCommands, object): self.response_callbacks = CaseInsensitiveDict( self.__class__.RESPONSE_CALLBACKS) - # preload our class with the available redis commands - try: - self.__redis_commands__() - except RedisError: - pass - def __repr__(self): return "%s<%s>" % (type(self).__name__, repr(self.connection_pool)) @@ -927,18 +948,6 @@ class Redis(RedisModuleCommands, CoreCommands, object): """ setattr(self, funcname, func) - def __redis_commands__(self): - """Store the list of available commands, for our redis instance.""" - cmds = getattr(self, '__commands__', None) - if cmds is not None: - return cmds - try: - cmds = [c[0].upper().decode() for c in self.command()] - except AttributeError: # if encoded - cmds = [c[0].upper() for c in self.command()] - self.__commands__ = cmds - return cmds - def pipeline(self, transaction=True, shard_hint=None): """ Return a new pipeline object that can queue multiple commands for @@ -1098,7 +1107,10 @@ class Redis(RedisModuleCommands, CoreCommands, object): def parse_response(self, connection, command_name, **options): "Parses a response from the Redis server" try: - response = connection.read_response() + if NEVER_DECODE in options: + response = connection.read_response(disable_decoding=True) + else: + response = connection.read_response() except ResponseError: if EMPTY_RESPONSE in options: return options[EMPTY_RESPONSE] @@ -1190,14 +1202,16 @@ class PubSub: HEALTH_CHECK_MESSAGE = 'redis-py-health-check' def __init__(self, connection_pool, shard_hint=None, - ignore_subscribe_messages=False): + ignore_subscribe_messages=False, encoder=None): self.connection_pool = connection_pool self.shard_hint = shard_hint self.ignore_subscribe_messages = ignore_subscribe_messages self.connection = None # we need to know the encoding options for this connection in order # to lookup channel and pattern names for callback handlers. - self.encoder = self.connection_pool.get_encoder() + self.encoder = encoder + if self.encoder is None: + self.encoder = self.connection_pool.get_encoder() if self.encoder.decode_responses: self.health_check_response = ['pong', self.HEALTH_CHECK_MESSAGE] else: @@ -1880,37 +1894,3 @@ class Pipeline(Redis): def unwatch(self): "Unwatches all previously specified keys" return self.watching and self.execute_command('UNWATCH') or True - - -class Script: - "An executable Lua script object returned by ``register_script``" - - def __init__(self, registered_client, script): - self.registered_client = registered_client - self.script = script - # Precalculate and store the SHA1 hex digest of the script. - - if isinstance(script, str): - # We need the encoding from the client in order to generate an - # accurate byte representation of the script - encoder = registered_client.connection_pool.get_encoder() - script = encoder.encode(script) - self.sha = hashlib.sha1(script).hexdigest() - - def __call__(self, keys=[], args=[], client=None): - "Execute the script, passing any required ``args``" - if client is None: - client = self.registered_client - args = tuple(keys) + tuple(args) - # make sure the Redis server knows about the script - if isinstance(client, Pipeline): - # Make sure the pipeline can register the script before executing. - client.scripts.add(self) - try: - return client.evalsha(self.sha, len(keys), *args) - except NoScriptError: - # Maybe the client is pointed to a different server than the client - # that created this instance? - # Overwrite the sha just in case there was a discrepancy. - self.sha = client.script_load(self.script) - return client.evalsha(self.sha, len(keys), *args) |