diff options
author | dvora-h <67596500+dvora-h@users.noreply.github.com> | 2022-07-28 14:24:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-28 14:24:17 +0300 |
commit | c94821c11e74d360ae3ccdcf9581cfe24e120a07 (patch) | |
tree | cb8dae16e14be25d443bd67480e3e4a9dae201ae /redis/commands/graph/commands.py | |
parent | f9f9d06c9951f8536bf9321dcebc96759eae03e0 (diff) | |
download | redis-py-c94821c11e74d360ae3ccdcf9581cfe24e120a07.tar.gz |
Add support for async GRAPH module (#2273)
* Add support for async graph
* linters
* fix docstring
* Use retry mechanism in async version of Connection objects (#2271)
* fix is_connected (#2278)
* fix: workaround asyncio bug on connection reset by peer (#2259)
Fixes #2237
* Fix crash: key expire while search (#2270)
* fix expire while search
* sleep
* docs: Fix a few typos (#2274)
* docs: Fix a few typos
There are small typos in:
- redis/cluster.py
- redis/commands/core.py
- redis/ocsp.py
- tests/test_cluster.py
Fixes:
- Should read `validity` rather than `valididy`.
- Should read `reinitialize` rather than `reinitilize`.
- Should read `farthest` rather than `farest`.
- Should read `commands` rather than `comamnds`.
* Update core.py
* async_cluster: fix concurrent pipeline (#2280)
- each pipeline should create separate stacks for each node
* Add support for TIMESERIES 1.8 (#2296)
* Add support for timeseries 1.8
* fix info
* linters
* linters
* fix info test
* type hints
* linters
* Remove verbose logging from `redis-py/redis/cluster.py` (#2238)
* removed the logging module and its corresponding methods
* updated CHANGES
* except block for RedisClusterException and BusyLoadingError removed
* removed unused import (redis.exceptions.BusyLoadingError)
* empty commit to re-trigger Actions workflow
* replaced BaseException with Exception
* empty commit to re-trigger Actions workflow
* empty commit to re-trigger Actions workflow
* redundant logic removed
* re-trigger pipeline
* reverted changes
* re-trigger pipeline
* except logic changed
* redis stream example (#2269)
* redis stream example
* redis stream example on docs/examples.rst
Co-authored-by: pedro.frazao <perl.pf@netcf.org>
* Fix: `start_id` type for `XAUTOCLAIM` (#2257)
* Changed start_id type for xautoclaim
* Added to changes
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
* Doc add timeseries example (#2267)
* DOC add timeseries example
* DOC add timeseries examples
* Apply suggestions
* Fix typo
Detention period => Retention period
Co-authored-by: Gauthier Imbert <gauthier@PC17>
* Fix warnings and resource usage problems in asyncio unittests (#2258)
* Use pytest-asyncio in auto mode
Remove overly genereric `pytestmark=pytest.mark.asyncio` causing lots of warning noise
* Use "Factories as Fixtures" test pattern for the `create_redis` fixture
this fixture is now async, avoiding teardown problems with missing event loops.
* Fix sporadic error on fast event loops, such as `--uvloop`
* Close connection, even if "username" was in kwargs
This fixes a resource usage warning in the async unittests.
* Do async cleanup of acl passwords via a fixture
* Remove unused import, fix whitespace
* Fix test with missing "await"
* Close pubsub objects after use in unittest
Use a simple fixture where possible, otherwise manually call pubsub.close()
* re-introduce `pytestmark=pytest.mark.asyncio` for python 3.6
* Use context manager to clean up connections in connection pool for unit tests
* Provide asynccontextmanager for python 3.6
* make `test_late_subscribe()` more robuste
* Catch a couple of additional leaked resources
* Graph - add counters for removed labels and properties (#2292)
* grpah - add counters for removed labels and properties
* added mock graph result set statistics
* docstrings for graph result set statistics
* format
* isort
* moved docstrings into functions
* cleaning up the readme and moving docs into readthedocs (#2291)
* cleaning up the readme and moving docs into readthedocs
* examples at the end as per pr comments
* async_cluster: fix max_connections/ssl & improve args (#2217)
* async_cluster: fix max_connections/ssl & improve args
- set proper connection_class if ssl = True
- pass max_connections/connection_class to ClusterNode
- recreate startup_nodes to properly initialize
- pass parser_class to Connection instead of changing it in on_connect
- only pass redis_connect_func if read_from_replicas = True
- add connection_error_retry_attempts parameter
- skip is_connected check in acquire_connection as it is already checked in send_packed_command
BREAKING:
- RedisCluster args except host & port are kw-only now
- RedisCluster will no longer accept unknown arguments
- RedisCluster will no longer accept url as an argument. Use RedisCluster.from_url
- RedisCluster.require_full_coverage defaults to True
- ClusterNode args except host, port, & server_type are kw-only now
* async_cluster: remove kw-only requirement from client
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
* fix review comments
* fix
* fix review comments
* fix review comments
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: szumka <106675199+szumka@users.noreply.github.com>
Co-authored-by: Mehdi ABAAKOUK <sileht@sileht.net>
Co-authored-by: Tim Gates <tim.gates@iress.com>
Co-authored-by: Utkarsh Gupta <utkarshgupta137@gmail.com>
Co-authored-by: Nial Daly <34862917+nialdaly@users.noreply.github.com>
Co-authored-by: pedrofrazao <603718+pedrofrazao@users.noreply.github.com>
Co-authored-by: pedro.frazao <perl.pf@netcf.org>
Co-authored-by: Антон Безденежных <gamer392@yandex.ru>
Co-authored-by: Iglesys <g.imbert34@gmail.com>
Co-authored-by: Gauthier Imbert <gauthier@PC17>
Co-authored-by: Kristján Valur Jónsson <sweskman@gmail.com>
Co-authored-by: DvirDukhan <dvir@redis.com>
Diffstat (limited to 'redis/commands/graph/commands.py')
-rw-r--r-- | redis/commands/graph/commands.py | 146 |
1 files changed, 123 insertions, 23 deletions
diff --git a/redis/commands/graph/commands.py b/redis/commands/graph/commands.py index fe4224b..762ab42 100644 --- a/redis/commands/graph/commands.py +++ b/redis/commands/graph/commands.py @@ -3,7 +3,16 @@ from redis.exceptions import ResponseError from .exceptions import VersionMismatchException from .execution_plan import ExecutionPlan -from .query_result import QueryResult +from .query_result import AsyncQueryResult, QueryResult + +PROFILE_CMD = "GRAPH.PROFILE" +RO_QUERY_CMD = "GRAPH.RO_QUERY" +QUERY_CMD = "GRAPH.QUERY" +DELETE_CMD = "GRAPH.DELETE" +SLOWLOG_CMD = "GRAPH.SLOWLOG" +CONFIG_CMD = "GRAPH.CONFIG" +LIST_CMD = "GRAPH.LIST" +EXPLAIN_CMD = "GRAPH.EXPLAIN" class GraphCommands: @@ -52,33 +61,28 @@ class GraphCommands: query = q # handle query parameters - if params is not None: - query = self._build_params_header(params) + query + query = self._build_params_header(params) + query # construct query command # ask for compact result-set format # specify known graph version if profile: - cmd = "GRAPH.PROFILE" + cmd = PROFILE_CMD else: - cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY" + cmd = RO_QUERY_CMD if read_only else QUERY_CMD command = [cmd, self.name, query, "--compact"] # include timeout is specified - if timeout: - if not isinstance(timeout, int): - raise Exception("Timeout argument must be a positive integer") - command += ["timeout", timeout] + if isinstance(timeout, int): + command.extend(["timeout", timeout]) + elif timeout is not None: + raise Exception("Timeout argument must be a positive integer") # issue query try: response = self.execute_command(*command) return QueryResult(self, response, profile) except ResponseError as e: - if "wrong number of arguments" in str(e): - print( - "Note: RedisGraph Python requires server version 2.2.8 or above" - ) # noqa if "unknown command" in str(e) and read_only: # `GRAPH.RO_QUERY` is unavailable in older versions. return self.query(q, params, timeout, read_only=False) @@ -106,7 +110,7 @@ class GraphCommands: For more information see `DELETE <https://redis.io/commands/graph.delete>`_. # noqa """ self._clear_schema() - return self.execute_command("GRAPH.DELETE", self.name) + return self.execute_command(DELETE_CMD, self.name) # declared here, to override the built in redis.db.flush() def flush(self): @@ -146,7 +150,7 @@ class GraphCommands: 3. The issued query. 4. The amount of time needed for its execution, in milliseconds. """ - return self.execute_command("GRAPH.SLOWLOG", self.name) + return self.execute_command(SLOWLOG_CMD, self.name) def config(self, name, value=None, set=False): """ @@ -170,14 +174,14 @@ class GraphCommands: raise DataError( "``value`` can be provided only when ``set`` is True" ) # noqa - return self.execute_command("GRAPH.CONFIG", *params) + return self.execute_command(CONFIG_CMD, *params) def list_keys(self): """ Lists all graph keys in the keyspace. For more information see `GRAPH.LIST <https://redis.io/commands/graph.list>`_. # noqa """ - return self.execute_command("GRAPH.LIST") + return self.execute_command(LIST_CMD) def execution_plan(self, query, params=None): """ @@ -188,10 +192,9 @@ class GraphCommands: query: the query that will be executed params: query parameters """ - if params is not None: - query = self._build_params_header(params) + query + query = self._build_params_header(params) + query - plan = self.execute_command("GRAPH.EXPLAIN", self.name, query) + plan = self.execute_command(EXPLAIN_CMD, self.name, query) if isinstance(plan[0], bytes): plan = [b.decode() for b in plan] return "\n".join(plan) @@ -206,8 +209,105 @@ class GraphCommands: query: the query that will be executed params: query parameters """ - if params is not None: - query = self._build_params_header(params) + query + query = self._build_params_header(params) + query + + plan = self.execute_command(EXPLAIN_CMD, self.name, query) + return ExecutionPlan(plan) + + +class AsyncGraphCommands(GraphCommands): + async def query(self, q, params=None, timeout=None, read_only=False, profile=False): + """ + Executes a query against the graph. + For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa + + Args: + + q : str + The query. + params : dict + Query parameters. + timeout : int + Maximum runtime for read queries in milliseconds. + read_only : bool + Executes a readonly query if set to True. + profile : bool + Return details on results produced by and time + spent in each operation. + """ + + # maintain original 'q' + query = q + + # handle query parameters + query = self._build_params_header(params) + query + + # construct query command + # ask for compact result-set format + # specify known graph version + if profile: + cmd = PROFILE_CMD + else: + cmd = RO_QUERY_CMD if read_only else QUERY_CMD + command = [cmd, self.name, query, "--compact"] + + # include timeout is specified + if isinstance(timeout, int): + command.extend(["timeout", timeout]) + elif timeout is not None: + raise Exception("Timeout argument must be a positive integer") + + # issue query + try: + response = await self.execute_command(*command) + return await AsyncQueryResult().initialize(self, response, profile) + except ResponseError as e: + if "unknown command" in str(e) and read_only: + # `GRAPH.RO_QUERY` is unavailable in older versions. + return await self.query(q, params, timeout, read_only=False) + raise e + except VersionMismatchException as e: + # client view over the graph schema is out of sync + # set client version and refresh local schema + self.version = e.version + self._refresh_schema() + # re-issue query + return await self.query(q, params, timeout, read_only) + + async def execution_plan(self, query, params=None): + """ + Get the execution plan for given query, + GRAPH.EXPLAIN returns an array of operations. + + Args: + query: the query that will be executed + params: query parameters + """ + query = self._build_params_header(params) + query - plan = self.execute_command("GRAPH.EXPLAIN", self.name, query) + plan = await self.execute_command(EXPLAIN_CMD, self.name, query) + if isinstance(plan[0], bytes): + plan = [b.decode() for b in plan] + return "\n".join(plan) + + async def explain(self, query, params=None): + """ + Get the execution plan for given query, + GRAPH.EXPLAIN returns ExecutionPlan object. + + Args: + query: the query that will be executed + params: query parameters + """ + query = self._build_params_header(params) + query + + plan = await self.execute_command(EXPLAIN_CMD, self.name, query) return ExecutionPlan(plan) + + async def flush(self): + """ + Commit the graph and reset the edges and the nodes to zero length. + """ + await self.commit() + self.nodes = {} + self.edges = [] |