summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine')
-rw-r--r--lib/sqlalchemy/engine/cursor.py6
-rw-r--r--lib/sqlalchemy/engine/result.py59
2 files changed, 14 insertions, 51 deletions
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index eb6f51c14..c65c4a058 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -51,6 +51,7 @@ from ..sql.compiler import RM_TYPE
from ..sql.type_api import TypeEngine
from ..util import compat
from ..util.typing import Literal
+from ..util.typing import Self
_UNPICKLED = util.symbol("unpickled")
@@ -1366,9 +1367,6 @@ class _NoResultMetaData(ResultMetaData):
_NO_RESULT_METADATA = _NoResultMetaData()
-SelfCursorResult = TypeVar("SelfCursorResult", bound="CursorResult[Any]")
-
-
def null_dml_result() -> IteratorResult[Any]:
it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
it._soft_close()
@@ -2133,7 +2131,7 @@ class CursorResult(Result[_T]):
self._soft_close(hard=True)
@_generative
- def yield_per(self: SelfCursorResult, num: int) -> SelfCursorResult:
+ def yield_per(self, num: int) -> Self:
self._yield_per = num
self.cursor_strategy.yield_per(self, self.cursor, num)
return self
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index 67151913e..d5b8057ef 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -42,6 +42,7 @@ from ..sql.base import InPlaceGenerative
from ..util import HasMemoized_ro_memoized_attribute
from ..util._has_cy import HAS_CYEXTENSION
from ..util.typing import Literal
+from ..util.typing import Self
if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
from ._py_row import tuplegetter as tuplegetter
@@ -371,8 +372,6 @@ class _NoRow(Enum):
_NO_ROW = _NoRow._NO_ROW
-SelfResultInternal = TypeVar("SelfResultInternal", bound="ResultInternal[Any]")
-
class ResultInternal(InPlaceGenerative, Generic[_R]):
__slots__ = ()
@@ -819,9 +818,7 @@ class ResultInternal(InPlaceGenerative, Generic[_R]):
return row
@_generative
- def _column_slices(
- self: SelfResultInternal, indexes: Sequence[_KeyIndexType]
- ) -> SelfResultInternal:
+ def _column_slices(self, indexes: Sequence[_KeyIndexType]) -> Self:
real_result = (
self._real_result
if self._real_result
@@ -887,9 +884,6 @@ class _WithKeys:
return self._metadata.keys
-SelfResult = TypeVar("SelfResult", bound="Result[Any]")
-
-
class Result(_WithKeys, ResultInternal[Row[_TP]]):
"""Represent a set of database results.
@@ -929,7 +923,7 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
def __init__(self, cursor_metadata: ResultMetaData):
self._metadata = cursor_metadata
- def __enter__(self: SelfResult) -> SelfResult:
+ def __enter__(self) -> Self:
return self
def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
@@ -971,7 +965,7 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
raise NotImplementedError()
@_generative
- def yield_per(self: SelfResult, num: int) -> SelfResult:
+ def yield_per(self, num: int) -> Self:
"""Configure the row-fetching strategy to fetch ``num`` rows at a time.
This impacts the underlying behavior of the result when iterating over
@@ -1023,9 +1017,7 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
return self
@_generative
- def unique(
- self: SelfResult, strategy: Optional[_UniqueFilterType] = None
- ) -> SelfResult:
+ def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
"""Apply unique filtering to the objects returned by this
:class:`_engine.Result`.
@@ -1065,9 +1057,7 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
self._unique_filter_state = (set(), strategy)
return self
- def columns(
- self: SelfResultInternal, *col_expressions: _KeyIndexType
- ) -> SelfResultInternal:
+ def columns(self, *col_expressions: _KeyIndexType) -> Self:
r"""Establish the columns that should be returned in each row.
This method may be used to limit the columns returned as well
@@ -1582,9 +1572,6 @@ class Result(_WithKeys, ResultInternal[Row[_TP]]):
return MergedResult(self._metadata, (self,) + others)
-SelfFilterResult = TypeVar("SelfFilterResult", bound="FilterResult[Any]")
-
-
class FilterResult(ResultInternal[_R]):
"""A wrapper for a :class:`_engine.Result` that returns objects other than
:class:`_engine.Row` objects, such as dictionaries or scalar objects.
@@ -1607,14 +1594,14 @@ class FilterResult(ResultInternal[_R]):
_real_result: Result[Any]
- def __enter__(self: SelfFilterResult) -> SelfFilterResult:
+ def __enter__(self) -> Self:
return self
def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
self._real_result.__exit__(type_, value, traceback)
@_generative
- def yield_per(self: SelfFilterResult, num: int) -> SelfFilterResult:
+ def yield_per(self, num: int) -> Self:
"""Configure the row-fetching strategy to fetch ``num`` rows at a time.
The :meth:`_engine.FilterResult.yield_per` method is a pass through
@@ -1681,9 +1668,6 @@ class FilterResult(ResultInternal[_R]):
return self._real_result._fetchmany_impl(size=size)
-SelfScalarResult = TypeVar("SelfScalarResult", bound="ScalarResult[Any]")
-
-
class ScalarResult(FilterResult[_R]):
"""A wrapper for a :class:`_engine.Result` that returns scalar values
rather than :class:`_row.Row` values.
@@ -1718,9 +1702,7 @@ class ScalarResult(FilterResult[_R]):
self._unique_filter_state = real_result._unique_filter_state
- def unique(
- self: SelfScalarResult, strategy: Optional[_UniqueFilterType] = None
- ) -> SelfScalarResult:
+ def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
"""Apply unique filtering to the objects returned by this
:class:`_engine.ScalarResult`.
@@ -1817,9 +1799,6 @@ class ScalarResult(FilterResult[_R]):
)
-SelfTupleResult = TypeVar("SelfTupleResult", bound="TupleResult[Any]")
-
-
class TupleResult(FilterResult[_R], util.TypingOnly):
"""A :class:`_engine.Result` that's typed as returning plain
Python tuples instead of rows.
@@ -1989,9 +1968,6 @@ class TupleResult(FilterResult[_R], util.TypingOnly):
...
-SelfMappingResult = TypeVar("SelfMappingResult", bound="MappingResult")
-
-
class MappingResult(_WithKeys, FilterResult[RowMapping]):
"""A wrapper for a :class:`_engine.Result` that returns dictionary values
rather than :class:`_engine.Row` values.
@@ -2014,9 +1990,7 @@ class MappingResult(_WithKeys, FilterResult[RowMapping]):
if result._source_supports_scalars:
self._metadata = self._metadata._reduce([0])
- def unique(
- self: SelfMappingResult, strategy: Optional[_UniqueFilterType] = None
- ) -> SelfMappingResult:
+ def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
"""Apply unique filtering to the objects returned by this
:class:`_engine.MappingResult`.
@@ -2026,9 +2000,7 @@ class MappingResult(_WithKeys, FilterResult[RowMapping]):
self._unique_filter_state = (set(), strategy)
return self
- def columns(
- self: SelfMappingResult, *col_expressions: _KeyIndexType
- ) -> SelfMappingResult:
+ def columns(self, *col_expressions: _KeyIndexType) -> Self:
r"""Establish the columns that should be returned in each row."""
return self._column_slices(col_expressions)
@@ -2305,11 +2277,6 @@ def null_result() -> IteratorResult[Any]:
return IteratorResult(SimpleResultMetaData([]), iter([]))
-SelfChunkedIteratorResult = TypeVar(
- "SelfChunkedIteratorResult", bound="ChunkedIteratorResult[Any]"
-)
-
-
class ChunkedIteratorResult(IteratorResult[_TP]):
"""An :class:`_engine.IteratorResult` that works from an
iterator-producing callable.
@@ -2344,9 +2311,7 @@ class ChunkedIteratorResult(IteratorResult[_TP]):
self.dynamic_yield_per = dynamic_yield_per
@_generative
- def yield_per(
- self: SelfChunkedIteratorResult, num: int
- ) -> SelfChunkedIteratorResult:
+ def yield_per(self, num: int) -> Self:
# TODO: this throws away the iterator which may be holding
# onto a chunk. the yield_per cannot be changed once any
# rows have been fetched. either find a way to enforce this,