summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-10-10 17:15:19 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2014-10-10 17:15:19 -0400
commit95be42c06ff4e5f3528de42bb04dcba228ea74c2 (patch)
tree602636e859b40fe8809ff38c50162a5c9402f85d /test/sql
parent3a6cdff88429e047a684c0f5d6029a30d9aaa062 (diff)
downloadsqlalchemy-95be42c06ff4e5f3528de42bb04dcba228ea74c2.tar.gz
- :meth:`.Insert.from_select` now includes Python and SQL-expression
defaults if otherwise unspecified; the limitation where non- server column defaults aren't included in an INSERT FROM SELECT is now lifted and these expressions are rendered as constants into the SELECT statement.
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_defaults.py65
-rw-r--r--test/sql/test_insert.py82
2 files changed, 145 insertions, 2 deletions
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index abce600df..10e557b76 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -14,6 +14,7 @@ from sqlalchemy.dialects import sqlite
from sqlalchemy.testing import fixtures
from sqlalchemy.util import u, b
from sqlalchemy import util
+import itertools
t = f = f2 = ts = currenttime = metadata = default_generator = None
@@ -1278,3 +1279,67 @@ class UnicodeDefaultsTest(fixtures.TestBase):
"foobar", Unicode(32),
default=default
)
+
+
+class InsertFromSelectTest(fixtures.TestBase):
+ __backend__ = True
+
+ def _fixture(self):
+ data = Table(
+ 'data', self.metadata,
+ Column('x', Integer),
+ Column('y', Integer)
+ )
+ data.create()
+ testing.db.execute(data.insert(), {'x': 2, 'y': 5}, {'x': 7, 'y': 12})
+ return data
+
+ @testing.provide_metadata
+ def test_insert_from_select_override_defaults(self):
+ data = self._fixture()
+
+ table = Table('sometable', self.metadata,
+ Column('x', Integer),
+ Column('foo', Integer, default=12),
+ Column('y', Integer))
+
+ table.create()
+
+ sel = select([data.c.x, data.c.y])
+
+ ins = table.insert().\
+ from_select(["x", "y"], sel)
+ testing.db.execute(ins)
+
+ eq_(
+ testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+ [(2, 12, 5), (7, 12, 12)]
+ )
+
+ @testing.provide_metadata
+ def test_insert_from_select_fn_defaults(self):
+ data = self._fixture()
+
+ counter = itertools.count(1)
+
+ def foo(ctx):
+ return next(counter)
+
+ table = Table('sometable', self.metadata,
+ Column('x', Integer),
+ Column('foo', Integer, default=foo),
+ Column('y', Integer))
+
+ table.create()
+
+ sel = select([data.c.x, data.c.y])
+
+ ins = table.insert().\
+ from_select(["x", "y"], sel)
+ testing.db.execute(ins)
+
+ # counter is only called once!
+ eq_(
+ testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+ [(2, 1, 5), (7, 1, 12)]
+ )
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index 232c5758b..bd4eaa3e2 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -183,7 +183,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams={"name_1": "foo"}
)
- def test_insert_from_select_select_no_defaults(self):
+ def test_insert_from_select_no_defaults(self):
metadata = MetaData()
table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
@@ -191,7 +191,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
sel = select([table1.c.myid]).where(table1.c.name == 'foo')
ins = table.insert().\
- from_select(["id"], sel)
+ from_select(["id"], sel, include_defaults=False)
self.assert_compile(
ins,
"INSERT INTO sometable (id) SELECT mytable.myid "
@@ -199,6 +199,84 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams={"name_1": "foo"}
)
+ def test_insert_from_select_with_sql_defaults(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=func.foobar()))
+ table1 = self.tables.mytable
+ sel = select([table1.c.myid]).where(table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, foobar() AS foobar_1 "
+ "FROM mytable WHERE mytable.name = :name_1",
+ checkparams={"name_1": "foo"}
+ )
+
+ def test_insert_from_select_with_python_defaults(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=12))
+ table1 = self.tables.mytable
+ sel = select([table1.c.myid]).where(table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, :foo AS anon_1 "
+ "FROM mytable WHERE mytable.name = :name_1",
+ # value filled in at execution time
+ checkparams={"name_1": "foo", "foo": None}
+ )
+
+ def test_insert_from_select_override_defaults(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=12))
+ table1 = self.tables.mytable
+ sel = select(
+ [table1.c.myid, table1.c.myid.label('q')]).where(
+ table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id", "foo"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, mytable.myid AS q "
+ "FROM mytable WHERE mytable.name = :name_1",
+ checkparams={"name_1": "foo"}
+ )
+
+ def test_insert_from_select_fn_defaults(self):
+ metadata = MetaData()
+
+ def foo(ctx):
+ return 12
+
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=foo))
+ table1 = self.tables.mytable
+ sel = select(
+ [table1.c.myid]).where(
+ table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, :foo AS anon_1 "
+ "FROM mytable WHERE mytable.name = :name_1",
+ # value filled in at execution time
+ checkparams={"name_1": "foo", "foo": None}
+ )
+
def test_insert_mix_select_values_exception(self):
table1 = self.tables.mytable
sel = select([table1.c.myid, table1.c.name]).where(