summaryrefslogtreecommitdiff
path: root/redis/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'redis/asyncio')
-rw-r--r--redis/asyncio/client.py12
-rw-r--r--redis/asyncio/cluster.py12
2 files changed, 19 insertions, 5 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py
index 3d59016..5c0b546 100644
--- a/redis/asyncio/client.py
+++ b/redis/asyncio/client.py
@@ -1349,10 +1349,16 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
conn = cast(Connection, conn)
try:
- return await conn.retry.call_with_retry(
- lambda: execute(conn, stack, raise_on_error),
- lambda error: self._disconnect_raise_reset(conn, error),
+ return await asyncio.shield(
+ conn.retry.call_with_retry(
+ lambda: execute(conn, stack, raise_on_error),
+ lambda error: self._disconnect_raise_reset(conn, error),
+ )
)
+ except asyncio.CancelledError:
+ # not supposed to be possible, yet here we are
+ await conn.disconnect(nowait=True)
+ raise
finally:
await self.reset()
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 3fe3ebc..8dfb1cb 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -879,10 +879,18 @@ class ClusterNode:
await connection.send_packed_command(connection.pack_command(*args), False)
# Read response
+ return await asyncio.shield(
+ self._parse_and_release(connection, args[0], **kwargs)
+ )
+
+ async def _parse_and_release(self, connection, *args, **kwargs):
try:
- return await self.parse_response(connection, args[0], **kwargs)
+ return await self.parse_response(connection, *args, **kwargs)
+ except asyncio.CancelledError:
+ # should not be possible
+ await connection.disconnect(nowait=True)
+ raise
finally:
- # Release connection
self._free.append(connection)
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: