diff options
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_results.py')
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_results.py | 81 |
1 files changed, 64 insertions, 17 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index 2eb986c74..e6f6068c8 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -238,6 +238,8 @@ class ServerSideCursorsTest( elif self.engine.dialect.driver == "mysqldb": sscursor = __import__("MySQLdb.cursors").cursors.SSCursor return isinstance(cursor, sscursor) + elif self.engine.dialect.driver == "asyncpg": + return cursor.server_side else: return False @@ -331,29 +333,74 @@ class ServerSideCursorsTest( result.close() @testing.provide_metadata - def test_roundtrip(self): + def test_roundtrip_fetchall(self): md = self.metadata - self._fixture(True) + engine = self._fixture(True) test_table = Table( "test_table", md, Column("id", Integer, primary_key=True), Column("data", String(50)), ) - test_table.create(checkfirst=True) - test_table.insert().execute(data="data1") - test_table.insert().execute(data="data2") - eq_( - test_table.select().order_by(test_table.c.id).execute().fetchall(), - [(1, "data1"), (2, "data2")], - ) - test_table.update().where(test_table.c.id == 2).values( - data=test_table.c.data + " updated" - ).execute() - eq_( - test_table.select().order_by(test_table.c.id).execute().fetchall(), - [(1, "data1"), (2, "data2 updated")], + + with engine.connect() as connection: + test_table.create(connection, checkfirst=True) + connection.execute(test_table.insert(), dict(data="data1")) + connection.execute(test_table.insert(), dict(data="data2")) + eq_( + connection.execute( + test_table.select().order_by(test_table.c.id) + ).fetchall(), + [(1, "data1"), (2, "data2")], + ) + connection.execute( + test_table.update() + .where(test_table.c.id == 2) + .values(data=test_table.c.data + " updated") + ) + eq_( + connection.execute( + test_table.select().order_by(test_table.c.id) + ).fetchall(), + [(1, "data1"), (2, "data2 updated")], + ) + connection.execute(test_table.delete()) + eq_( + connection.scalar( + select([func.count("*")]).select_from(test_table) + ), + 0, + ) + + @testing.provide_metadata + def test_roundtrip_fetchmany(self): + md = self.metadata + + engine = self._fixture(True) + test_table = Table( + "test_table", + md, + Column("id", Integer, primary_key=True), + Column("data", String(50)), ) - test_table.delete().execute() - eq_(select([func.count("*")]).select_from(test_table).scalar(), 0) + + with engine.connect() as connection: + test_table.create(connection, checkfirst=True) + connection.execute( + test_table.insert(), + [dict(data="data%d" % i) for i in range(1, 20)], + ) + + result = connection.execute( + test_table.select().order_by(test_table.c.id) + ) + + eq_( + result.fetchmany(5), [(i, "data%d" % i) for i in range(1, 6)], + ) + eq_( + result.fetchmany(10), + [(i, "data%d" % i) for i in range(6, 16)], + ) + eq_(result.fetchall(), [(i, "data%d" % i) for i in range(16, 20)]) |
