diff options
Diffstat (limited to 'redis/asyncio')
-rw-r--r-- | redis/asyncio/client.py | 12 | ||||
-rw-r--r-- | redis/asyncio/cluster.py | 12 |
2 files changed, 19 insertions, 5 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 3d59016..5c0b546 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1349,10 +1349,16 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] conn = cast(Connection, conn) try: - return await conn.retry.call_with_retry( - lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), + return await asyncio.shield( + conn.retry.call_with_retry( + lambda: execute(conn, stack, raise_on_error), + lambda error: self._disconnect_raise_reset(conn, error), + ) ) + except asyncio.CancelledError: + # not supposed to be possible, yet here we are + await conn.disconnect(nowait=True) + raise finally: await self.reset() diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 3fe3ebc..8dfb1cb 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -879,10 +879,18 @@ class ClusterNode: await connection.send_packed_command(connection.pack_command(*args), False) # Read response + return await asyncio.shield( + self._parse_and_release(connection, args[0], **kwargs) + ) + + async def _parse_and_release(self, connection, *args, **kwargs): try: - return await self.parse_response(connection, args[0], **kwargs) + return await self.parse_response(connection, *args, **kwargs) + except asyncio.CancelledError: + # should not be possible + await connection.disconnect(nowait=True) + raise finally: - # Release connection self._free.append(connection) async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: |