summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py186
1 files changed, 186 insertions, 0 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index fbb1878dc..5ea5d3515 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -21,6 +21,8 @@ from sqlalchemy.testing import fixtures
from sqlalchemy.testing.mock import Mock, call, patch
from contextlib import contextmanager
from sqlalchemy.util import nested
+from sqlalchemy.testing.assertsql import CompiledSQL
+
users, metadata, users_autoinc = None, None, None
@@ -805,6 +807,40 @@ class CompiledCacheTest(fixtures.TestBase):
eq_(compile_mock.call_count, 1)
eq_(len(cache), 1)
+ @testing.requires.schemas
+ @testing.provide_metadata
+ def test_schema_translate_in_key(self):
+ Table(
+ 'x', self.metadata, Column('q', Integer))
+ Table(
+ 'x', self.metadata, Column('q', Integer),
+ schema=config.test_schema)
+ self.metadata.create_all()
+
+ m = MetaData()
+ t1 = Table('x', m, Column('q', Integer))
+ ins = t1.insert()
+ stmt = select([t1.c.q])
+
+ cache = {}
+ with config.db.connect().execution_options(
+ compiled_cache=cache,
+ ) as conn:
+ conn.execute(ins, {"q": 1})
+ eq_(conn.scalar(stmt), 1)
+
+ with config.db.connect().execution_options(
+ compiled_cache=cache,
+ schema_translate_map={None: config.test_schema}
+ ) as conn:
+ conn.execute(ins, {"q": 2})
+ eq_(conn.scalar(stmt), 2)
+
+ with config.db.connect().execution_options(
+ compiled_cache=cache,
+ ) as conn:
+ eq_(conn.scalar(stmt), 1)
+
class MockStrategyTest(fixtures.TestBase):
@@ -989,6 +1025,156 @@ class ResultProxyTest(fixtures.TestBase):
finally:
r.close()
+class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
+ __requires__ = 'schemas',
+ __backend__ = True
+
+ def test_create_table(self):
+ map_ = {
+ None: config.test_schema,
+ "foo": config.test_schema, "bar": None}
+
+ metadata = MetaData()
+ t1 = Table('t1', metadata, Column('x', Integer))
+ t2 = Table('t2', metadata, Column('x', Integer), schema="foo")
+ t3 = Table('t3', metadata, Column('x', Integer), schema="bar")
+
+ with self.sql_execution_asserter(config.db) as asserter:
+ with config.db.connect().execution_options(
+ schema_translate_map=map_) as conn:
+
+ t1.create(conn)
+ t2.create(conn)
+ t3.create(conn)
+
+ t3.drop(conn)
+ t2.drop(conn)
+ t1.drop(conn)
+
+ asserter.assert_(
+ CompiledSQL("CREATE TABLE %s.t1 (x INTEGER)" % config.test_schema),
+ CompiledSQL("CREATE TABLE %s.t2 (x INTEGER)" % config.test_schema),
+ CompiledSQL("CREATE TABLE t3 (x INTEGER)"),
+ CompiledSQL("DROP TABLE t3"),
+ CompiledSQL("DROP TABLE %s.t2" % config.test_schema),
+ CompiledSQL("DROP TABLE %s.t1" % config.test_schema)
+ )
+
+ def _fixture(self):
+ metadata = self.metadata
+ Table(
+ 't1', metadata, Column('x', Integer),
+ schema=config.test_schema)
+ Table(
+ 't2', metadata, Column('x', Integer),
+ schema=config.test_schema)
+ Table('t3', metadata, Column('x', Integer), schema=None)
+ metadata.create_all()
+
+ def test_ddl_hastable(self):
+
+ map_ = {
+ None: config.test_schema,
+ "foo": config.test_schema, "bar": None}
+
+ metadata = MetaData()
+ Table('t1', metadata, Column('x', Integer))
+ Table('t2', metadata, Column('x', Integer), schema="foo")
+ Table('t3', metadata, Column('x', Integer), schema="bar")
+
+ with config.db.connect().execution_options(
+ schema_translate_map=map_) as conn:
+ metadata.create_all(conn)
+
+ assert config.db.has_table('t1', schema=config.test_schema)
+ assert config.db.has_table('t2', schema=config.test_schema)
+ assert config.db.has_table('t3', schema=None)
+
+ with config.db.connect().execution_options(
+ schema_translate_map=map_) as conn:
+ metadata.drop_all(conn)
+
+ assert not config.db.has_table('t1', schema=config.test_schema)
+ assert not config.db.has_table('t2', schema=config.test_schema)
+ assert not config.db.has_table('t3', schema=None)
+
+ @testing.provide_metadata
+ def test_crud(self):
+ self._fixture()
+
+ map_ = {
+ None: config.test_schema,
+ "foo": config.test_schema, "bar": None}
+
+ metadata = MetaData()
+ t1 = Table('t1', metadata, Column('x', Integer))
+ t2 = Table('t2', metadata, Column('x', Integer), schema="foo")
+ t3 = Table('t3', metadata, Column('x', Integer), schema="bar")
+
+ with self.sql_execution_asserter(config.db) as asserter:
+ with config.db.connect().execution_options(
+ schema_translate_map=map_) as conn:
+
+ conn.execute(t1.insert(), {'x': 1})
+ conn.execute(t2.insert(), {'x': 1})
+ conn.execute(t3.insert(), {'x': 1})
+
+ conn.execute(t1.update().values(x=1).where(t1.c.x == 1))
+ conn.execute(t2.update().values(x=2).where(t2.c.x == 1))
+ conn.execute(t3.update().values(x=3).where(t3.c.x == 1))
+
+ eq_(conn.scalar(select([t1.c.x])), 1)
+ eq_(conn.scalar(select([t2.c.x])), 2)
+ eq_(conn.scalar(select([t3.c.x])), 3)
+
+ conn.execute(t1.delete())
+ conn.execute(t2.delete())
+ conn.execute(t3.delete())
+
+ asserter.assert_(
+ CompiledSQL(
+ "INSERT INTO %s.t1 (x) VALUES (:x)" % config.test_schema),
+ CompiledSQL(
+ "INSERT INTO %s.t2 (x) VALUES (:x)" % config.test_schema),
+ CompiledSQL(
+ "INSERT INTO t3 (x) VALUES (:x)"),
+ CompiledSQL(
+ "UPDATE %s.t1 SET x=:x WHERE %s.t1.x = :x_1" % (
+ config.test_schema, config.test_schema)),
+ CompiledSQL(
+ "UPDATE %s.t2 SET x=:x WHERE %s.t2.x = :x_1" % (
+ config.test_schema, config.test_schema)),
+ CompiledSQL("UPDATE t3 SET x=:x WHERE t3.x = :x_1"),
+ CompiledSQL("SELECT %s.t1.x FROM %s.t1" % (
+ config.test_schema, config.test_schema)),
+ CompiledSQL("SELECT %s.t2.x FROM %s.t2" % (
+ config.test_schema, config.test_schema)),
+ CompiledSQL("SELECT t3.x FROM t3"),
+ CompiledSQL("DELETE FROM %s.t1" % config.test_schema),
+ CompiledSQL("DELETE FROM %s.t2" % config.test_schema),
+ CompiledSQL("DELETE FROM t3")
+ )
+
+ @testing.provide_metadata
+ def test_via_engine(self):
+ self._fixture()
+
+ map_ = {
+ None: config.test_schema,
+ "foo": config.test_schema, "bar": None}
+
+ metadata = MetaData()
+ t2 = Table('t2', metadata, Column('x', Integer), schema="foo")
+
+ with self.sql_execution_asserter(config.db) as asserter:
+ eng = config.db.execution_options(schema_translate_map=map_)
+ conn = eng.connect()
+ conn.execute(select([t2.c.x]))
+ asserter.assert_(
+ CompiledSQL("SELECT %s.t2.x FROM %s.t2" % (
+ config.test_schema, config.test_schema)),
+ )
+
class ExecutionOptionsTest(fixtures.TestBase):