summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/suite/test_results.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_results.py')
-rw-r--r--lib/sqlalchemy/testing/suite/test_results.py81
1 files changed, 64 insertions, 17 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py
index 2eb986c74..e6f6068c8 100644
--- a/lib/sqlalchemy/testing/suite/test_results.py
+++ b/lib/sqlalchemy/testing/suite/test_results.py
@@ -238,6 +238,8 @@ class ServerSideCursorsTest(
elif self.engine.dialect.driver == "mysqldb":
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
+ elif self.engine.dialect.driver == "asyncpg":
+ return cursor.server_side
else:
return False
@@ -331,29 +333,74 @@ class ServerSideCursorsTest(
result.close()
@testing.provide_metadata
- def test_roundtrip(self):
+ def test_roundtrip_fetchall(self):
md = self.metadata
- self._fixture(True)
+ engine = self._fixture(True)
test_table = Table(
"test_table",
md,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
- test_table.create(checkfirst=True)
- test_table.insert().execute(data="data1")
- test_table.insert().execute(data="data2")
- eq_(
- test_table.select().order_by(test_table.c.id).execute().fetchall(),
- [(1, "data1"), (2, "data2")],
- )
- test_table.update().where(test_table.c.id == 2).values(
- data=test_table.c.data + " updated"
- ).execute()
- eq_(
- test_table.select().order_by(test_table.c.id).execute().fetchall(),
- [(1, "data1"), (2, "data2 updated")],
+
+ with engine.connect() as connection:
+ test_table.create(connection, checkfirst=True)
+ connection.execute(test_table.insert(), dict(data="data1"))
+ connection.execute(test_table.insert(), dict(data="data2"))
+ eq_(
+ connection.execute(
+ test_table.select().order_by(test_table.c.id)
+ ).fetchall(),
+ [(1, "data1"), (2, "data2")],
+ )
+ connection.execute(
+ test_table.update()
+ .where(test_table.c.id == 2)
+ .values(data=test_table.c.data + " updated")
+ )
+ eq_(
+ connection.execute(
+ test_table.select().order_by(test_table.c.id)
+ ).fetchall(),
+ [(1, "data1"), (2, "data2 updated")],
+ )
+ connection.execute(test_table.delete())
+ eq_(
+ connection.scalar(
+ select([func.count("*")]).select_from(test_table)
+ ),
+ 0,
+ )
+
+ @testing.provide_metadata
+ def test_roundtrip_fetchmany(self):
+ md = self.metadata
+
+ engine = self._fixture(True)
+ test_table = Table(
+ "test_table",
+ md,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
)
- test_table.delete().execute()
- eq_(select([func.count("*")]).select_from(test_table).scalar(), 0)
+
+ with engine.connect() as connection:
+ test_table.create(connection, checkfirst=True)
+ connection.execute(
+ test_table.insert(),
+ [dict(data="data%d" % i) for i in range(1, 20)],
+ )
+
+ result = connection.execute(
+ test_table.select().order_by(test_table.c.id)
+ )
+
+ eq_(
+ result.fetchmany(5), [(i, "data%d" % i) for i in range(1, 6)],
+ )
+ eq_(
+ result.fetchmany(10),
+ [(i, "data%d" % i) for i in range(6, 16)],
+ )
+ eq_(result.fetchall(), [(i, "data%d" % i) for i in range(16, 20)])