"""Illustrates use of the ``sqlalchemy.ext.asyncio.AsyncSession`` object for asynchronous ORM use. """ from __future__ import annotations import asyncio import datetime from typing import List from typing import Optional from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.future import select from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import relationship from sqlalchemy.orm import selectinload class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" id: Mapped[int] = mapped_column(primary_key=True) data: Mapped[Optional[str]] create_date: Mapped[datetime.datetime] = mapped_column( server_default=func.now() ) bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" id: Mapped[int] = mapped_column(primary_key=True) a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) data: Mapped[Optional[str]] async def async_main(): """Main program function.""" engine = create_async_engine( "postgresql+asyncpg://scott:tiger@localhost/test", echo=True, ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # expire_on_commit=False will prevent attributes from being expired # after commit. async_session = async_sessionmaker(engine, expire_on_commit=False) async with async_session() as session: async with session.begin(): session.add_all( [ A(bs=[B(), B()], data="a1"), A(bs=[B()], data="a2"), A(bs=[B(), B()], data="a3"), ] ) # for relationship loading, eager loading should be applied. stmt = select(A).options(selectinload(A.bs)) # AsyncSession.execute() is used for 2.0 style ORM execution # (same as the synchronous API). result = await session.scalars(stmt) # result is a buffered Result object. for a1 in result: print(a1) print(f"created at: {a1.create_date}") for b1 in a1.bs: print(b1) # for streaming ORM results, AsyncSession.stream() may be used. result = await session.stream(stmt) # result is a streaming AsyncResult object. async for a1 in result.scalars(): print(a1) for b1 in a1.bs: print(b1) result = await session.scalars(select(A).order_by(A.id)) a1 = result.one() a1.data = "new data" await session.commit() # use the AsyncAttrs interface to accommodate for a lazy load for b1 in await a1.awaitable_attrs.bs: print(b1) asyncio.run(async_main())