summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py106
1 files changed, 43 insertions, 63 deletions
diff --git a/redis/client.py b/redis/client.py
index 753770e..0ae64be 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -1,12 +1,12 @@
from itertools import chain
import copy
import datetime
-import hashlib
import re
import threading
import time
import warnings
-from redis.commands import CoreCommands, RedisModuleCommands, list_or_args
+from redis.commands import (CoreCommands, RedisModuleCommands,
+ SentinelCommands, list_or_args)
from redis.connection import (ConnectionPool, UnixDomainSocketConnection,
SSLConnection)
from redis.lock import Lock
@@ -14,7 +14,6 @@ from redis.exceptions import (
ConnectionError,
ExecAbortError,
ModuleError,
- NoScriptError,
PubSubError,
RedisError,
ResponseError,
@@ -26,6 +25,9 @@ from redis.utils import safe_str, str_if_bytes
SYM_EMPTY = b''
EMPTY_RESPONSE = 'EMPTY_RESPONSE'
+# some responses (ie. dump) are binary, and just meant to never be decoded
+NEVER_DECODE = 'NEVER_DECODE'
+
def timestamp_to_datetime(response):
"Converts a unix timestamp to a Python datetime object"
@@ -460,6 +462,7 @@ def _parse_node_line(line):
line_items = line.split(' ')
node_id, addr, flags, master_id, ping, pong, epoch, \
connected = line.split(' ')[:8]
+ addr = addr.split('@')[0]
slots = [sl.split('-') for sl in line_items[8:]]
node_dict = {
'node_id': node_id,
@@ -475,8 +478,13 @@ def _parse_node_line(line):
def parse_cluster_nodes(response, **options):
- raw_lines = str_if_bytes(response).splitlines()
- return dict(_parse_node_line(line) for line in raw_lines)
+ """
+ @see: https://redis.io/commands/cluster-nodes # string
+ @see: https://redis.io/commands/cluster-replicas # list of string
+ """
+ if isinstance(response, str):
+ response = response.splitlines()
+ return dict(_parse_node_line(str_if_bytes(node)) for node in response)
def parse_geosearch_generic(response, **options):
@@ -515,6 +523,21 @@ def parse_geosearch_generic(response, **options):
]
+def parse_command(response, **options):
+ commands = {}
+ for command in response:
+ cmd_dict = {}
+ cmd_name = str_if_bytes(command[0])
+ cmd_dict['name'] = cmd_name
+ cmd_dict['arity'] = int(command[1])
+ cmd_dict['flags'] = [str_if_bytes(flag) for flag in command[2]]
+ cmd_dict['first_key_pos'] = command[3]
+ cmd_dict['last_key_pos'] = command[4]
+ cmd_dict['step_count'] = command[5]
+ commands[cmd_name] = cmd_dict
+ return commands
+
+
def parse_pubsub_numsub(response, **options):
return list(zip(response[0::2], response[1::2]))
@@ -606,7 +629,7 @@ def parse_set_result(response, **options):
return response and str_if_bytes(response) == 'OK'
-class Redis(RedisModuleCommands, CoreCommands, object):
+class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
"""
Implementation of the Redis protocol.
@@ -703,7 +726,10 @@ class Redis(RedisModuleCommands, CoreCommands, object):
'CLUSTER SET-CONFIG-EPOCH': bool_ok,
'CLUSTER SETSLOT': bool_ok,
'CLUSTER SLAVES': parse_cluster_nodes,
+ 'CLUSTER REPLICAS': parse_cluster_nodes,
+ 'COMMAND': parse_command,
'COMMAND COUNT': int,
+ 'COMMAND GETKEYS': lambda r: list(map(str_if_bytes, r)),
'CONFIG GET': parse_config_get,
'CONFIG RESETSTAT': bool_ok,
'CONFIG SET': bool_ok,
@@ -826,7 +852,7 @@ class Redis(RedisModuleCommands, CoreCommands, object):
ssl_check_hostname=False,
max_connections=None, single_connection_client=False,
health_check_interval=0, client_name=None, username=None,
- retry=None):
+ retry=None, redis_connect_func=None):
"""
Initialize a new Redis client.
To specify a retry policy, first set `retry_on_timeout` to `True`
@@ -854,7 +880,8 @@ class Redis(RedisModuleCommands, CoreCommands, object):
'retry': copy.deepcopy(retry),
'max_connections': max_connections,
'health_check_interval': health_check_interval,
- 'client_name': client_name
+ 'client_name': client_name,
+ 'redis_connect_func': redis_connect_func
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
@@ -890,12 +917,6 @@ class Redis(RedisModuleCommands, CoreCommands, object):
self.response_callbacks = CaseInsensitiveDict(
self.__class__.RESPONSE_CALLBACKS)
- # preload our class with the available redis commands
- try:
- self.__redis_commands__()
- except RedisError:
- pass
-
def __repr__(self):
return "%s<%s>" % (type(self).__name__, repr(self.connection_pool))
@@ -927,18 +948,6 @@ class Redis(RedisModuleCommands, CoreCommands, object):
"""
setattr(self, funcname, func)
- def __redis_commands__(self):
- """Store the list of available commands, for our redis instance."""
- cmds = getattr(self, '__commands__', None)
- if cmds is not None:
- return cmds
- try:
- cmds = [c[0].upper().decode() for c in self.command()]
- except AttributeError: # if encoded
- cmds = [c[0].upper() for c in self.command()]
- self.__commands__ = cmds
- return cmds
-
def pipeline(self, transaction=True, shard_hint=None):
"""
Return a new pipeline object that can queue multiple commands for
@@ -1098,7 +1107,10 @@ class Redis(RedisModuleCommands, CoreCommands, object):
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
- response = connection.read_response()
+ if NEVER_DECODE in options:
+ response = connection.read_response(disable_decoding=True)
+ else:
+ response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
@@ -1190,14 +1202,16 @@ class PubSub:
HEALTH_CHECK_MESSAGE = 'redis-py-health-check'
def __init__(self, connection_pool, shard_hint=None,
- ignore_subscribe_messages=False):
+ ignore_subscribe_messages=False, encoder=None):
self.connection_pool = connection_pool
self.shard_hint = shard_hint
self.ignore_subscribe_messages = ignore_subscribe_messages
self.connection = None
# we need to know the encoding options for this connection in order
# to lookup channel and pattern names for callback handlers.
- self.encoder = self.connection_pool.get_encoder()
+ self.encoder = encoder
+ if self.encoder is None:
+ self.encoder = self.connection_pool.get_encoder()
if self.encoder.decode_responses:
self.health_check_response = ['pong', self.HEALTH_CHECK_MESSAGE]
else:
@@ -1880,37 +1894,3 @@ class Pipeline(Redis):
def unwatch(self):
"Unwatches all previously specified keys"
return self.watching and self.execute_command('UNWATCH') or True
-
-
-class Script:
- "An executable Lua script object returned by ``register_script``"
-
- def __init__(self, registered_client, script):
- self.registered_client = registered_client
- self.script = script
- # Precalculate and store the SHA1 hex digest of the script.
-
- if isinstance(script, str):
- # We need the encoding from the client in order to generate an
- # accurate byte representation of the script
- encoder = registered_client.connection_pool.get_encoder()
- script = encoder.encode(script)
- self.sha = hashlib.sha1(script).hexdigest()
-
- def __call__(self, keys=[], args=[], client=None):
- "Execute the script, passing any required ``args``"
- if client is None:
- client = self.registered_client
- args = tuple(keys) + tuple(args)
- # make sure the Redis server knows about the script
- if isinstance(client, Pipeline):
- # Make sure the pipeline can register the script before executing.
- client.scripts.add(self)
- try:
- return client.evalsha(self.sha, len(keys), *args)
- except NoScriptError:
- # Maybe the client is pointed to a different server than the client
- # that created this instance?
- # Overwrite the sha just in case there was a discrepancy.
- self.sha = client.script_load(self.script)
- return client.evalsha(self.sha, len(keys), *args)